On 05/11/2016 03:39 AM, Tim-Christian Mundt wrote:
Ok, a complete MCVE looks like that:


from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, event
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import backref, relationship, sessionmaker

Base = declarative_base()


class Entity(Base):
     """Mapped class for the ``entity`` table; owns a collection of tag associations."""
     __tablename__ = 'entity'
     id = Column(Integer, primary_key=True)
     # One-to-many collection of TagAssociation objects; the backref exposes
     # the owning Entity on each association as ``TagAssociation.entity``.
     tags = relationship('TagAssociation', backref='entity',
                         cascade="all, delete, delete-orphan")


class Vocabulary(Base):
     """Mapped class for the ``vocabulary`` table, a named group of tags."""
     __tablename__ = 'vocabulary'
     id = Column(Integer, primary_key=True)
     # Required and unique across all vocabularies.
     name = Column(String(255), nullable=False, unique=True)


class Tag(Base):
     """Mapped class for the ``tag`` table; every tag belongs to one Vocabulary."""
     __tablename__ = 'tag'
     id = Column(Integer, primary_key=True)
     vocabularyId = Column(Integer, ForeignKey(Vocabulary.id), nullable=False)
     # Eagerly joined many-to-one to Vocabulary; the backref adds a
     # ``Vocabulary.tags`` collection.  The cascade string is kept on a single
     # line: a string literal must not be split across lines.
     vocabulary = relationship(Vocabulary, lazy='joined',
                               backref=backref('tags',
                                               cascade='all, delete-orphan'))
     name = Column(String(50), nullable=False)


class TagAssociation(Base):
     """Association object linking an Entity to a Tag (``tag_association`` table)."""
     __tablename__ = 'tag_association'
     # Composite primary key: one row per (entity, tag) pair.
     entityId = Column(Integer, ForeignKey(Entity.id), primary_key=True)
     tagId = Column(Integer, ForeignKey(Tag.id), primary_key=True)
     # Eagerly joined many-to-one to the associated Tag.
     tag = relationship(Tag, lazy='joined')


@event.listens_for(TagAssociation, 'before_insert')
def validateOnlyOneTagPerVocabulary(mapper, connection, assoc):
     """Reject an insert that would give an entity two tags of one vocabulary.

     Registered as a ``before_insert`` listener on ``TagAssociation``;
     ``assoc`` is the association about to be inserted.  Relies on the
     module-level ``session`` both for the query and for the pending-object
     check.

     Raises:
         ValueError: if more than one pending association, or more than one
             pending plus already-stored association, uses the vocabulary of
             ``assoc.tag``.
     """
     vocabulary = assoc.tag.vocabulary
     # Associations of the same entity whose tag is in the same vocabulary,
     # as returned by a query through the session.
     existingAssocs = session.query(TagAssociation).join(Tag) \
         .filter(Tag.vocabulary == vocabulary, TagAssociation.entity == 
assoc.entity)
     # Associations in the entity's collection that are still pending
     # (present in session.new) and use the same vocabulary.
     newAssocs = [assoc for assoc in assoc.entity.tags
                  if assoc in session.new and assoc.tag.vocabulary == 
vocabulary]
     if len(newAssocs) > 1 or (len(newAssocs) + existingAssocs.count()) > 1:
         raise ValueError('Can only assign one tag of vocabulary "%s".' % 
vocabulary.name)


# Driver for the MCVE: build an in-memory database, then attach tags to one
# entity and commit after every step so the validation listener runs each time.
engine = create_engine('sqlite:///:memory:', echo=True)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

# Two tags share the first vocabulary; the third belongs to a second one.
vocabulary = Vocabulary(name='Vocabulary')
tag1 = Tag(vocabulary=vocabulary, name='Tag1')
tag2 = Tag(vocabulary=vocabulary, name='Tag2')

vocabulary2 = Vocabulary(name='Vocabulary2')
tag3 = Tag(vocabulary=vocabulary2, name='Tag3')

entity = Entity()

session.add(vocabulary)
session.add(vocabulary2)
session.add(entity)
session.commit()

# Passing entity= attaches each new association to entity.tags via the backref.
TagAssociation(entity=entity, tag=tag1)  # first tag
session.commit()
# tag from a different vocabulary, no problem
TagAssociation(entity=entity, tag=tag3)
session.commit()
try:
     # a second tag from the same vocabulary
     TagAssociation(entity=entity, tag=tag2)
     session.commit()
except ValueError:
     # this is expected
     session.rollback()
# Replace the whole collection with a single association.  This statement is
# deliberately kept exactly as posted: it is the case the question is about.
entity.tags = [TagAssociation(entity=entity, tag=tag2)]
session.commit()  # here no exception is expected


Which event or place and method are best to solve that? With „inspect“ I 
couldn’t find anything that would help me.

OK well because you're validating only when an entry is essentially added to the DB, and you're comparing to the current DB contents, you're not able to get a view of what the ultimate state of the collection would be. before_insert is not a good choice of event here for this reason, although you could make the example here work by looking in the "deleted" items and removing that from the "newAssocs" list. inspect() would be used like:

items_we_shouldnt_count = inspect(assoc.entity).attrs.tags.history.deleted

that collection would give you the TagAssociation(tag=tag1) and TagAssociation(tag=tag3) which you'd not count in your check.

This kind of validation is a little difficult but the best place is to get at the collection exactly once, in the before_flush() event:

@event.listens_for(Session, "before_flush")
def _validate_tags(session, ctx, instances):
    """Ensure no pending or modified Entity holds two tags of one vocabulary.

    Runs as a ``before_flush`` listener and inspects each Entity's ``tags``
    collection as it currently stands in memory.

    Raises:
        ValueError: on the first vocabulary seen twice within one entity.
    """
    candidates = session.new.union(session.dirty)
    for candidate in candidates:
        if not isinstance(candidate, Entity):
            continue
        seen = set()
        for assoc in candidate.tags:
            vocab = assoc.tag.vocabulary
            if vocab in seen:
                raise ValueError(
                    'Can only assign one tag of vocabulary "%s".' %
                    vocab.name)
            seen.add(vocab)

Note that above we don't need to do any kind of SQL except for whatever isn't loaded on those collections, so it will run a lot faster than running a whole new query on every INSERT. It also is checking the collection against what we are ultimately going to make it, that is taking into account removals as well, instead of only looking in the database.

Additionally, your test is illustrating a race condition that will trigger this assertion: your assignment at the end triggers autoflush. This is because you are creating a TagAssociation object already attached to its parent Entity, which appends it to the collection without first removing the other items, and since the collection was expired, a lazy load is also trying to flush it. So the bottom of the example needs to read either:

with session.no_autoflush:
    entity.tags = [TagAssociation(entity=entity, tag=tag2)]
session.commit()  # here no exception is expected

or:

entity.tags = [TagAssociation(tag=tag2)]
session.commit()  # here no exception is expected



see attached.







you'd need to show me how to illustrate the .tags collection storing two 
TagAssociation objects when you've explicitly set the collection to store a 
single TagAssociation
I don’t get it, where do I set that?


Am 05.05.2016 um 23:26 schrieb Mike Bayer <mike...@zzzcomputing.com>:



On 05/03/2016 06:28 PM, Tim-Christian Mundt wrote:
I have entities which I’d like to tag with tags from a vocabulary. The 
associations between entities and tags are implemented with „manual M:N“, 
because they carry more information.

class Entity(Base):
     id = Column(Integer, primary_key=True)
     tags = relationship('TagAssociation', backref='entity',
                         cascade="all, delete, delete-orphan")

class Vocabulary(Base):
     id = Column(Integer, primary_key=True)
     name = Column(String(255), nullable=False, unique=True)

class Tag(Base):
     id = Column(Integer, primary_key=True)
     vocabularyId = Column(Integer, ForeignKey(Vocabulary.id), nullable=False)
     vocabulary = relationship(Vocabulary, lazy='joined',
                               backref=backref('tags', cascade='all, delete-orphan'))
     name = Column(String(50), nullable=False)

class TagAssociation(Base):
     entityId = Column(Integer, ForeignKey(Entity.id), primary_key=True)
     tagId = Column(Integer, ForeignKey(Tag.id), primary_key=True)
     tag = relationship(Tag, lazy='joined')


Now I want to make sure that I don’t have more than one tag from a single 
vocabulary on an entity. Currently I’m using
the before_insert event on TagAssociation which works well except in this case:

entity = Entity()
entity.tags.append(TagAssociation(entity=entity, tag=some_tag))
session.add(entity)
session.flush()
entity.tags = [TagAssociation(entity=entity, 
tag=some_other_tag_from_same_vocabulary)]
session.flush()

When the validation function is called during the second flush, the collection 
(assoc_to_be_inserted.entity.tags)
has both tags in it (which I don’t understand).

This is not enough detail for me to reproduce that, you'd need to show me how 
to illustrate the .tags collection storing two TagAssociation objects when 
you've explicitly set the collection to store a single TagAssociation.   That's 
not anything that reproduces here.



I couldn’t find a way to detect if one of them is going to be deleted.

if a collection is assigned to a specific value, then the items that are no longer 
present in that collection are added to a list called "deleted".  That can be 
seen like this:

from sqlalchemy import inspect

inspect(my_entity).attrs.tags.history


Hence, the validation fails, although the final result would be correct.

Is there any event I can use to reliably implement that validation?

There's not enough detail here for me to know exactly how you're doing this 
validation and under what scenario you're seeing something unexpected.   The 
best format is to send an MCVE (http://stackoverflow.com/help/mcve).







Thanks,
Tim


--
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.


--
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, event
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import backref, relationship, sessionmaker


Base = declarative_base()


class Entity(Base):
    """Mapped class for the ``entity`` table; owns a collection of tag associations."""
    __tablename__ = 'entity'
    id = Column(Integer, primary_key=True)
    # One-to-many collection of TagAssociation objects; the backref exposes
    # the owning Entity on each association as ``TagAssociation.entity``.
    tags = relationship('TagAssociation', backref='entity',
                        cascade="all, delete, delete-orphan")


class Vocabulary(Base):
    """Mapped class for the ``vocabulary`` table, a named group of tags."""
    __tablename__ = 'vocabulary'
    id = Column(Integer, primary_key=True)
    # Required and unique across all vocabularies.
    name = Column(String(255), nullable=False, unique=True)


class Tag(Base):
    """Mapped class for the ``tag`` table; every tag belongs to one Vocabulary."""
    __tablename__ = 'tag'
    id = Column(Integer, primary_key=True)
    vocabularyId = Column(Integer, ForeignKey(Vocabulary.id), nullable=False)
    # Eagerly joined many-to-one to Vocabulary; the backref adds a
    # ``Vocabulary.tags`` collection.
    vocabulary = relationship(Vocabulary, lazy='joined',
                              backref=backref('tags', cascade='all, delete-orphan'))
    name = Column(String(50), nullable=False)


class TagAssociation(Base):
    """Association object linking an Entity to a Tag (``tag_association`` table)."""
    __tablename__ = 'tag_association'
    # Composite primary key: one row per (entity, tag) pair.
    entityId = Column(Integer, ForeignKey(Entity.id), primary_key=True)
    tagId = Column(Integer, ForeignKey(Tag.id), primary_key=True)
    # Eagerly joined many-to-one to the associated Tag.
    tag = relationship(Tag, lazy='joined')

# In-memory SQLite database; echo=True makes the engine log the SQL it emits.
engine = create_engine('sqlite:///:memory:', echo=True)
# Create the tables for every class mapped on Base.
Base.metadata.create_all(engine)
# Session factory bound to the engine; the before_flush listener below is
# registered on this factory, so it must exist before the listener is defined.
Session = sessionmaker(bind=engine)


@event.listens_for(Session, "before_flush")
def _validate_tags(session, ctx, instances):
    """Ensure no pending or modified Entity holds two tags of one vocabulary.

    Runs as a ``before_flush`` listener and inspects each Entity's ``tags``
    collection as it currently stands in memory.

    Raises:
        ValueError: on the first vocabulary seen twice within one entity.
    """
    candidates = session.new.union(session.dirty)
    for candidate in candidates:
        if not isinstance(candidate, Entity):
            continue
        seen = set()
        for assoc in candidate.tags:
            vocab = assoc.tag.vocabulary
            if vocab in seen:
                raise ValueError(
                    'Can only assign one tag of vocabulary "%s".' %
                    vocab.name)
            seen.add(vocab)



# Driver: exercises the before_flush validation against an in-memory database,
# committing after every step so the listener runs each time.
session = Session()

# Two tags share the first vocabulary; the third belongs to a second one.
vocabulary = Vocabulary(name='Vocabulary')
tag1 = Tag(vocabulary=vocabulary, name='Tag1')
tag2 = Tag(vocabulary=vocabulary, name='Tag2')

vocabulary2 = Vocabulary(name='Vocabulary2')
tag3 = Tag(vocabulary=vocabulary2, name='Tag3')

entity = Entity()

session.add(vocabulary)
session.add(vocabulary2)
session.add(entity)
session.commit()

# Passing entity= attaches each new association to entity.tags via the backref.
TagAssociation(entity=entity, tag=tag1)  # first tag
session.commit()
TagAssociation(entity=entity, tag=tag3)  # tag from a different vocabulary, no problem
session.commit()
try:
    TagAssociation(entity=entity, tag=tag2)  # a second tag from the same vocabulary
    session.commit()
except ValueError:
    # this is expected
    session.rollback()
else:
    # Raised explicitly instead of ``assert False`` so the check still runs
    # under ``python -O``, which strips assert statements.
    raise AssertionError(
        'expected ValueError for a second tag from the same vocabulary')
# The association is built without entity= so that it only joins the
# collection through the assignment itself.
entity.tags = [TagAssociation(tag=tag2)]
session.commit()  # here no exception is expected

Reply via email to