On 05/11/2016 03:39 AM, Tim-Christian Mundt wrote:
Ok, a complete MCVE looks like that:
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, event
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import backref, relationship, sessionmaker
Base = declarative_base()
class Entity(Base):
__tablename__ = 'entity'
id = Column(Integer, primary_key=True)
tags = relationship('TagAssociation', backref='entity',
cascade="all, delete, delete-orphan")
class Vocabulary(Base):
__tablename__ = 'vocabulary'
id = Column(Integer, primary_key=True)
name = Column(String(255), nullable=False, unique=True)
class Tag(Base):
__tablename__ = 'tag'
id = Column(Integer, primary_key=True)
vocabularyId = Column(Integer, ForeignKey(Vocabulary.id), nullable=False)
vocabulary = relationship(Vocabulary, lazy='joined',
backref=backref('tags', cascade='all,
delete-orphan'))
name = Column(String(50), nullable=False)
class TagAssociation(Base):
__tablename__ = 'tag_association'
entityId = Column(Integer, ForeignKey(Entity.id), primary_key=True)
tagId = Column(Integer, ForeignKey(Tag.id), primary_key=True)
tag = relationship(Tag, lazy='joined')
@event.listens_for(TagAssociation, 'before_insert')
def validateOnlyOneTagPerVocabulary(mapper, connection, assoc):
vocabulary = assoc.tag.vocabulary
existingAssocs = session.query(TagAssociation).join(Tag) \
.filter(Tag.vocabulary == vocabulary, TagAssociation.entity ==
assoc.entity)
newAssocs = [assoc for assoc in assoc.entity.tags
if assoc in session.new and assoc.tag.vocabulary ==
vocabulary]
if len(newAssocs) > 1 or (len(newAssocs) + existingAssocs.count()) > 1:
raise ValueError('Can only assign one tag of vocabulary "%s".' %
vocabulary.name)
engine = create_engine('sqlite:///:memory:', echo=True)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
vocabulary = Vocabulary(name='Vocabulary')
tag1 = Tag(vocabulary=vocabulary, name='Tag1')
tag2 = Tag(vocabulary=vocabulary, name='Tag2')
vocabulary2 = Vocabulary(name='Vocabulary2')
tag3 = Tag(vocabulary=vocabulary2, name='Tag3')
entity = Entity()
session.add(vocabulary)
session.add(vocabulary2)
session.add(entity)
session.commit()
TagAssociation(entity=entity, tag=tag1) # first tag
session.commit()
TagAssociation(entity=entity, tag=tag3) # tag from a different vocabulary, no
problem
session.commit()
try:
TagAssociation(entity=entity, tag=tag2) # a second tag from the same
vocabulary
session.commit()
except ValueError:
# this is expected
session.rollback()
entity.tags = [TagAssociation(entity=entity, tag=tag2)]
session.commit() # here no exception is expected
Which event or place and method are best to solve that? With „inspect“ I
couldn’t find anything that would help me.
OK well because you're validating only when an entry is essentially
added to the DB, and you're comparing to the current DB contents, you're
not able to get a view of what the ultimate state of the collection
would be. before_insert is not a good choice of event here for this
reason, although you could make the example here work by looking in the
"deleted" items and removing that from the "newAssocs" list. inspect()
would be used like:
items_we_shouldnt_count =
inspect(assoc.entity).attrs.tags.history.deleted
that collection would give you the TagAssociation(tag=tag1) and
TagAssociation(tag=tag3) which you'd not count in your check.
This kind of validation is a little difficult but the best place is to
get at the collection exactly once, in the before_flush() event:
@event.listens_for(Session, "before_flush")
def _validate_tags(session, ctx, instances):
for obj in session.new.union(session.dirty):
if isinstance(obj, Entity):
vocabs = set()
for v in (ta.tag.vocabulary for ta in obj.tags):
if v in vocabs:
raise ValueError(
'Can only assign one tag of vocabulary "%s".' %
v.name)
else:
vocabs.add(v)
Note that above we don't need to do any kind of SQL except for whatever
isn't loaded on those collections, so it will run a lot faster than
running a whole new query on every INSERT. It also is checking the
collection against what we are ultimately going to make it, that is
taking into account removals as well, instead of only looking in the
database.
additionally, your test is illustrating a race condition that will
trigger this assertion, which is that your assignment at the end
triggers autoflush; this is because you are creating a TagAssociation
object already attached to its parent Entity, which already appends it
to the collection without first removing the othe ritems, and since the
collection was expired a lazyload is also trying to flush it. So the
bottom of the example needs to read either:
with session.no_autoflush:
entity.tags = [TagAssociation(entity=entity, tag=tag2)]
session.commit() # here no exception is expected
or:
entity.tags = [TagAssociation(tag=tag2)]
session.commit() # here no exception is expected
see attached.
you'd need to show me how to illustrate the .tags collection storing two
TagAssociation objects when you've explicitly set the collection to store a
single TagAssociation
I don’t get it, where do I set that?
Am 05.05.2016 um 23:26 schrieb Mike Bayer <mike...@zzzcomputing.com>:
On 05/03/2016 06:28 PM, Tim-Christian Mundt wrote:
I have entities which I’d like to tag with tags from a vocabulary. The
associations between entities and tags are implemented with „manual M:N“,
because they carry more information.
class Entity(Base):
id = Column(Integer, primary_key=True)
tags = relationship('TagAssociation', backref='entity',
cascade="all, delete, delete-orphan")
class Vocabulary(Base):
id = Column(Integer, primary_key=True)
name = Column(String(255), nullable=False, unique=True)
class Tag(Base):
id = Column(Integer, primary_key=True)
vocabularyId = Column(Integer, ForeignKey(Vocabulary.id), nullable=False)
vocabulary = relationship(Vocabulary, lazy='joined',
backref=backref('tags', cascade='all,
delete-orphan'))
name = Column(String(50), nullable=False)
class TagAssociation(Base):
entityId = Column(Integer, ForeignKey(Entity.id), primary_key=True)
tagId = Column(Integer, ForeignKey(Tag.id), primary_key=True)
tag = relationship(Tag, lazy='joined‘)
Now I want to make sure that I don’t have more than one tag from a single
vocabulary on an entity. Currently I’m using
the before_insert event on TagAssociation which works well except in this case:
entity = Entity()
entity.tags.append(TagAssociation(entity=entity, tag=some_tag))
session.add(entity)
session.flush()
entity.tags = [TagAssociation(entity=entity,
tag=some_other_tag_from_same_vocabulary)]
session.flush()
When the validation function is called during the second flush, the collection
(assoc_to_be_inserted.entity.tags)
has both tags in it (which I don’t understand).
This is not enough detail for me to reproduce that, you'd need to show me how
to illustrate the .tags collection storing two TagAssociation objects when
you've explicitly set the collection to store a single TagAssociation. That's
not anything that reproduces here.
I couldn’t find a way to detect if one of them is going to be deleted.
if a collection is assigned to a specific value, then the items that are no longer
present in that collection are added to a list called "deleted". That can be
seen like this:
from sqlalchemy import inspect
inspect(my_entity).attrs.tags.history
Hence, the validation fails, although the final result would be correct.
Is there any event I can use to reliably implement that validation?
There's not enough detail here for me to know exactly how you're doing this
validation and under what scenario you're seeing something unexpected. The
best format is to send an MCVE (http://stackoverflow.com/help/mcve).
Thanks,
Tim
--
You received this message because you are subscribed to the Google Groups
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, event
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import backref, relationship, sessionmaker
Base = declarative_base()
class Entity(Base):
__tablename__ = 'entity'
id = Column(Integer, primary_key=True)
tags = relationship('TagAssociation', backref='entity',
cascade="all, delete, delete-orphan")
class Vocabulary(Base):
__tablename__ = 'vocabulary'
id = Column(Integer, primary_key=True)
name = Column(String(255), nullable=False, unique=True)
class Tag(Base):
__tablename__ = 'tag'
id = Column(Integer, primary_key=True)
vocabularyId = Column(Integer, ForeignKey(Vocabulary.id), nullable=False)
vocabulary = relationship(Vocabulary, lazy='joined',
backref=backref('tags', cascade='all, delete-orphan'))
name = Column(String(50), nullable=False)
class TagAssociation(Base):
__tablename__ = 'tag_association'
entityId = Column(Integer, ForeignKey(Entity.id), primary_key=True)
tagId = Column(Integer, ForeignKey(Tag.id), primary_key=True)
tag = relationship(Tag, lazy='joined')
engine = create_engine('sqlite:///:memory:', echo=True)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
@event.listens_for(Session, "before_flush")
def _validate_tags(session, ctx, instances):
for obj in session.new.union(session.dirty):
if isinstance(obj, Entity):
vocabs = set()
for v in (ta.tag.vocabulary for ta in obj.tags):
if v in vocabs:
raise ValueError(
'Can only assign one tag of vocabulary "%s".' %
v.name)
else:
vocabs.add(v)
session = Session()
vocabulary = Vocabulary(name='Vocabulary')
tag1 = Tag(vocabulary=vocabulary, name='Tag1')
tag2 = Tag(vocabulary=vocabulary, name='Tag2')
vocabulary2 = Vocabulary(name='Vocabulary2')
tag3 = Tag(vocabulary=vocabulary2, name='Tag3')
entity = Entity()
session.add(vocabulary)
session.add(vocabulary2)
session.add(entity)
session.commit()
TagAssociation(entity=entity, tag=tag1) # first tag
session.commit()
TagAssociation(entity=entity, tag=tag3) # tag from a different vocabulary, no problem
session.commit()
try:
TagAssociation(entity=entity, tag=tag2) # a second tag from the same vocabulary
session.commit()
assert False # ensure exception is raised
except ValueError:
# this is expected
session.rollback()
entity.tags = [TagAssociation(tag=tag2)]
session.commit() # here no exception is expected