On Sep 3, 2013, at 8:47 AM, Paul Balomiri <paulbalom...@gmail.com> wrote:

> I would like to install
> 
> event.listen(list, 'append', append_listener)
> event.listen(list, 'remove', rm_listener)
> 
> on those lists, such that the GroupByKeyCollection can modify added objects 
> according to the relationship it implements:
> * set the appropiate foreign key constraints
> * insert a removed object with it's new value for the key attribute after a 
> change (announced by append_listener)
> * reset the fks upon item removal.

using event.listen with GBK doesn't make sense.  events can only be used with 
specific target types, the "remove" "append" events only apply to an 
ORM-produced InstrumentedAttribute, such as Person._addresses_by_role here 
(note, we mean the class-bound attribute, not the collection on an instance).  
There is no need to use event.listen with the collection itself, as 
remove/append are produced originally by the add()/remove() methods on GBK 
itself; any extra logic which should take place would be invoked directly from 
there (and in fact my original example fails to fire off the event with 
remove()).

Additionally, all the usage of prepare_instrumentation() etc. should not be 
necessary, that's all internal stuff which is called automatically.

As mentioned before, the behavior of this collection is completely outside the 
realm of a "normal" collection so it needs to implement the append/remove 
events directly, which isn't something a new user to SQLAlchemy would typically 
be able to handle without a much deeper understanding of how the attribute 
system works.

I've implemented your test case as below as well as some other variants in 
association with the original code I gave you - for the "remove" case I've 
added the necessary code to the custom collection. All foreign key constraints 
are set correctly as a function of the ORM's normal operation, and as far as 
"reset", when an association between Person and Address is removed, we want to 
just delete the association so cascade is used for that.   I'm not sure what 
"insert a removed object with it's new value for the key attribute after a 
change" means; add a test to the TestPA class illustrating the behavior you 
want and I'll add it.

from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
import collections
from sqlalchemy.orm.collections import collection, collection_adapter
from sqlalchemy.ext.associationproxy import association_proxy, 
_AssociationCollection
Base = declarative_base()

class GroupByKeyCollection(collections.defaultdict):
    def __init__(self, keyfunc):
        super(GroupByKeyCollection, self).__init__(list)
        self.keyfunc = keyfunc

    @collection.appender
    def add(self, value, _sa_initiator=None):
        key = self.keyfunc(value)
        self[key].append(value)

    @collection.remover
    def remove(self, value, _sa_initiator=None):
        key = self.keyfunc(value)
        adapter = collection_adapter(self)
        adapter.fire_remove_event(value, None)
        self[key].remove(value)

    @collection.internally_instrumented
    def __setitem__(self, key, value):
        adapter = collection_adapter(self)
        # the collection API usually provides these events transparently, but 
due to
        # the unusual structure, we pretty much have to fire them ourselves
        # for each item.
        for item in value:
            item = adapter.fire_append_event(item, None)
        collections.defaultdict.__setitem__(self, key, value)

    @collection.internally_instrumented
    def __delitem__(self, key):
        adapter = collection_adapter(self)
        for item in value:
            adapter.fire_remove_event(item, None)
        collections.defaultdict.__delitem__(self, key, value)

    @collection.iterator
    def iterate(self):
        for collection in self.values():
            for item in collection:
                yield item

    @collection.converter
    def _convert(self, target):
        for collection in target.values():
            for item in collection:
                yield item

    def update(self, k):
        raise NotImplementedError()


class AssociationGBK(_AssociationCollection):
    def __init__(self, lazy_collection, creator, value_attr, parent):
        getter, setter = parent._default_getset(parent.collection_class)
        super(AssociationGBK, self).__init__(
                lazy_collection, creator, getter, setter, parent)

    def _create(self, key, value):
        return self.creator(key, value)

    def _get(self, object):
        return self.getter(object)

    def _set(self, object, key, value):
        return self.setter(object, key, value)

    def __getitem__(self, key):
        return [self._get(item) for item in self.col[key]]

    def __setitem__(self, key, value):
        self.col[key] = [self._create(key, item) for item in value]

    def add(self, key, item):
        self.col.add(self._create(key, item))

    def remove(self, key, item):
        for i, val in enumerate(self[key]):
            if val == item:
                self.col.remove(self.col[key][i])
                return
        raise ValueError("value not in list")

    def items(self):
        return ((key, [self._get(item) for item in self.col[key]])
                for key in self.col)

    def update(self, kw):
        for key, value in kw.items():
            self[key] = value

    def clear(self):
        self.col.clear()

    def copy(self):
        return dict(self.items())

    def __repr__(self):
        return repr(dict(self.items()))

# ------------------------------------------------------------------------------
Base = declarative_base()

class Person(Base):
    __tablename__ = 'person'

    id = Column(Integer, primary_key=True)

    _addresses_by_role = relationship("PersonToAddress",
                            cascade="all, delete-orphan",
                            collection_class=
                            lambda: GroupByKeyCollection(
                                        keyfunc=lambda item: item.role
                                    )
                        )
    addresses_by_role = association_proxy(
                            "_addresses_by_role",
                            "address",
                            proxy_factory=AssociationGBK,
                            creator=lambda k, v: PersonToAddress(role=k, 
address=v))

class PersonToAddress(Base):
    __tablename__ = 'person_to_address'

    id = Column(Integer, primary_key=True)
    person_id = Column(Integer, ForeignKey('person.id'))
    address_id = Column(Integer, ForeignKey('address.id'))
    role = Column(String)

    person = relationship("Person", backref="p2as")
    address = relationship("Address")

class Address(Base):
    __tablename__ = 'address'

    id = Column(Integer, primary_key=True)
    name = Column(String)


import unittest

class TestPA(unittest.TestCase):
    def setUp(self):
        self.e = create_engine("sqlite://", echo=True)
        Base.metadata.create_all(self.e)

        self.sess = Session(self.e)

    def tearDown(self):
        self.sess.close()
        self.e.dispose()

    def _assertAssociated(self, p1, a1, p2a):
        # custom collection
        self.assertEquals(p1._addresses_by_role["home"], [p2a])

        # association proxy
        self.assertEquals(p1.addresses_by_role["home"], [a1])

        # direct collection
        self.assertEquals(p1.p2as, [p2a])

        # foreign keys set correctly
        self.assertEquals(p2a.person_id, p1.id)
        self.assertEquals(p2a.address_id, a1.id)

    def _assertDeassociated(self, p1, a1, p2a):
        # when deassociating p1 from a1, there's no more association.
        # we want p2a to be deleted:
        self.assertEquals(self.sess.query(PersonToAddress).count(), 0)

        # list is empty
        self.assertEquals(p1.addresses_by_role["home"], [])


        # direct collection empty
        self.assertEquals(p1.p2as, [])

    def test_associate_via_collection(self):
        sess = self.sess
        p1 = Person()
        a1 = Address(name="Bucharest")

        p2a = PersonToAddress(address=a1, role="home")

        sess.add(p1)

        # associate using the collection
        p1._addresses_by_role.add(p2a)

        sess.commit()

        self._assertAssociated(p1, a1, p2a)

    def test_associate_via_scalar(self):
        sess = self.sess
        p1 = Person()
        a1 = Address(name="Bucharest")

        # here, p2a already refers to a1/p1, the "_addresses_by_role"
        # will be set up when it loads after a commit
        p2a = PersonToAddress(address=a1, person=p1, role="home")

        sess.add(p1)

        sess.commit()

        self._assertAssociated(p1, a1, p2a)

    def test_associate_via_association_proxy(self):
        sess = self.sess
        p1 = Person()
        a1 = Address(name="Bucharest")

        sess.add(p1)

        # associate using the association proxy
        p1.addresses_by_role.add("home", a1)

        sess.commit()

        p2a = p1._addresses_by_role["home"][0]
        self._assertAssociated(p1, a1, p2a)

    def test_deassociate(self):
        sess = self.sess
        p1 = Person()
        a1 = Address(name="Bucharest")

        sess.add(p1)

        p1.addresses_by_role.add("home", a1)

        sess.commit()

        p2a = p1._addresses_by_role["home"][0]
        self._assertAssociated(p1, a1, p2a)

        # now deassociate

        p1.addresses_by_role.remove("home", a1)

        sess.commit()

        self._assertDeassociated(p1, a1, p2a)



if __name__ == '__main__':
    unittest.main()



Attachment: signature.asc
Description: Message signed with OpenPGP using GPGMail

Reply via email to