import collections
import datetime
import copy
from functools import wraps
import itertools
import json
import weakref # This is some serious next-level stuff :D
import types # For annotations
import pdb
from .util import *
from .sql import *
# Exceptions
# ==========
[docs]class PropertyConstraintFail(Error):
"""Raised when a property failed to follow its constraint."""
def __init__(self, obj, prop):
self.obj = obj
self.prop = prop
def __str__(self):
try:
return "Constraint of property {s.prop.name} of object {s.obj} failed".format(s=self)
except Exception as e:
return "Constraint of property {s.prop.name} of some {t} failed (and __str__ failed too!)".format(s=self, t=type(self.obj).__name__)
[docs]class ObjectConstraintFail(Error):
"""Raised when an object failed to follow its constraint."""
def __init__(self, obj):
self.obj = obj
def __str__(self):
try:
return "Object-wide constraint of object {s.obj} failed".format(s=self)
except:
return "Object-wide constraint of object {t} failed (and __str__ failed too!)".format(t=type(self.obj).__name__)
# TODO use this (search for CantSetProperty for more notes)
[docs]class CantSetProperty(Error):
"""Raised when trying to set properties you may not edit. Mainly
when editing/setting through edit_from_json or Cls(json_dict=...)."""
def __init__(self, obj, propnames):
self.obj = obj
self.propnames = propnames
def __str__(self):
try:
return "Tried and failed to set {n} of object {s.obj}".format(s=self, n=", ".join(self.propnames))
except:
return "Tried and failed to set {n} of some {t} (and __str__ failed too!)".format(n=", ".join(self.propnames), t=type(self.obj).__name__)
# Type stuff
# ==========
default_sqltypes = {
int: "BIGINT",
str: "VARCHAR",
float: "DOUBLE PRECISION",
bool: "BOOL",
datetime.datetime: "TIMESTAMP", # but consider perhaps amount of seconds since UNIX epoch
}
[docs]class Type:
def __init__(self, python_type, sql_type=None):
if sql_type is not None:
self.python_type = python_type
self.sql_type = sql_type
else:
self.python_type = python_type
self.sql_type = default_sqltypes[python_type]
[docs] def to_sql(self, obj: "self.python_type"):
return obj
[docs] def from_sql(self, obj):
return obj
constraint = None
def __str__(self):
return "Type({}, {})".format(str(self.python_type), self.sql_type)
[docs]class StaticType(Type):
pass
class _Json(StaticType):
@staticmethod
def to_sql(obj):
return json.dumps(obj)
#@staticmethod
#def from_sql(obj):
#return json.loads(obj)
Json = _Json(str, "JSONB")
[docs]class List(Type):
def __init__(self, inner_type):
self.python_type = list
if not isinstance(inner_type, Type):
inner_type = Type(inner_type)
self.inner_type = inner_type
self.sql_type = inner_type.sql_type + "[]"
[docs] def to_sql(self, obj: "self.python_type", f=repr):
return "{" + ", ".join([f(o) for o in obj]) + "}"
def __str__(self):
return "List(" + str(self.inner_type) + ")"
[docs]class Enum(Type):
def __init__(self, *args):
self.options = args
self.inv_options = {val: num for num, val in enumerate(self.options)}
self.python_type = str
@property
def constraint(self):
def _constraint(val, _self=self):
return val in self.options
return _constraint
def __postinit__(self):
self.__postinited__ = True
self._create_type_command = RawSql("CREATE TYPE {s.name} AS ENUM ({opt})".format(
s=self, opt=", ".join(["'" + str(s) + "'" for s in self.options])))
self._drop_type_command = RawSql("DROP TYPE IF EXISTS {s.name} CASCADE".format(s=self))
self.sql_type = self.name
def __str__(self):
return "Enum(" + ", ".join([repr(o) for o in self.options]) + ")"
# Entity stuff
# ============
[docs]def create_where_comparison(op):
def method(self, other):
return Where(self, op, other)
return method
[docs]def create_order(op):
def method(self):
return Order(self, op)
return method
[docs]class Queryable:
_overloads = {
"__lt__": create_where_comparison("<"),
"__gt__": create_where_comparison(">"),
"__le__": create_where_comparison("<="),
"__ge__": create_where_comparison(">="),
"__eq__": create_where_comparison("="),
"__ne__": create_where_comparison("!="),
"__pos__": create_order("ASC"),
"__neg__": create_order("DESC"),
}
def __eq__(self, other):
return self is other
def __ne__(self, other):
return self is not other
_originals = {
"__eq__": __eq__,
"__ne__": __ne__,
}
_use_own_overloads = False
@staticmethod
def _set_overloads(use_own: bool):
if Queryable._use_own_overloads == use_own:
return
if use_own:
for k in Queryable._originals:
delattr(Queryable, k)
for (k, v) in Queryable._overloads.items():
setattr(Queryable, k, v)
else:
for k in Queryable._overloads:
delattr(Queryable, k)
for (k, v) in Queryable._originals.items():
setattr(Queryable, k, v)
Queryable._use_own_overloads = use_own
[docs]class Property(Queryable):
def __init__(self, typ, constraint: types.FunctionType = None, sql_extra: str = "",
required: bool = True, json: bool = True):
if not isinstance(typ, (Type, StaticType)):
typ = Type(typ)
self.type = typ
if self.type.constraint is not None:
if constraint is None:
constraint = self.type.constraint
else:
constraint = lambda val: self.type.constraint(val) and constraint(val)
self.constraint = constraint
self.sql_extra = sql_extra
self.required = required
self.json = json
self.name = None # Set by the metaclass
self.dataname = None # Idem, where to find the actual stored data inside an object
self.cls = None # Idem
def __postinit__(self):
self.__postinited__ = True
# ...
[docs] def sql_def(self):
return "\t" + self.name + " " + self.type_sql_def()
[docs] def type_sql_def(self):
return self.type.sql_type + (" " + self.sql_extra if self.sql_extra != "" else "") + (" NOT NULL" if self.required else "")
def __str__(self):
return self.cls._table_name + "." + self.name
[docs]class Key(Queryable):
"""
A reference to other properties that define the key of this object.
"""
def __init__(self, *props):
self.orig_props = props
self.props = props
self.single_prop = None
def __postinit__(self):
self.__postinited__ = True
newprops = []
for p in self.props:
if isinstance(p, Reference):
newprops.extend(p.props)
assert len(p.props) >= 1
else:
newprops.append(p)
self.props = newprops
assert len(self.props) >= 1
if len(self.props) == 1:
self.single_prop = self.props[0]
self.__class__ = SingleKey
[docs] def referencing_props(self):
yield from self.props
[docs] def sql_constraint(self) -> str:
"""Returns the SQL needed to define the PRIMARY KEY constraint."""
return "\tPRIMARY KEY " + str(self)
def __str__(self):
return "({keys})".format(keys=", ".join([p.name for p in self.referencing_props()]))
def __get__(self, obj, type=None):
if obj is None:
return self
return tuple([obj.__dict__[p.dataname] for p in self.referencing_props()])
def __set__(self, obj, val):
if obj is not None:
for (i, p) in enumerate(self.referencing_props()):
if p.constraint is not None and not p.constraint(val):
raise PropertyConstraintFail(obj, p)
obj.__dict__[p.dataname] = val[i]
def __delete__(self, obj):
pass # ?
[docs]class SingleKey(Key):
"""Version of Key with only one property.
(Don't directly use this, it will be automatic.)
"""
def __get__(self, obj, type=None):
if obj is None:
return self
return obj.__dict__[self.single_prop.dataname]
def __set__(self, obj, val):
if obj is not None:
if self.single_prop.constraint is not None and not self.single_prop.constraint(val):
raise PropertyConstraintFail(obj, single_prop)
obj.__dict__[self.single_prop.dataname] = val
def __delete__(self, obj):
pass # ?
[docs]class KeyProperty(SingleKey, Property):
"""
A specifically created property to be used as a key.
Type in postgres is SERIAL.
"""
def __init__(self):
Property.__init__(self, Type(int, "SERIAL"), required=False)
self.single_prop = self
def __postinit__(self):
self.__postinited__ = True
[docs] def referencing_props(self):
yield self
[docs] def sql_constraint(self) -> str:
return "\tPRIMARY KEY (" + self.name + ")"
__str__ = Property.__str__
[docs]class Reference(Queryable):
"""A reference to another Entity type."""
def __init__(self, ref: "MetaEntity", json=True, cascade=True):
self.ref = ref
self.json = json
self.ref_props = list(ref.key.referencing_props())
assert len(self.ref_props) >= 1
self.props = []
self.single_prop = None
self.name = None # Set by metaclass
self.cascade = cascade
@classmethod
[docs] def single_upgrade(cls):
return SingleReference
def __postinit__(self): # called by metaclass
self.__postinited__ = True
self.props = []
for rp in self.ref_props:
p = Property(Type(rp.type.python_type, rp.type.sql_type if not rp.type.sql_type == "SERIAL" else None), json=self.json)
p.cls = rp.cls
p.name = self.name + "_" + rp.name
p.dataname = self.name + "_" + rp.dataname
self.props.append(p)
assert len(self.props) >= 1
if len(self.props) == 1:
self.single_prop = self.props[0]
self.__class__ = self.single_upgrade()
[docs] def sql_constraint(self) -> str:
"""Will only generate the SQL constraint. The metaclass will take care of the properties."""
return "\tFOREIGN KEY ({own_props}) REFERENCES {ref_name}".format(
own_props=", ".join([p.name for p in self.props]),
ref_name=self.ref._table_name,
) + " ON DELETE CASCADE" if self.cascade else ""
def __str__(self):
return "(" + ", ".join([str(p) for p in self.props]) + ")"
def __get__(self, obj, type=None):
if obj is None:
return self
return tuple([obj.__dict__[p.dataname] for p in self.props])
def __set__(self, obj, val):
if obj is not None:
for (i, p) in enumerate(self.props):
obj.__dict__[p.dataname] = val[i]
__simple_set__ = __set__
def __delete__(self, obj):
pass # ?
[docs]class RTReference(Reference):
"""Reference that automatically notifies the referencing object."""
def __init__(self, ref: "MetaEntity"):
"""`ref` needs to be a subclass of `RTEntity`."""
assert issubclass(ref, RTEntity)
Reference.__init__(self, ref)
@classmethod
[docs] def single_upgrade(cls):
return RTSingleReference
def __set__(self, obj, val):
if obj is not None:
try:
key = self.__get__(obj)
if key in self.ref.cache:
self.ref.cache[key].remove_reference(self, obj)
except KeyError:
pass
for (i, p) in enumerate(self.props):
obj.__dict__[p.dataname] = val[i]
# Update
if val in self.ref.cache:
self.ref.cache[val].new_reference(self, obj)
[docs]class SingleReference(Reference):
"""Version of Reference with only one referencing property.
(Don't directly use this, it will be automatic.)
"""
def __get__(self, obj, type=None):
if obj is None:
return self
return obj.__dict__[self.single_prop.dataname]
def __set__(self, obj, val):
# References can not be constrained
if obj is not None:
obj.__dict__[self.single_prop.dataname] = val
__simple_set__ = __set__
def __str__(self):
return str(self.single_prop)
[docs]class RTSingleReference(RTReference, SingleReference):
"""Version of RTReference with only one referencing property.
(Don't directly use this, it will be automatic.)
"""
def __set__(self, obj, val):
# References can not be constrained
if obj is not None:
try:
key = obj.__dict__[self.single_prop.dataname]
if key in self.ref.cache:
self.ref.cache[key].remove_reference(self, obj)
except KeyError:
pass
obj.__dict__[self.single_prop.dataname] = val
# Update
if val in self.ref.cache:
self.ref.cache[val].new_reference(self, obj)
# This is a better way of doing things than python's native property
[docs]class ConstrainedProperty(Property):
# No init, just hack around it by setting __class__
def __get__(self, obj, type=None):
if obj is None:
return self
return obj.__dict__[self.dataname]
def __set__(self, obj, val):
if obj is not None:
if not self.constraint(val):
raise PropertyConstraintFail(obj, self)
obj.__dict__[self.dataname] = val
[docs]def classitems(dct, bases):
"""Helper function to allow for inheritance"""
for b in bases:
yield from classitems(b.__dict__, b.__bases__)
yield from dct.items()
[docs]class Entity(metaclass=MetaEntity):
"""Central class for an Entity.
WARNING: Be careful with changing the key as it will fuck with caching.
Basically, don't do it.
"""
__no_meta__ = True
def __init__(self, *args, **kwargs):
self.__metainit__(*args, **kwargs)
[docs] async def insert(self, db: Database = None, replace=False):
"""Insert in database."""
if db is None:
db = GlobalDb.get()
if self.key is None:
assert type(self)._incomplete
await self._simple_insert(db, replace)
assert self.key is not None
if not replace:
assert self.key not in type(self).cache, "Tried inserting but already in cache!"
type(self).cache[self.key] = self
# In case of replace, this kinda invalidates some elements
# So be careful with replace!
else:
await self._simple_insert(db, replace)
async def _simple_insert(self, db: Database = None, replace=False):
if db is None:
db = GlobalDb.get()
self.check()
assert not self.in_db
cls = type(self)
dct = {}
for p in self._complete_props:
dct[p.name] = p.type.to_sql(self.__dict__[p.dataname])
if replace:
insert = cls._replace_command.with_data(**dct)
else:
insert = cls._insert_command.with_data(**dct)
if cls._incomplete:
result = await insert.raw(db)
self.__dict__[cls.key.dataname] = result[0]
else:
await insert.exec(db)
self.in_db = True
for rt_ref in self._rt_refs:
# Send references to all RTReferences we couldn't trigger when initializing
# (Because some properties weren't actually set)
val = getattr(self, rt_ref.name)
if val in rt_ref.ref.cache:
rt_ref.ref.cache[val].new_reference(rt_ref, self)
[docs] async def update(self, db=None):
"""Update object in the database."""
if db is None:
db = GlobalDb.get()
self.check()
assert self.in_db
dct = {}
for p in self._complete_props:
dct[p.name] = p.type.to_sql(self.__dict__[p.dataname])
if type(self)._incomplete:
dct[type(self).key.name] = self.__dict__[type(self).key.dataname]
await type(self)._update_command.with_data(**dct).exec(db)
[docs] async def delete(self, db=None):
"""Delete object from the database."""
if db is None:
db = GlobalDb.get()
assert self.in_db
dct = {}
for p in type(self).key.referencing_props():
dct[p.name] = self.__dict__[p.dataname]
await type(self)._delete_command.with_data(**dct).exec(db)
self.in_db = False
constraint = None
"""
object-wide constraint is checked at three times:
- `__init__`
- `insert`
- `update`
- + manual calling of check
"""
[docs] def check(self):
"""Check object-wide constraint."""
if self.constraint is not None:
if not self.constraint():
raise ObjectConstraintFail(self)
@classmethod
[docs] def raw(cls: MetaEntity, text: str, dct={}) -> RawSql:
"""Return a RawSql where the results will be interpreted as objects of `cls`."""
return RawClassedSql(cls, text, dct)
@classmethod
[docs] def get(cls: MetaEntity, *where_clauses: list) -> Sql:
return Select(cls, where_clauses)
@classmethod
[docs] async def find_by_key(cls: MetaEntity, key, db: Database = None) -> "cls":
"""Works different from `get`, as it will immediatly return the object"""
if db is None:
db = GlobalDb.get()
try:
return cls.cache[key]
except KeyError:
return await cls._find_by_key_query.with_data(key=key).single(db)
def __setprop__(self, prop, val):
if isinstance(prop, ConstrainedProperty) and (not prop.constraint(val)):
raise PropertyConstraintFail(self, prop)
self.__dict__[prop.dataname] = val
def __getprop__(self, prop):
return self.__dict__[prop.dataname]
[docs] def edit_from_json(self, dct: dict):
#used = set()
for p in type(self)._edit_json_props:
if p.name in dct:
self.__setprop__(p, dct[p.name])
#used.add(p.name)
# TODO perhaps make this work again (Keys are checked!)
#notused = dct.keys() - used
#if len(notused) > 0:
# raise CantSetProperty(self, notused)
[docs] def to_json(self) -> str:
return json.dumps(self.json_repr())
[docs] def json_repr(self) -> dict:
"""Returns a dictionary of all properties that don't contain `json = False`.
When overriding this method, you can return anything you want as long as it is convertible
to JSON.
"""
d = {}
for p in self._json_props:
d[p.name] = self.__dict__[p.dataname]
return d
def __eq__(self, other):
# Yay caching
return self is other
def __hash__(self):
# Yay caching
return id(self)
def __str__(self):
try:
return type(self).__name__ + str(self.key) if isinstance(self.key, tuple) else "(" + str(self.key) + ")"
except:
return "some " + type(self).__name__
[docs]class RTEntity(Entity):
"""Subclass of Entity that sends live updates!
Listeners should follow the interface of `Listener`.
"""
__no_meta__ = True
def __init__(self, *args, **kwargs):
self._listeners = set()
super(RTEntity, self).__init__(*args, **kwargs)
[docs] async def update(self, db: Database = None):
if db is None:
db = GlobalDb.get()
await super(RTEntity, self).update(db)
for l in self._listeners:
l.update(self)
[docs] def send_update(self, db = None):
if db is None:
db = GlobalDb.get()
"""To manually send messages to all listeners. Won't save to database."""
for l in self._listeners:
l.update(self)
[docs] async def delete(self, db = None):
if db is None:
db = GlobalDb.get()
await super(RTEntity, self).delete(db)
for l in self._listeners:
l.delete(self)
l._remove_listenee(self)
[docs] def new_reference(self, ref, ref_obj):
for l in self._listeners:
l.new_reference(self, ref_obj)
[docs] def remove_reference(self, ref, ref_obj):
for l in self._listeners:
l.remove_reference(self, ref_obj)
# TODO perhaps a problem with deleting?
[docs] def add_listener(self, l: "Listener"):
"""Add listeners to this object."""
self._listeners.add(l)
l._add_listenee(self)
[docs] def remove_listener(self, l: "Listener"):
if l in self._listeners:
self._listeners.remove(l)
l._remove_listenee(self)
[docs] def remove_all_listeners(self):
for l in self._listeners:
self._listeners.remove(l)
l._remove_listenee(self)
[docs]class Listener:
"""Interface for a listener to be used with `RTEntity`. Main use is documentation,
not functionality.
Most implementations will want to define a `set` of objects they are
listening to (*listenees*).
"""
def _add_listenee(self, obj: RTEntity):
"""Add from listenees set.
(This method has an underscore to discourage directly calling it. You should
instead use the methods in `RTEntity`.)
"""
def _remove_listenee(self, obj: RTEntity):
"""Remove from listenees set.
(This method has an underscore to discourage directly calling it. You should
instead use the methods in `RTEntity`.)
"""
[docs] def update(self, obj: RTEntity):
"""Handle updates to the object."""
[docs] def delete(self, obj: RTEntity):
"""Handle deletions of the object."""
[docs] def new_reference(self, obj: RTEntity, ref_obj: Entity):
"""Handle a new reference from `ref_obj` to `obj`. `ref_obj` does not have to be a
`RTEntity`.
"""
[docs] def remove_reference(self, obj: RTEntity, ref_obj: Entity):
"""Handle the removal of a reference from `ref_obj` to `obj`. `ref_obj` does not
have to be a `RTEntity`.
"""