Given that a new version, for you, appears to mean an UPDATE of the old row and an INSERT of the new one, here is a recipe that does that. It is again basically what we see at https://docs.sqlalchemy.org/en/latest/_modules/examples/versioned_rows/versioned_rows.html, with some extra steps to emit the UPDATE for the old row. This recipe combines both the versioned event handler and the query handler.
import datetime
import time

from sqlalchemy import (
    create_engine, Integer, String, event, ForeignKey, Column, DateTime,
    inspect, func, select, cast
)
from sqlalchemy.orm import (
    make_transient, Session, relationship, attributes, backref,
    make_transient_to_detached, Query, selectinload
)
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

# this will be the current time as the test runs
now = None


class VersionedStartEnd(object):
    """Mixin for mapped classes whose rows carry a ``start`` / ``end`` range.

    A modification to a persistent instance is turned into an UPDATE of the
    old row (its ``end`` is set to ``now``) plus an INSERT of a new row that
    starts at ``now``; see :meth:`new_version`.
    """

    def __init__(self, **kw):
        """Construct the object, defaulting ``start`` / ``end`` around ``now``.

        ``start`` defaults to three days before the module-level ``now`` and
        ``end`` to three days after it; all keywords are passed on to the
        next ``__init__`` in the MRO.
        """
        # reduce some verbosity when we make a new object
        kw.setdefault("start", now - datetime.timedelta(days=3))
        kw.setdefault("end", now + datetime.timedelta(days=3))
        super(VersionedStartEnd, self).__init__(**kw)

    def new_version(self, session):
        """Convert this object into a new row and close out the old row.

        :param session: the Session the "old" copy is added to so that an
         UPDATE of its ``end`` value is emitted.

        After the call, ``self`` is transient (the caller re-adds it to get
        the INSERT) with ``start`` set to ``now`` and ``end`` set to two days
        after ``now``.
        """
        # our current identity key, which will be used on the "old"
        # version of us to emit an UPDATE.  this is just for assertion
        # purposes
        old_identity_key = inspect(self).key

        # make sure self.start / self.end are not expired
        self.id, self.start, self.end

        # turn us into an INSERT
        make_transient(self)

        # make the "old" version of us, which we will turn into an
        # UPDATE
        old_copy_of_us = self.__class__(
            id=self.id, start=self.start, end=self.end)

        # turn old_copy_of_us into an UPDATE
        make_transient_to_detached(old_copy_of_us)

        # the "old" object has our old identity key (that we no longer have)
        assert inspect(old_copy_of_us).key == old_identity_key

        # now put it back in the session
        session.add(old_copy_of_us)

        # now update the 'end' - SQLAlchemy sees this as a PK switch
        old_copy_of_us.end = now

        # fun fact!  the new_version() routine is *not* called for
        # old_copy_of_us!  because we are already in the before_flush() hook!
        # this surprised even me.  I was thinking we had to guard against
        # it.  Still might be a good idea to do so.

        self.start = now
        self.end = now + datetime.timedelta(days=2)


@event.listens_for(Session, "before_flush")
def before_flush(session, flush_context, instances):
    """Version every modified, already-persistent VersionedStartEnd object.

    Objects in ``session.dirty`` that are not ``VersionedStartEnd``
    instances, that have no net modification, or that have no identity yet
    are skipped.
    """
    for instance in session.dirty:
        if not isinstance(instance, VersionedStartEnd):
            continue
        if not session.is_modified(instance, passive=True):
            continue

        if not attributes.instance_state(instance).has_identity:
            continue

        # make it transient
        instance.new_version(session)
        # re-add
        session.add(instance)


@event.listens_for(Query, "before_compile", retval=True)
def before_compile(query):
    """ensure all queries for VersionedStartEnd include criteria """

    for ent in query.column_descriptions:
        entity = ent['entity']
        if entity is None:
            continue
        insp = inspect(entity)
        # not every inspected entity exposes a mapper; skip those that don't
        mapper = getattr(insp, 'mapper', None)
        if mapper and issubclass(mapper.class_, VersionedStartEnd):
            query = query.enable_assertions(False).filter(
                func.now().between(entity.start, entity.end)
            )

    return query


class Parent(VersionedStartEnd, Base):
    """Versioned parent row; refers to its child by ``child_n`` only."""

    __tablename__ = 'parent'
    id = Column(Integer, primary_key=True, autoincrement=True)
    start = Column(DateTime, primary_key=True)
    end = Column(DateTime, primary_key=True)
    data = Column(String)

    child_n = Column(Integer)

    child = relationship(
        "Child",
        primaryjoin=(
            "Child.id == foreign(Parent.child_n)"
        ),

        # note the primaryjoin can also be:
        #
        #  "and_(Child.id == foreign(Parent.child_n), "
        #  "func.now().between(Child.start, Child.end))"
        #
        # however the before_compile() above will take care of this for us in
        # all cases except for joinedload.  You *can* use the above primaryjoin
        # as well, it just means the criteria will be present twice for most
        # parent->child load operations
        #

        uselist=False,
        backref=backref('parent', uselist=False)
    )


class Child(VersionedStartEnd, Base):
    """Versioned child row, reachable from Parent via ``Parent.child``."""

    __tablename__ = 'child'

    id = Column(Integer, primary_key=True, autoincrement=True)
    start = Column(DateTime, primary_key=True)
    end = Column(DateTime, primary_key=True)
    data = Column(String)

    def new_version(self, session):
        """Version this child and re-attach the new version to its parent."""
        # expire parent's reference to us
        session.expire(self.parent, ['child'])

        # create new version
        VersionedStartEnd.new_version(self, session)

        # re-add ourselves to the parent
        self.parent.child = self


times = []


def time_passes(s):
    """keep track of timestamps in terms of the database and allow time to
    pass between steps."""

    # close the transaction, if any, since PG time doesn't increment in the
    # transaction
    s.commit()

    # get "now" in terms of the DB so we can keep the ranges low and
    # still have our assertions pass
    if times:
        time.sleep(1)

    times.append(s.scalar(select([cast(func.now(), DateTime)])))

    if len(times) > 1:
        assert times[-1] > times[-2]
    return times[-1]


e = create_engine("postgresql://scott:tiger@localhost/test", echo='debug')
Base.metadata.drop_all(e)
Base.metadata.create_all(e)

s = Session(e)

now = time_passes(s)

c1 = Child(data='child 1')
p1 = Parent(data='c1', child=c1)

s.add(p1)
s.commit()

# assert raw DB data
assert s.query(Parent.__table__).all() == [
    (1, times[0] - datetime.timedelta(days=3),
     times[0] + datetime.timedelta(days=3), 'c1', 1)
]
assert s.query(Child.__table__).all() == [
    (1, times[0] - datetime.timedelta(days=3),
     times[0] + datetime.timedelta(days=3), 'child 1')
]

now = time_passes(s)

p1_check = s.query(Parent).first()
assert p1_check is p1
assert p1_check.child is c1

p1.child.data = 'elvis presley'

s.commit()

p2_check = s.query(Parent).first()
assert p2_check is p1_check
c2_check = p2_check.child

# same object
assert c2_check is c1

# new data
assert c1.data == 'elvis presley'

# new end time
assert c1.end == now + datetime.timedelta(days=2)

# assert raw DB data
assert s.query(Parent.__table__).all() == [
    (1, times[0] - datetime.timedelta(days=3),
     times[0] + datetime.timedelta(days=3), 'c1', 1)
]
assert s.query(Child.__table__).order_by(Child.end).all() == [
    (1, times[0] - datetime.timedelta(days=3), times[1], 'child 1'),
    (1, times[1], times[1] + datetime.timedelta(days=2), 'elvis presley')
]

now = time_passes(s)

p1.data = 'c2 elvis presley'

s.commit()

# assert raw DB data.  now there are two parent rows.
assert s.query(Parent.__table__).order_by(Parent.end).all() == [
    (1, times[0] - datetime.timedelta(days=3), times[2], 'c1', 1),
    (1, times[2], times[2] + datetime.timedelta(days=2),
     'c2 elvis presley', 1)
]
assert s.query(Child.__table__).order_by(Child.end).all() == [
    (1, times[0] - datetime.timedelta(days=3), times[1], 'child 1'),
    (1, times[1], times[1] + datetime.timedelta(days=2), 'elvis presley')
]

# add some more rows to test that these aren't coming back for
# queries
s.add(Parent(data='unrelated', child=Child(data='unrelated')))
s.commit()

# Query only knows about one parent for id=1
p3_check = s.query(Parent).filter_by(id=1).one()

assert p3_check is p1
assert p3_check.child is c1

# and one child.
c3_check = s.query(Child).filter(Child.parent == p3_check).one()
assert c3_check is c1

# one child one parent....
c3_check = s.query(Child).join(Parent.child).filter( Parent.id == p3_check.id).one() # try selectinload eager loading across multiple parents for parent in s.query(Parent).options(selectinload(Parent.child)): if parent.data == 'unrelated': assert parent.child.data == 'unrelated' elif parent.data == 'c2 elvis presley': assert parent.child.data == 'elvis presley' else: assert False On Thu, Dec 6, 2018 at 1:40 PM Mike Bayer <mike...@zzzcomputing.com> wrote: > > On Thu, Dec 6, 2018 at 3:53 AM Stanislav Lobanov <n10101...@gmail.com> wrote: > > > > Example business case is: > > > > Parent and child are added to the system (current date is 2018-01-01) > > > > Parent > > id | start | end | data | child_id > > 1 | 2018-01-01 | 2018-01-11 | c1 | 1 # just pointer > > to child with some id (now points to first child record) > > > > Child > > id | start | end | data | > > 1 | 2018-01-01 | 2018-01-11 | Elvis P. | # this is current version > > > > Then on 2018-01-02 children's name is changed from "Elvis P." to "Elvis > > Presley". That change creates second version of child with ID=1: > > > > Parent > > id | start | end | data | child_id > > 1 | 2018-01-01 | 2018-01-11 | c1 | 1 # just pointer > > to child with some id (now logically points to second child version) > > > > Child # please notice that id is not changed as this is same child > > id | start | end | data | > > 1 | 2018-01-01 | 2018-01-02 | Elvis P. | # this is not current > > (latest) version anymore > > 1 | 2018-01-02 | 2018-01-11 | Elvis Presley | # but this is > > > > > > See? Parent does not care about what changes were made to child, it is just > > cares to relate to the latest version of child, so there are no composite > > foreign key to child table (this restriction comes from legacy system and i > > can not add "child_start" and "child_end" columns to form full composite FK > > to child table). 
> > > > When child is updated (effectively UPDATE is converted to INSERT) then only > > child table is modified, so there are no cascades to parent, because parent > > just targets to the row where Child.id == 1. > > from the data above, this is not strictly the case. the row with > "Elvis P.", the "end" date has been UPDATED from 2018-01-11 to > 2018-01-02. is that correct? So you need an UPDATE *and* an INSERT. > Am seeing if I can make that happen. > > > > > > And this is my problem, because i do not know to to make such "implicit" > > relationships in sqlalchemy. By implicit i mean a situation when relation > > is made by child.id AND latest date range for related object. > > > > Currently we would be using following query to retreive information about > > parent and it's child (in it's latest state): > > > > select * from parent, child > > where parent.child_id=child.id > > and now() between parent.start and parent.end -- gives us latest parent > > state > > and now() between child.start and child.end -- gives us latest child state > > > > As you can see such a queries is hard to write, it is repetitive and error > > prone. > > > > Also, i'm using versioned approach in one of the projects (using your > > example). Everything related to data consistency, data integrity, data > > querying must be done by hands, because i did not found a way to provide > > cascades and correct relationship behaviour without full composite FK. > > > > Thanks! > > > > > > вторник, 4 декабря 2018 г., 15:36:42 UTC+3 пользователь Stanislav Lobanov > > написал: > >> > >> Hello. > >> > >> I have a table with schema: > >> > >> name: users > >> fields: id int, name text, start datetime, end datetime > >> primary key: id, start, end > >> > >> This is kind of a historical table, where each row defines separate > >> "historical version" of an object. There are a single business User entity > >> (model) with possibly many historical versions. 
> >> > >> Such table structure makes it very hard to define relationships and work > >> with related objects. Also it is hard to work with "current" version of > >> User entity, because to retreive it we need to query it with "now() > >> between start and end" constraint. > >> > >> So i thought that maybe i can create a view for that table that will hold > >> only current versions and map that view onto User entity to hide all > >> historical complexities and compound PK from sqlalchemy. > >> > >> The question: is it possible to implement a mapping that will read from > >> view but write into real table? > >> > >> For example, view can have fields id (pk) and name. > >> > >> I know that there are great examples of versioning with sqlalchemy but i > >> want to hide non-functional implementation details from my business > >> code/entities with view. > >> > >> Thanks! > > > > -- > > SQLAlchemy - > > The Python SQL Toolkit and Object Relational Mapper > > > > http://www.sqlalchemy.org/ > > > > To post example code, please provide an MCVE: Minimal, Complete, and > > Verifiable Example. See http://stackoverflow.com/help/mcve for a full > > description. > > --- > > You received this message because you are subscribed to the Google Groups > > "sqlalchemy" group. > > To unsubscribe from this group and stop receiving emails from it, send an > > email to sqlalchemy+unsubscr...@googlegroups.com. > > To post to this group, send email to sqlalchemy@googlegroups.com. > > Visit this group at https://groups.google.com/group/sqlalchemy. > > For more options, visit https://groups.google.com/d/optout. -- SQLAlchemy - The Python SQL Toolkit and Object Relational Mapper http://www.sqlalchemy.org/ To post example code, please provide an MCVE: Minimal, Complete, and Verifiable Example. See http://stackoverflow.com/help/mcve for a full description. --- You received this message because you are subscribed to the Google Groups "sqlalchemy" group. 
To unsubscribe from this group and stop receiving emails from it, send an email to sqlalchemy+unsubscr...@googlegroups.com. To post to this group, send email to sqlalchemy@googlegroups.com. Visit this group at https://groups.google.com/group/sqlalchemy. For more options, visit https://groups.google.com/d/optout.