Like this project? it on
Source code for blitzdb.backends.base
import abc
import copy
import inspect
import logging
import six
from blitzdb.document import Document, document_classes
logger = logging.getLogger(__name__)
class DoNotSerialize(BaseException):
"""
If an encoder throws this exception, the object in question will not get serialized.
"""
[docs]class NotInTransaction(BaseException):
"""
Gets raised if a function that must only be used inside a database transaction
gets called outside a transaction.
"""
[docs]class InTransaction(BaseException):
"""
Gets raised if a function that must only be used outside a database transaction
gets called inside a transaction.
"""
class ComplexEncoder(object):
@classmethod
def encode(cls,obj,path):
if isinstance(obj,complex):
return {'_type' : 'complex','r' : obj.real,'i' : obj.imag}
return obj
@classmethod
def decode(cls,obj):
if isinstance(obj,dict) and obj.get('_type') == 'complex':
return 1j*obj['i']+obj['r']
return obj
class ComplexQueryEncoder(object):
@classmethod
def encode(cls,obj,path):
if isinstance(obj,complex):
raise ValueError("Currently complex values are not supported in queries! Please write your queries using the imaginary and real values instead.")
return obj
[docs]class Backend(object):
"""
Abstract base class for all backend implementations. Provides operations for querying the database,
as well as for storing, updating and deleting documents.
:param autodiscover_classes: If set to `True`, document classes will be discovered automatically,
using a global list of all classes generated by the Document metaclass.
*The `Meta` attribute*
As with `blitzdb.document.Document`, the `Meta` attribute can be used to define certain class-wide
settings and properties. Redefine it in your backend implementation to change the default values.
"""
class Meta(object):
pass
__metaclass__ = abc.ABCMeta
standard_encoders = [ComplexEncoder]
query_encoders = [ComplexQueryEncoder]
def __init__(self, autodiscover_classes=True, autoload_embedded=True, allow_documents_in_query=True):
self.classes = {}
self.deprecated_classes = {}
self.collections = {}
self._autoload_embedded = autoload_embedded
self._allow_documents_in_query = allow_documents_in_query
if autodiscover_classes:
self.autodiscover_classes()
[docs] def autodiscover_classes(self):
"""
Registers all document classes that have been defined in the code so far. The discovery mechanism
works by reading the value of `blitzdb.document.document_classes`, which is updated by the meta-class
of the :py:class:`blitzdb.document.Document` class upon creation of a new subclass.
"""
for document_class in document_classes:
self.register(document_class)
def unregister(self,cls):
if cls in self.classes:
del self.collections[self.classes[cls]['collection']]
del self.classes[cls]
[docs] def register(self, cls, parameters=None,overwrite = False):
"""
Explicitly register a new document class for use in the backend.
:param cls: A reference to the class to be defined
:param parameters: A dictionary of parameters. Currently, only the `collection` parameter is used
to specify the collection in which to store the documents of the given class.
.. admonition:: Registering classes
If possible, always use `autodiscover_classes = True` or register your document classes beforehand
using the `register` function, since this ensures that related documents can be initialized
appropriately. For example, suppose you have a document class `Author` that contains a list of
references to documents of class `Book`. If you retrieve an instance of an `Author` object from
the database without having registered the `Book` class, the references to that class will not get
parsed properly and will just show up as dictionaries containing a primary key and a collection name.
Also, when :py:meth:`blitzdb.backends.base.Backend.autoregister` is used to register a class,
you can't pass in any parameters to customize e.g. the collection name for that class
(you can of course do this throught the `Meta` attribute of the class)
## Inheritance
If `register` encounters a document class with a collection name that overlaps with the
collection name of an already registered document class, it checks if the new class is a
subclass of the one that is already register. If yes, it associates the new class to the
collection name. Otherwise, it leaves the collection associated to the already
registered class.
"""
if cls in self.deprecated_classes and not overwrite:
return False
if parameters is None:
parameters = {}
if 'collection' in parameters:
collection_name = parameters['collection']
elif hasattr(cls.Meta,'collection'):
collection_name = cls.Meta.collection
else:
collection_name = cls.__name__.lower()
delete_list = []
def register_class(collection_name,cls):
self.collections[collection_name] = cls
self.classes[cls] = parameters.copy()
self.classes[cls]['collection'] = collection_name
if collection_name in self.collections:
old_cls = self.collections[collection_name]
if (issubclass(cls,old_cls) and not (cls is old_cls)) or overwrite:
logger.warning("Replacing class %s with %s for collection %s" % (old_cls,cls,collection_name))
self.deprecated_classes[old_cls] = self.classes[old_cls]
del self.classes[old_cls]
register_class(collection_name,cls)
return True
else:
logger.debug("Registering class %s under collection %s" % (cls,collection_name))
register_class(collection_name,cls)
return True
return False
def get_meta_attributes(self, cls):
def get_user_attributes(cls):
boring = dir(type('dummy', (object,), {}))
return dict([item
for item in inspect.getmembers(cls)
if item[0] not in boring])
if hasattr(cls, 'Meta'):
params = get_user_attributes(cls.Meta)
else:
params = {}
return params
[docs] def autoregister(self, cls):
"""
Autoregister a class that is encountered for the first time.
:param cls: The class that should be registered.
"""
params = self.get_meta_attributes(cls)
return self.register(cls, params)
[docs] def serialize(self, obj, convert_keys_to_str=False,
embed_level=0,
encoders=None,
autosave=True,
for_query=False,path = None):
"""
Serializes a given object, i.e. converts it to a representation that can be stored in the database.
This usually involves replacing all `Document` instances by database references to them.
:param obj: The object to serialize.
:param convert_keys_to_str: If `True`, converts all dictionary keys to string (this is e.g. required for the MongoDB backend)
:param embed_level: If `embed_level > 0`, instances of `Document` classes will be embedded instead of referenced.
The value of the parameter will get decremented by 1 when calling `serialize` on child objects.
:param autosave: Whether to automatically save embedded objects without a primary key to the database.
:param for_query: If true, only the `pk` and `__collection__` attributes will be included in document references.
:returns: The serialized object.
"""
if path is None:
path = []
def get_value(obj,key):
key_fragments = key.split(".")
current_dict = obj
for key_fragment in key_fragments:
current_dict = current_dict[key_fragment]
return current_dict
serialize_with_opts = lambda value,*args,**kwargs : self.serialize(value,*args,
encoders = encoders,
convert_keys_to_str = convert_keys_to_str,
autosave = autosave,
for_query = for_query,
**kwargs)
if encoders is None:
encoders = []
for encoder in self.standard_encoders+encoders:
obj = encoder.encode(obj,path = path)
def encode_as_str(obj):
if six.PY3:
return str(obj)
else:
if isinstance(obj,unicode):
return obj
elif isinstance(obj,str):
return unicode(obj)
else:
return unicode(str(obj),errors='replace')
if isinstance(obj, dict):
output_obj = {}
for key, value in obj.items():
new_path = path[:]+[key]
try:
output_obj[encode_as_str(key) if convert_keys_to_str else key] = serialize_with_opts(value, embed_level=embed_level,path = new_path)
except DoNotSerialize:
pass
elif isinstance(obj,six.string_types):
output_obj = encode_as_str(obj)
elif isinstance(obj, (list,tuple)):
try:
output_obj = [serialize_with_opts(x, embed_level=embed_level,path = path[:]+[i]) for i,x in enumerate(obj)]
except DoNotSerialize:
pass
elif isinstance(obj, Document):
collection = self.get_collection_for_obj(obj)
if embed_level > 0:
try:
output_obj = self.serialize(obj, embed_level=embed_level-1)
except obj.DoesNotExist:#cannot load object, ignoring...
output_obj = self.serialize(obj.lazy_attributes, embed_level=embed_level-1)
except DoNotSerialize:
pass
elif obj.embed:
output_obj = self.serialize(obj)
else:
if obj.pk == None and autosave:
obj.save(self)
if obj._lazy:
# We make sure that all attributes that are already present get included in the reference
output_obj = {}
if obj.get_pk_name() in output_obj:
del output_obj[obj.get_pk_name()]
output_obj['pk'] = obj.pk
output_obj['__collection__'] = self.classes[obj.__class__]['collection']
else:
if for_query and not self._allow_documents_in_query:
raise ValueError("Documents are not allowed in queries!")
if for_query:
output_obj = {'$elemMatch' : {'pk':obj.pk,'__collection__':self.classes[obj.__class__]['collection']}}
else:
ref = "%s:%s" % (self.classes[obj.__class__]['collection'],str(obj.pk))
output_obj = {'__ref__' : ref,'pk':obj.pk,'__collection__':self.classes[obj.__class__]['collection']}
if hasattr(obj,'Meta') and hasattr(obj.Meta,'dbref_includes') and obj.Meta.dbref_includes:
for include_key in obj.Meta.dbref_includes:
try:
value = get_value(obj,include_key)
output_obj[include_key.replace(".","_")] = value
except KeyError:
continue
else:
output_obj = obj
return output_obj
[docs] def deserialize(self, obj, encoders=None, embedded=False, create_instance=True):
"""
Deserializes a given object, i.e. converts references to other (known) `Document` objects by lazy instances of the
corresponding class. This allows the automatic fetching of related documents from the database as required.
:param obj: The object to be deserialized.
:returns: The deserialized object.
"""
if not encoders:
encoders = []
for encoder in encoders + self.standard_encoders:
obj = encoder.decode(obj)
if isinstance(obj, dict):
if create_instance and '__collection__' in obj and obj['__collection__'] in self.collections and 'pk' in obj:
#for backwards compatibility
attributes = copy.deepcopy(obj)
del attributes['__collection__']
if '__ref__' in attributes:
del attributes['__ref__']
if '__lazy__' in attributes:
lazy = attributes['__lazy__']
del attributes['__lazy__']
else:
lazy = True
output_obj = self.create_instance(obj['__collection__'], attributes, lazy=lazy)
else:
output_obj = {}
for key, value in obj.items():
output_obj[key] = self.deserialize(value,encoders = encoders)
elif isinstance(obj, (list,tuple)):
output_obj = list(map(lambda x: self.deserialize(x), obj))
else:
output_obj = obj
return output_obj
def create_instance(self, collection_or_class, attributes, lazy=False, call_hook=True, deserialize=True, db_loader=None):
"""
Creates an instance of a `Document` class corresponding to the given collection name or class.
:param collection_or_class: The name of the collection or a reference to the class for which to create an instance.
:param attributes: The attributes of the instance to be created
:param lazy: Whether to create a `lazy` object or not.
:returns: An instance of the requested Document class with the given attributes.
"""
creation_args = {
'backend' : self,
'autoload' : self._autoload_embedded,
'lazy' : lazy,
'db_loader' : db_loader
}
if collection_or_class in self.classes:
cls = collection_or_class
elif collection_or_class in self.collections:
cls = self.collections[collection_or_class]
else:
raise AttributeError("Unknown collection or class: %s!" % str(collection_or_class))
#we deserialize the attributes that we receive
if deserialize:
deserialized_attributes = self.deserialize(attributes, create_instance=False)
else:
deserialized_attributes = attributes
if 'constructor' in self.classes[cls]:
obj = self.classes[cls]['constructor'](deserialized_attributes, **creation_args)
else:
obj = cls(deserialized_attributes, **creation_args)
if call_hook:
self.call_hook('after_load',obj)
return obj
@property
@abc.abstractmethod
def current_transaction(self):
pass
def transaction(self,implicit = False):
"""
This returns a context guard which will automatically open and close a transaction
"""
class TransactionManager(object):
def __init__(self,backend,implicit = False):
self.backend = backend
self.implicit = implicit
def __enter__(self):
self.within_transaction = True if self.backend.current_transaction else False
self.transaction = self.backend.begin()
def __exit__(self,exc_type,exc_value,traceback_obj):
if exc_type:
self.backend.rollback(self.transaction)
return False
else:
#if the transaction has been created implicitly and we are not within
#another transaction, we leave it open (the user needs to call commit manually)
#if self.implicit and not self.within_transaction:
# return
self.backend.commit(self.transaction)
return TransactionManager(self,implicit = implicit)
def get_collection_for_obj(self, obj):
"""
Returns the collection name for a given object, based on the class of the object.
:param obj: The object for which to return the collection name.
:returns: The collection name for the given object.
"""
return self.get_collection_for_cls(obj.__class__)
def get_collection_for_cls(self, cls):
"""
Returns the collection name for a given document class.
:param cls: The document class for which to return the collection name.
:returns: The collection name for the given class.
"""
if cls not in self.classes:
if issubclass(cls, Document) and cls not in self.classes and cls not in self.deprecated_classes:
self.autoregister(cls)
else:
raise AttributeError("Unknown object type: %s" % cls.__name__)
collection = self.classes[cls]['collection']
return collection
def get_collection_for_cls_name(self, cls_name):
"""
Returns the collection name for a given document class.
:param cls: The document class for which to return the collection name.
:returns: The collection name for the given class.
"""
for cls in self.classes:
if cls.__name__ == cls_name:
return self.classes[cls]['collection']
raise AttributeError("Unknown class name: %s" % cls_name)
def get_cls_for_collection(self, collection):
"""
Return the class for a given collection name.
:param collection: The name of the collection for which to return the class.
:returns: A reference to the class for the given collection name.
"""
for cls, params in self.classes.items():
if params['collection'] == collection:
return cls
raise AttributeError("Unknown collection: %s" % collection)
def call_hook(self,name,obj,*args,**kwargs):
try:
hook = obj.get_lazy_attribute(name)
return hook(*args,**kwargs)
except AttributeError:
pass
[docs] @abc.abstractmethod
def save(self, obj, cache=None):
"""
Abstract method to save a `Document` instance to the database.
:param obj: The object to be stored in the database.
:param cache: Whether to performed a cached save operation (not supported by all backends).
"""
[docs] @abc.abstractmethod
def get(self, cls, properties):
"""
Abstract method to retrieve a single object from the database according to a list of properties.
:param cls: The class for which to return an object.
:param properties: The properties of the object to be returned
:returns: An instance of the requested object.
.. admonition:: Exception Behavior
Raises a :py:class:`blitzdb.document.Document.DoesNotExist` exception if no object with the given
properties exists in the database, and a :py:class:`blitzdb.document.Document.MultipleObjectsReturned`
exception if more than one object in the database corresponds to the given properties.
"""
[docs] @abc.abstractmethod
def delete(self, obj):
"""
Deletes an object from the database.
:param obj: The object to be deleted.
"""
[docs] @abc.abstractmethod
def filter(self, cls, **kwargs):
"""
Filter objects from the database that correspond to a given set of
properties.
:param cls: The class for which to filter objects from the database.
:param properties: The properties used to filter objects.
:returns: A `blitzdb.queryset.QuerySet` instance containing the keys of the objects matching the query.
.. admonition:: Functionality might differ between backends
Please be aware that the functionality of the `filter` function might
differ from backend to backend. Consult the documentation of the given
backend that you use to find out which queries are supported.
"""