import json
import logging
import redis
from dq.config import Config
from dq.logging import error
logger = logging.getLogger(__name__)
[docs]def init_redis(key):
"""Initialize a Redis connection.
:param string key: The config key. The entry should at least contain the
host, port and db number of the instance.
:returns redis: The redis instance if the config exists and is valid, and
None otherwise.
"""
cfg = Config.get(key)
if not cfg:
return None
try:
i = redis.Redis(**cfg)
# This will attempt to connect to Redis and throw an error if the
# connection is invalid.
i.info()
return i
except Exception:
error(logger, 'Unable to connect to Redis', None)
return None
[docs]def strval(value):
"""JSON serialize value as appropriate.
This function should only be used internally.
:param dict|list|string|number value: An input value.
:returns string: The output value, suitable for saving by Redis. If
``value`` is a ``dict`` or ``list``, it will be JSON-serialized.
Otherwise it will be left as-is. Note that while Redis only takes
string values, numbers have their string values be themselves in
strings, and the conversion will be done by Redis automatically.
"""
return json.dumps(value) if isinstance(value, (list, dict)) else value
[docs]def strvals(*values):
"""JSON serialize values as appropriate.
This function should only be used internally.
:param ...dict|list|string|number values: Input values.
:returns list<string>: The output values. See docs for ``strval`` for
more explanations.
"""
return [strval(v) for v in values]
[docs]class Redis(object):
_instance = init_redis('redis')
[docs] @classmethod
def exists(cls, key):
"""Whether the key exists in Redis.
:param string key: The Redis key.
:returns boolean: ``True`` if the key exists, and ``False`` otherwise.
"""
return cls._instance.exists(key)
[docs] @classmethod
def get(cls, key):
"""Get the value stored at the key.
:param string key: The Redis key.
:returns string: The value of the key. If the key does not exist,
``None`` will be returned.
"""
return cls._instance.get(key)
[docs] @classmethod
def get_json(cls, key):
"""Get the value stored at the key as JSON.
:param string key: The Redis key.
:returns object: The value of the key as an unserialized JSON object.
If the key does not exist, ``None`` will be returned.
"""
resp = cls.get(key)
return json.loads(resp) if resp else None
[docs] @classmethod
def set(cls, key, value):
"""Set the key to the specified value.
:param string key: The Redis key.
:param string value: The value to set. If this is not a string, it will
be casted to a string.
:returns boolean: ``True`` if the operation is successful.
"""
return cls._instance.set(key, strval(value))
[docs] @classmethod
def setex(cls, key, value, second):
"""Set the key to the specified value, with an expiration time.
:param string key: The Redis key.
:param string value: The value to set.
:param int second: The TTL in second.
:returns boolean: ``True`` if the operation is successful.
"""
return cls._instance.setex(key, second, strval(value))
[docs] @classmethod
def expire(cls, key, second):
"""Set the key to expire in specified second.
:param string key: The key to set expire.
:param int second: The number of seconds for the key to live.
:returns boolean: True if the operation is successful.
"""
return cls._instance.expire(key, second)
[docs] @classmethod
def rpush(cls, key, *values):
"""Add values to a Redis list from the end.
The ``values`` argument is a variable-length array and can be
specified as follows:
.. code-block:: python
redis.rpush('danqing', 'val1', 'val2')
redis.rpush('danqing', 'val1')
redis.rpush('danqing', 'val1', 'val2', 'val3')
:param string key: The key of the list. If the list does not exist yet,
it will be created.
:param string... values: A list of values to insert. If any is not a
string, it will be casted to a string.
:returns int: The total number of elements in the list after the push.
"""
return cls._instance.rpush(key, *strvals(*values))
[docs] @classmethod
def delete(cls, key):
"""Delete the key from Redis.
:param string key: The key to delete.
:returns int: The number of items deleted. If 1, the key is found and
deleted. If 0, the key is not found and nothing is done.
"""
return cls._instance.delete(key)
[docs] @classmethod
def hgetall(cls, key):
"""Get the hash table at the specified key.
:param string key: The key to fetch.
:returns dict: The hash table at the specified key. If no hash table
found (i.e. key not found), an empty dictionary is returned.
:raises redis.ResponseError: If ``key`` holds something other than a
hash table.
"""
return cls._instance.hgetall(key)
[docs] @classmethod
def hget(cls, key, hash_key):
"""Get the value for a hash key in the hash table at the specified key.
:param string key: The key to fetch.
:param string hash_key: The hash key to fetch value for in the hash
table.
:returns string: The value corresponding to the hash key in the hash
table. If either ``key`` or ``hash_key`` is not found, ``None`` is
returned.
:raises redis.ResponseError: If ``key`` holds something other than a
hash table.
"""
return cls._instance.hget(key, hash_key)
[docs] @classmethod
def hset(cls, key, hash_key, hash_value):
"""Set the value for a hash key in the hash table at the specified key.
:param string key: The key of the hash table. If the hash table does
not exist yet, it will be created.
:param string hash_key: The hash key to set value for in the hash
table.
:param string hash_value: The value to set to the key. If this is not
a string, it will be casted to a string.
:returns int: The number of new fields. If 1, a new field is added
(``hash_key`` is new). If 0, ``hash_key`` already exists and its
value is updated.
:raises redis.ResponseError: If ``key`` holds something other than a
hash table.
"""
return cls._instance.hset(key, hash_key, strval(hash_value))
[docs] @classmethod
def hdelete(cls, key, *hash_keys):
"""Delete keys from a hash table.
The ``hash_keys`` argument is a variable-length array and can be
specified as follows:
.. code-block:: python
redis.hdelete('danqing', 'key1', 'key2')
redis.hdelete('danqing', 'key2')
redis.hdelete('danqing', 'key1', 'key2', 'key3')
:param string key: The key of the hash table.
:param string... hash_keys: A list of hash keys to delete from the hash
table.
:returns int: The number of keys actually deleted. If 3 hash keys are
specified but only 1 is found (and deleted), 1 is returned.
:raises redis.ResponseError: If ``key`` holds something other than a
hash table.
"""
return cls._instance.hdel(key, *hash_keys)
[docs] @classmethod
def lpeek(cls, key, count):
"""Peek the first count elements in the list without popping.
:param string key: The key of the array.
:param int count: The number of elements to peek.
:returns list: The list of peeked elements.
"""
return cls._instance.lrange(key, 0, count - 1)
[docs] @classmethod
def lpop(cls, key, count):
"""Pop the first count elements in the list.
:param string key: The key of the array.
:param int count: The number of elements to pop. If there are fewer
than ``count`` elements, everything will be popped.
:returns list: The list of popped elements.
"""
pipe = cls._instance.pipeline()
pipe.lrange(key, 0, count - 1)
pipe.ltrim(key, count, - 1)
result = pipe.execute()
return result[0] if result[1] else []
[docs] @classmethod
def atomic_rw(cls, key, evaluator=lambda x: x):
"""Atomically read-write a Redis key.
:param string key: The key to read/write.
:param function evaluator: The evaluator function. It takes the
existing value of the key, and should return the new value. If it
returns None, no update is done and the operation is considered
aborted. If not provided, the identity function is used.
:returns *: The value returned by evaluator. If there's an
atomicity violation, None is returned.
:returns boolean: True if there's no atomicity error. That is, this
value is True if write succeeded or the user aborted, and False if
there's a atomicity violation (that prevented the write).
"""
with cls._instance.pipeline() as pipe:
try:
pipe.watch(key)
value = evaluator(pipe.get(key))
if not value:
return None, True
pipe.multi()
pipe.set(key, strval(value))
pipe.execute()
return value, True
except redis.WatchError:
return None, False
[docs] @classmethod
def atomic_rw_hash(cls, key, hash_key, evaluator=lambda x: x):
"""Atomically read-write a Redis hash key.
:param string key: The key of the hash to read/write.
:param string hash_key: The hash key within the hash to read/write.
:param function evaluator: The evaluator function. It takes the
existing value of the key, and should return the new value. If it
returns None, no update is done and the operation is considered
aborted. If not provided, the identity function is used.
:returns *: The value returned by evaluator. If there's an
atomicity violation, None is returned.
:returns boolean: True if there's no atomicity error. That is, this
value is True if write succeeded or the user aborted, and False if
there's a atomicity violation (that prevented the write).
"""
with cls._instance.pipeline() as pipe:
try:
pipe.watch(key)
value = evaluator(pipe.hget(key, hash_key))
if not value:
return None, True
pipe.multi()
pipe.hset(key, hash_key, strval(value))
pipe.execute()
return value, True
except redis.WatchError:
return None, False