On Sep 4, 2013, at 7:08 AM, Paul Balomiri <paulbalom...@gmail.com> wrote:

> 
> My problem is related to the fact that the list values of the GBK defaultdict 
> are plain list types, and thus cannot fire events for operations on them. The 
> testcase below does not work, and, as you mentioned, no other operation on 
> GBK's list values will fire the corresponding events.
> 
> Now my attempt (admittedly without enough knowledge of sqlalchemy internals) 
> was to create a list which *forwards* append/remove events
> to the GBK Collection which could in turn add/remove them in their quality as 
> true InstrumentedAttribute (thus handling the DB part) . So more specifically 
> i used prepare_instrumentation() hoping to be able to instantiate an 
> InstrumentedList with event capabilities. The InstrumentedLists would not 
> need be first class InstrumentedAttributes ( -- perhaps could not because 
> they appear after reflection time? --). 

OK let's go back to my original reply, this is the key sentence:

 "In particular I stopped at getting 
Person.addresses_by_role['role'].append(Address()) to work, since that means 
we'd need two distinctly instrumented collections, it's doable but is more 
complex. "

That is, when we work with the list collections that are in the dict, *that* 
part is not instrumented; it is a plain list.  That's why the implementation 
has the methods "add(key, value)" and "remove(key, value)", which do the 
dictionary + list access in one step - direct mutation of the contained list is 
not part of any instrumented system and was omitted from my original example.  
For direct mutations to work, we need two new collections - there is no magic 
within the existing collection system that has any clue about these completely 
different kinds of collections, so the event system, prepare_instrumentation, 
etc., none of that applies here.  The event system is not relevant here, as 
that system allows consumers to be associated with producers, but in this case, 
we're the producer!  There is no shortcut and this is not anything built into 
SQLAlchemy - these are totally new kinds of collections, and all four 
interfaces (dict, list, proxied dict, proxied list) must be fully implemented 
if you truly want completely transparent behavior.  Here are all four; they are 
not complete yet and might have bugs, but they implement instrumented (and 
proxied) access on the sub-list element.


from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm.collections import collection, collection_adapter
from sqlalchemy.ext.associationproxy import (
    association_proxy,
    _AssociationCollection,
    _AssociationList,
)
Base = declarative_base()

class GroupByKeyCollection(dict):
    """Dict-based relationship collection that groups members by a key.

    Every dictionary value is a ``GroupByKeyList`` holding the members for
    which ``keyfunc(member)`` equals the dictionary key.  Looking up a key
    that is not present creates and stores an empty list for it (see
    ``__missing__``).
    """

    def __init__(self, keyfunc):
        """Remember *keyfunc*, the callable mapping a member to its group key."""
        self.keyfunc = keyfunc

    def __missing__(self, key):
        """Create, store and return a new empty ``GroupByKeyList`` for *key*."""
        new_list = GroupByKeyList(self, key)
        dict.__setitem__(self, key, new_list)
        return new_list

    @collection.appender
    def add(self, value, _sa_initiator=None):
        """Append *value* to the list selected by ``keyfunc(value)``."""
        key = self.keyfunc(value)
        self[key].append(value, _sa_initiator)

    @collection.remover
    def remove(self, value, _sa_initiator=None):
        """Remove *value* from the list selected by ``keyfunc(value)``."""
        key = self.keyfunc(value)
        self[key].remove(value, _sa_initiator)

    @collection.internally_instrumented
    def __setitem__(self, key, value):
        """Replace the list stored under *key* with the members of *value*.

        Remove events are fired for members that were in the old list but
        are absent from *value*; append events are fired for members that
        are new.  Members present in both are carried over without events.
        """
        # Materialize once: *value* is iterated several times below, which
        # would silently exhaust a one-shot iterable.
        value = list(value)

        if key in self:
            old_list = self[key]
            to_remove = set(old_list).difference(value)
            for item in old_list:
                if item in to_remove:
                    old_list._remove_event(item)
        else:
            old_list = []

        to_add = set(value).difference(old_list)
        # __missing__ installs a fresh, empty GroupByKeyList under *key*.
        new_list = self.__missing__(key)
        for item in value:
            if item in to_add:
                # Use the value handed back by the event, mirroring
                # GroupByKeyList.append.
                item = new_list._append_event(item)
            # Store via list.append directly so no second event is fired.
            list.append(new_list, item)

    @collection.internally_instrumented
    def __delitem__(self, key):
        """Delete *key*, firing a remove event for each member of its list.

        Raises:
            KeyError: if *key* is not present.
        """
        # Test membership first: plain ``self[key]`` would create the key.
        if key not in self:
            raise KeyError(key)
        existing = self[key]
        for value in existing:
            existing._remove_event(value)
        dict.__delitem__(self, key)

    @collection.iterator
    def iterate(self):
        """Yield every member of every group, one group after another."""
        for group in self.values():
            for item in group:
                yield item

    @collection.converter
    def _convert(self, target):
        """Yield every member found in the lists of the mapping *target*."""
        for group in target.values():
            for item in group:
                yield item

    def update(self, k):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError()

class GroupByKeyList(list):
    """List held under one key of a parent grouping collection.

    Mutations made through ``append``, ``remove``, ``del lst[i]`` and
    ``lst[i] = x`` first notify the collection adapter looked up for
    ``parent`` and then change the underlying list.
    """

    def __init__(self, parent, key):
        """Record the owning collection and the key this list is stored under."""
        self.key = key
        self.parent = parent

    def _append_event(self, value, _sa_initiator=None):
        """Fire the parent's append event for *value* and return its result."""
        return collection_adapter(self.parent).fire_append_event(
            value, _sa_initiator)

    def _remove_event(self, value, _sa_initiator=None):
        """Fire the parent's remove event for *value* and return its result."""
        return collection_adapter(self.parent).fire_remove_event(
            value, _sa_initiator)

    def append(self, value, _sa_initiator=None):
        """Fire an append event, then store whatever the event returned."""
        list.append(self, self._append_event(value, _sa_initiator))

    def remove(self, value, _sa_initiator=None):
        """Fire a remove event for *value*, then remove it from the list."""
        self._remove_event(value, _sa_initiator)
        list.remove(self, value)

    def __delitem__(self, index):
        """Fire a remove event for the entry at *index*, then delete it."""
        self._remove_event(self[index])
        list.__delitem__(self, index)

    def __setitem__(self, index, value):
        """Swap the entry at *index* for *value*, firing remove then append.

        Assigning the very same object that is already stored is a no-op.
        """
        current = self[index]
        if current is value:
            return
        self._remove_event(current)
        self._append_event(value)
        list.__setitem__(self, index, value)

class AssociationGBK(_AssociationCollection):
    """Association-proxy view over a grouped (key -> list) collection.

    ``self.col`` is the underlying grouped collection.  Values handed in
    are turned into members with ``creator(key, value)``; members read
    back are passed through ``getter`` before being returned.
    """

    def __init__(self, lazy_collection, creator, value_attr, parent):
        """Set up the proxy; *value_attr* is accepted but not used here."""
        getter, setter = parent._default_getset(parent.collection_class)
        super(AssociationGBK, self).__init__(
            lazy_collection, creator, getter, setter, parent)

    def _create(self, key, value):
        """Build a new member for *key* / *value* using the creator."""
        member = self.creator(key, value)
        return member

    def _get(self, object):
        """Return the proxied value of the member *object*."""
        proxied = self.getter(object)
        return proxied

    def _set(self, object, key, value):
        """Forward *key* and *value* to the setter for the member *object*."""
        outcome = self.setter(object, key, value)
        return outcome

    def __getitem__(self, key):
        """Return a proxied list bound to *key* (created on every access)."""
        return AssociationGBKList(self, key)

    def __setitem__(self, key, value):
        """Replace the group under *key* with members created from *value*."""
        members = [self._create(key, element) for element in value]
        self.col[key] = members

    def add(self, key, item):
        """Create a member for *key* / *item* and add it to the collection."""
        grouped = self.col
        grouped.add(self._create(key, item))

    def remove(self, key, item):
        """Remove the first member under *key* whose proxied value equals *item*.

        Raises:
            ValueError: if no proxied value under *key* equals *item*.
        """
        for position, proxied in enumerate(self[key]):
            if not (proxied == item):
                continue
            self.col.remove(self.col[key][position])
            return
        raise ValueError("value not in list")

    def items(self):
        """Return a generator of ``(key, [proxied values])`` pairs."""
        return (
            (key, [self._get(member) for member in self.col[key]])
            for key in self.col
        )

    def update(self, kw):
        """Assign every ``key -> values`` pair of the mapping *kw*."""
        for key, values in kw.items():
            self[key] = values

    def clear(self):
        """Clear the underlying collection."""
        self.col.clear()

    def copy(self):
        """Return a plain ``dict`` built from ``items()``."""
        snapshot = dict(self.items())
        return snapshot

    def __repr__(self):
        """Return the repr of a plain ``dict`` built from ``items()``."""
        snapshot = dict(self.items())
        return repr(snapshot)

class AssociationGBKList(_AssociationList):
    """Proxied view of the list stored under one key of an ``AssociationGBK``.

    Reads go through ``parent._get``; writes create members with
    ``parent._create`` or delegate to ``parent.add`` / ``parent.remove``.
    """

    def __init__(self, parent, key):
        """Bind this view to *parent* and *key*.

        The base-class initializer is deliberately not called, so every
        operation that needs the underlying data must go through
        ``self.parent.col[self.key]``.
        """
        self.parent = parent
        self.key = key

    def __iter__(self):
        """Iterate over the proxied values of the members under ``key``."""
        return (self.parent._get(item) for item in self.parent.col[self.key])

    def __len__(self):
        """Return the number of members stored under ``key``.

        NOTE(review): defined here because the base initializer is skipped;
        the inherited ``__len__`` in SQLAlchemy's association proxy is
        understood to read ``self.col``, which this class never sets up.
        On Python 3 ``list(proxy)`` also calls ``len()`` as a size hint.
        """
        return len(self.parent.col[self.key])

    def __bool__(self):
        """Return True when at least one member is stored under ``key``."""
        return len(self) > 0

    # Python 2 spelling of __bool__.
    __nonzero__ = __bool__

    def append(self, item):
        """Add *item* under ``key`` through the parent proxy."""
        self.parent.add(self.key, item)

    def remove(self, item):
        """Remove *item* from under ``key`` through the parent proxy."""
        self.parent.remove(self.key, item)

    def __getitem__(self, index):
        """Return the proxied value of the member at *index*."""
        return self.parent._get(self.parent.col[self.key][index])

    def __setitem__(self, index, value):
        """Replace the member at *index* with one created from *value*."""
        self.parent.col[self.key][index] = self.parent._create(self.key, value)

    def __delitem__(self, index):
        """Delete the member at *index* from the underlying list."""
        del self.parent.col[self.key][index]

# ------------------------------------------------------------------------------
# Declarative base class for the example models below.
# NOTE(review): Base is already assigned from declarative_base() right after
# the imports; this second call rebinds it before any model class is defined.
Base = declarative_base()

class Person(Base):
    """Person row; its addresses are reachable grouped by role."""

    __tablename__ = 'person'

    id = Column(Integer, primary_key=True)

    # PersonToAddress members, stored in a GroupByKeyCollection keyed by
    # each member's ``role`` attribute.
    _addresses_by_role = relationship("PersonToAddress",
                            cascade="all, delete-orphan",
                            collection_class=
                            lambda: GroupByKeyCollection(
                                        keyfunc=lambda item: item.role
                                    )
                        )
    # Proxy over ``_addresses_by_role`` targeting each member's ``address``
    # attribute; new members are built as PersonToAddress(role=key,
    # address=value).
    addresses_by_role = association_proxy(
                            "_addresses_by_role",
                            "address",
                            proxy_factory=AssociationGBK,
                            creator=lambda k, v: PersonToAddress(role=k, 
address=v))

class PersonToAddress(Base):
    """Association row linking one Person to one Address under a role."""

    __tablename__ = 'person_to_address'

    id = Column(Integer, primary_key=True)
    person_id = Column(Integer, ForeignKey('person.id'))
    address_id = Column(Integer, ForeignKey('address.id'))
    # Grouping key read by Person._addresses_by_role (``item.role``).
    role = Column(String)

    # The backref exposes these rows on Person as ``p2as``.
    person = relationship("Person", backref="p2as")
    address = relationship("Address")

class Address(Base):
    """Address row with an integer primary key and a name."""

    __tablename__ = 'address'

    id = Column(Integer, primary_key=True)
    name = Column(String)


import unittest

class TestPA(unittest.TestCase):
    """Exercise Person <-> Address association through the grouped
    collection, the association proxy and the plain backref collection.

    Uses ``assertEqual`` throughout; the ``assertEquals`` alias is
    deprecated and no longer exists in recent Python versions.
    """

    def setUp(self):
        # Fresh in-memory SQLite database and session for every test.
        self.e = create_engine("sqlite://", echo=True)
        Base.metadata.create_all(self.e)

        self.sess = Session(self.e)

    def tearDown(self):
        self.sess.close()
        self.e.dispose()

    def _assertAssociated(self, p1, a1, p2a):
        """Assert that *p1* and *a1* are linked under "home" through *p2a*."""
        # custom collection
        self.assertEqual(p1._addresses_by_role["home"], [p2a])

        # association proxy
        self.assertEqual(p1.addresses_by_role["home"], [a1])

        # direct collection
        self.assertEqual(p1.p2as, [p2a])

        # foreign keys set correctly
        self.assertEqual(p2a.person_id, p1.id)
        self.assertEqual(p2a.address_id, a1.id)

    def _assertDeassociated(self, p1, a1, p2a):
        """Assert that no association between *p1* and *a1* remains."""
        # when deassociating p1 from a1, there's no more association.
        # we want p2a to be deleted:
        self.assertEqual(self.sess.query(PersonToAddress).count(), 0)

        # list is empty
        self.assertEqual(p1.addresses_by_role["home"], [])

        # direct collection empty
        self.assertEqual(p1.p2as, [])

    def _fixture(self):
        """Return ``(session, person, address)``; only the person is added
        to the session."""
        sess = self.sess
        p1 = Person()
        a1 = Address(name="Bucharest")
        sess.add(p1)

        return sess, p1, a1

    def test_associate_via_collection_add(self):
        sess, p1, a1 = self._fixture()

        p2a = PersonToAddress(address=a1, role="home")

        # associate using the collection
        p1._addresses_by_role.add(p2a)
        sess.commit()

        self._assertAssociated(p1, a1, p2a)

    def test_associate_via_scalar_set(self):
        sess, p1, a1 = self._fixture()

        # here, p2a already refers to a1/p1, the "_addresses_by_role"
        # will be set up when it loads after a commit
        p2a = PersonToAddress(address=a1, person=p1, role="home")

        sess.commit()

        self._assertAssociated(p1, a1, p2a)

    def test_deassociate_via_collection_delindex(self):
        sess, p1, a1 = self._fixture()

        p1.addresses_by_role.add("home", a1)

        sess.commit()

        p2a = p1._addresses_by_role["home"][0]
        self._assertAssociated(p1, a1, p2a)

        # now deassociate

        del p1._addresses_by_role["home"][0]
        sess.commit()

        self._assertDeassociated(p1, a1, p2a)

    def test_associate_via_prox_add(self):
        sess, p1, a1 = self._fixture()

        # associate using the association proxy
        p1.addresses_by_role.add("home", a1)

        sess.commit()

        p2a = p1._addresses_by_role["home"][0]
        self._assertAssociated(p1, a1, p2a)

    def test_deassociate_via_prox_remove(self):
        sess, p1, a1 = self._fixture()

        p1.addresses_by_role.add("home", a1)

        sess.commit()

        p2a = p1._addresses_by_role["home"][0]
        self._assertAssociated(p1, a1, p2a)

        # now deassociate

        p1.addresses_by_role.remove("home", a1)

        sess.commit()

        self._assertDeassociated(p1, a1, p2a)

    def test_deassociate_via_prox_delindex(self):
        sess, p1, a1 = self._fixture()

        p1.addresses_by_role.add("home", a1)

        sess.commit()

        p2a = p1._addresses_by_role["home"][0]
        self._assertAssociated(p1, a1, p2a)

        # now deassociate

        del p1.addresses_by_role["home"][0]

        sess.commit()

        self._assertDeassociated(p1, a1, p2a)






# Run the test suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()

Attachment: signature.asc
Description: Message signed with OpenPGP using GPGMail

Reply via email to