Seems that when multiple mapper extensions are used, only the first is run. Testcase attached
--~--~---------~--~----~------------~-------~--~----~ You received this message because you are subscribed to the Google Groups "sqlalchemy" group. To post to this group, send email to sqlalchemy@googlegroups.com To unsubscribe from this group, send email to [EMAIL PROTECTED] For more options, visit this group at http://groups.google.com/group/sqlalchemy?hl=en -~----------~----~----~----~------~----~------~--~---
from sqlalchemy import * from sqlalchemy.orm import * import datetime import time class TimestampExtension(MapperExtension): def _decorate_instance(self, instance): print 'timestamp.exec' if hasattr(instance, 'audit_ts'): instance.audit_ts = datetime.datetime.now() def before_insert(self, mapper, connection, instance): self._decorate_instance(instance) def before_update(self, mapper, connection, instance): self._decorate_instance(instance) class InstanceEvent(MapperExtension): def before_insert(self, mapper, connection, instance): print 'instanceevent.oninsert.exec' if hasattr(instance, 'oninsert'): instance.oninsert() return EXT_CONTINUE def before_update(self, mapper, connection, instance): print 'instanceevent.onupdate.exec' if hasattr(instance, 'onupdate'): instance.onupdate() return EXT_CONTINUE def populate_instance(self, mapper, context, row, instance, instancekey, isnew): print 'instanceevent.onload.exec' if isnew and hasattr(instance, 'onload'): mapper.populate_instance(context, instance, row, instancekey=instancekey, isnew=isnew) instance.onload() return None return EXT_CONTINUE engine = create_engine('sqlite:///:memory:', echo=True) #engine = create_engine('sqlite:///:memory:') meta = MetaData(bind=engine) a = Table('a', meta, Column('id', INT, nullable=False, primary_key=True), Column('nm', VARCHAR(8)), Column('typ', VARCHAR(8)), Column('audit_ts', DATETIME) ) meta.create_all() class K(object): def __init__(self, **kw): for k,v in kw.items(): setattr(self, k,v) class A(K): def oninsert(self): self.typ = 'new' def onupdate(self): self.typ = 'mod' mapper(A, a, extension = [TimestampExtension(), InstanceEvent()]) S = create_session() x = A(nm='one') S.save(x) S.flush() assert x.audit_ts is not None assert x.typ == 'new' old_ts = x.audit_ts time.sleep(1) # ensure timestamps are different x.nm = 'one-1' S.flush() assert x.typ == 'mod' assert x.audit_ts > old_ts