Do you know which transaction isolation level you are running at? The
default for InnoDB is "REPEATABLE READ":

  http://dev.mysql.com/doc/refman/5.6/en/set-transaction.html#isolevel_repeatable-read

The important sentence in that link is:

  All consistent reads within the same transaction read the snapshot
  established by the first read.

When you query the database for the first time, to see if the entity
already exists, you are setting that initial snapshot. If you run the
same query again (such as in your exception handler), you will get the
same results, whether or not another connection has inserted a
matching row in the meantime.
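
To make that concrete, here is a minimal sketch of the same effect. It
is an analogy rather than a reproduction of your setup: it uses the
sqlite3 module from the Python standard library with SQLite in WAL mode
(which also gives a reader a stable snapshot for the length of its
transaction) as a stand-in for MySQL, and the function name
snapshot_demo and the helper count_rows are made up for illustration.

```python
import os
import sqlite3
import tempfile


def count_rows(conn):
    # Fetch everything so no half-read statement is left open.
    return conn.execute("SELECT COUNT(*) FROM foo").fetchall()[0][0]


def snapshot_demo():
    """Return the row counts connection A sees before, during and after
    a concurrent insert by connection B."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        # isolation_level=None: the sqlite3 module issues no implicit
        # BEGIN, so transactions are controlled explicitly below.
        a = sqlite3.connect(path, isolation_level=None)
        a.execute("PRAGMA journal_mode=WAL")
        a.execute(
            "CREATE TABLE foo (bar INTEGER, baz INTEGER, qux INTEGER, "
            "UNIQUE (bar, baz, qux))"
        )
        b = sqlite3.connect(path, isolation_level=None)

        a.execute("BEGIN")
        first = count_rows(a)    # first read: establishes A's snapshot
        # B inserts and commits immediately (autocommit).
        b.execute("INSERT INTO foo (bar, baz, qux) VALUES (1, 1, 1)")
        second = count_rows(a)   # same transaction: still the old snapshot
        a.execute("ROLLBACK")    # ending the transaction drops the snapshot
        third = count_rows(a)    # a fresh read now sees B's row

        a.close()
        b.close()
        return first, second, third


print(snapshot_demo())
```

The second count is the analogue of the query in your exception
handler: it runs inside the transaction that already took its snapshot,
so it cannot see the other connection's row. Only after that
transaction ends does a new read pick it up. On MySQL, I would expect
READ COMMITTED to behave differently here, since each consistent read
then gets a fresh snapshot, but I haven't tested that against your
code.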

Simon

On Mon, Sep 2, 2013 at 12:54 PM, herzaso <herz...@gmail.com> wrote:
> I'm not sure what to make of the results:
> On the first connection, I ran BEGIN and INSERT and both were successful,
> but when I tried the INSERT statement on the second connection, I got "ERROR
> 1205 (HY000): Lock wait timeout exceeded; try restarting transaction".
> Running the same query on the first connection produced the required result
> which is "ERROR 1062 (23000): Duplicate entry"
> After the ROLLBACK on the first connection, the INSERT statement worked well
> on the second connection
>
> Regarding your second remark, the answer is yes, the error was due to the
> unique constraint on those columns
>
> BTW: I'm working on MySQL
>
> On Monday, September 2, 2013 1:31:12 PM UTC+3, Simon King wrote:
>>
>> I don't really know the answer, but I'd be interested in the results
>> of this experiment:
>>
>> Forget about SQLAlchemy for the moment, and start 2 plain SQL
>> connections to your database. In the first, type something like the
>> following:
>>
>> BEGIN;
>> INSERT INTO foo (bar, baz, qux) VALUES (1, 1, 1);
>>
>> Now in the second connection do the same. I assume it'll fail because
>> of the duplicate values.
>>
>> Now in the first connection issue a "ROLLBACK". You should now be in a
>> state where no matching row exists in the database, even though you
>> received an error about constraint violations.
>>
>> The results you see may be different, depending on your transaction
>> isolation level. (It may be that you don't get the constraint
>> violation at all until you try to commit the second connection).
>>
>> Another thing you could look at: are you sure that the error you are
>> getting is due to the unique constraint on bar/baz/qux, and not some
>> other constraint in the database?
>>
>> Simon
>>
>> On Mon, Sep 2, 2013 at 8:45 AM, herzaso <her...@gmail.com> wrote:
>> > I'm afraid it didn't solve my problem.
>> >
>> > Here is my updated method:
>> >     @classmethod
>> >     def get(cls, bar=None, baz=None, qux=None, **kwargs):
>> >         query = session.query(cls).\
>> >             filter(cls.bar == bar).\
>> >             filter(cls.baz == baz).\
>> >             filter(cls.qux == qux)
>> >
>> >         item = query.first()
>> >         updated = False
>> >
>> >         if not item:
>> >             try:
>> >                 with session.begin_nested():   # run inside a SAVEPOINT
>> >                     updated = True
>> >                     item = cls(bar=bar, baz=baz, qux=qux, **kwargs)
>> >                     session.add(item)
>> >                     session.flush()
>> >             except sa.exc.IntegrityError:
>> >                 item = query.first()
>> >                 if not item:
>> >                     raise Exception("invalidIntegrityError")
>> >             except:
>> >                 raise
>> >
>> >         if not updated:
>> >             for k, v in kwargs.iteritems():
>> >                 if getattr(item, k) != v:
>> >                     setattr(item, k, v)
>> >
>> >         return item
>> >
>> > With this code, I'm getting invalidIntegrityError. How is that possible?
>> > (It's also worth pointing out that this solution requires SA 0.8.2;
>> > otherwise, there is a problem with session.begin_nested.)
>> >
>> >
>> > On Tuesday, August 27, 2013 6:40:03 PM UTC+3, Michael Bayer wrote:
>> >>
>> >> I'm not a fan of catching integrity errors. I prefer to make sure
>> >> they aren't going to happen, or, if they are, that they aren't a
>> >> normal occurrence and the system is such that the particular
>> >> operation can just fail (of course it depends on what it is). A
>> >> problem with catching the integrity error due to concurrent,
>> >> conflicting operations is that, depending on backend and isolation
>> >> level, you can't be totally sure when the error is going to get
>> >> raised (e.g. serializable isolation vs. non-serializable). Also, on
>> >> a backend like PostgreSQL, the database can't recover the
>> >> transaction after an integrity error unless you used a savepoint.
>> >>
>> >> But here you're doing the "concurrent transactions need row identity
>> >> X" case, so maybe it is appropriate. Here is a rough idea of a
>> >> transactional pattern for that, noting this isn't tested:
>> >>
>> >> try:
>> >>     my_object = Session.query(MyClass).filter(....).one()
>> >> except NoResultFound:
>> >>     try:
>> >>         with Session.begin_nested():   # run inside a SAVEPOINT
>> >>             my_object = MyClass(...)
>> >>             Session.add(my_object)
>> >>             Session.flush()
>> >>     except IntegrityError:
>> >>         my_object = Session.query(MyClass).filter(....).one()
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> On Aug 27, 2013, at 11:13 AM, herzaso <her...@gmail.com> wrote:
>> >>
>> >> Suppose we are looking at a race condition. Do you also think this
>> >> should be handled by catching the IntegrityError?
>> >> If so, what should I do? Only flush and do the operation again?
>> >>
>> >> On Tuesday, August 27, 2013 5:42:23 PM UTC+3, Michael Bayer wrote:
>> >>>
>> >>> The word "occasional" is very meaningful. It usually suggests race
>> >>> conditions. Then with the word "tornado", the Bayesian filters are
>> >>> strongly leaning towards "race condition" at that point :).
>> >>>
>> >>> If an error is occurring only under volume, then you have to revisit
>> >>> where race conditions can occur.
>> >>>
>> >>> On Aug 27, 2013, at 10:32 AM, herzaso <her...@gmail.com> wrote:
>> >>>
>> >>> I'm running a Tornado server without redundancy (only one process;
>> >>> requests can arrive at the same time but will be handled one at a
>> >>> time).
>> >>> I do agree that for large volumes, catching the IntegrityError would
>> >>> be better, but currently I am handling a single request at a time and
>> >>> I want to fix this problem before I move on ...
>> >>>
>> >>>
>> >>> On Tuesday, August 27, 2013 5:24:07 PM UTC+3, Simon King wrote:
>> >>>>
>> >>>> On Tue, Aug 27, 2013 at 2:31 PM, herzaso <her...@gmail.com> wrote:
>> >>>> > On Tuesday, August 27, 2013 3:55:50 PM UTC+3, Simon King wrote:
>> >>>> >>
>> >>>> >> On Tue, Aug 27, 2013 at 1:40 PM, herzaso <her...@gmail.com> wrote:
>> >>>> >> > I have a model with an ID column set as the primary key, though
>> >>>> >> > I'd like to be able to identify records by 3 other columns.
>> >>>> >> > For this situation, I've added a classmethod that will fetch the
>> >>>> >> > record if found, or create a new record if not.
>> >>>> >> > The problem I'm having is that every once in a while, I get an
>> >>>> >> > IntegrityError when trying to flush a change.
>> >>>> >> >
>> >>>> >> > class Foo(Base):
>> >>>> >> >     __table_args__ = (sa.UniqueConstraint('bar', 'baz', 'qux'),)
>> >>>> >> >
>> >>>> >> >     id = sa.Column(Identifier,
>> >>>> >> >                    sa.Sequence('%s_id_seq' % __tablename__),
>> >>>> >> >                    nullable=False, primary_key=True)
>> >>>> >> >     bar = sa.Column(sa.BigInteger)
>> >>>> >> >     baz = sa.Column(sa.BigInteger)
>> >>>> >> >     qux = sa.Column(sa.BigInteger)
>> >>>> >> >     a1 = sa.Column(sa.BigInteger)
>> >>>> >> >     a2 = sa.Column(sa.BigInteger)
>> >>>> >> >
>> >>>> >> >     @classmethod
>> >>>> >> >     def get(cls, bar=None, baz=None, qux=None, **kwargs):
>> >>>> >> >         item = session.query(cls).\
>> >>>> >> >             filter(cls.bar == bar).\
>> >>>> >> >             filter(cls.baz == baz).\
>> >>>> >> >             filter(cls.qux == qux).\
>> >>>> >> >             first()
>> >>>> >> >
>> >>>> >> >         if item:
>> >>>> >> >             for k, v in kwargs.iteritems():
>> >>>> >> >                 if getattr(item, k) != v:
>> >>>> >> >                     setattr(item, k, v)
>> >>>> >> >         else:
>> >>>> >> >             item = cls(bar=bar, baz=baz, qux=qux, **kwargs)
>> >>>> >> >
>> >>>> >> >         return item
>> >>>> >> >
>> >>>> >> > This is the code I use to add/update records:
>> >>>> >> >
>> >>>> >> > foo = Foo.get(**item)
>> >>>> >> > session.merge(foo)
>> >>>> >> >
>> >>>> >> > I've been struggling with this problem for some time now, and
>> >>>> >> > would appreciate any help ...
>> >>>> >> >
>> >>>> >>
>> >>>> >> I'm not sure of the exact problem, but there are a couple of
>> >>>> >> things that you could investigate.
>> >>>> >>
>> >>>> >> Firstly, session.merge returns a copy of the object, rather than
>> >>>> >> adding the object that you supplied into the session. See
>> >>>> >> http://docs.sqlalchemy.org/en/rel_0_8/orm/session.html#merging for
>> >>>> >> details.
>> >>>> >>
>> >>>> >> Secondly, your "get" method sometimes returns objects that are
>> >>>> >> already part of the session (if they were in the database), and
>> >>>> >> sometimes objects that are not in the session. It would probably
>> >>>> >> be more consistent to always return objects that are part of the
>> >>>> >> session, by putting "session.add(item)" in your "else" clause.
>> >>>> >> This would get rid of the need for session.merge(). (If you want
>> >>>> >> to be able to use "get" with non-global sessions, pass the
>> >>>> >> session as a parameter.)
>> >>>> >>
>> >>>> >> Finally, if your session isn't auto-flushing, it would be possible
>> >>>> >> for you to call "get" twice with the same parameters and get 2
>> >>>> >> different objects back.
>> >>>> >>
>> >>>> >> You may want to look at the UniqueObject recipe in the wiki:
>> >>>> >> http://www.sqlalchemy.org/trac/wiki/UsageRecipes/UniqueObject
>> >>>> >>
>> >>>> > Hi Simon,
>> >>>> > Thanks for the fast reply.
>> >>>> >
>> >>>> > I tried adding session.add(item) and session.flush() in the else
>> >>>> > clause in the past, but that didn't solve my problem.
>> >>>> > I didn't, however, remove the merge. Do you think that might be
>> >>>> > the problem?
>> >>>> >
>> >>>> > Regarding the flush, this code is part of an API server where a
>> >>>> > scoped_session is committed after each change. I haven't changed
>> >>>> > the autoflush parameter, and as I understand it, the default value
>> >>>> > is True, which triggers a flush before each commit or query.
>> >>>> >
>> >>>> > As for the UniqueObject recipe, thanks! Amazing that I never found
>> >>>> > it while searching for a cure. As I see it, it basically does the
>> >>>> > same thing ...
>> >>>> >
>> >>>> > I never managed to reproduce this bug in my development
>> >>>> > environment. It only happens in my production environment.
>> >>>> > Do you suppose adding a session.add and removing the merge will
>> >>>> > solve this issue?
>> >>>> >
>> >>>> > Thanks,
>> >>>> > Ofir
>> >>>>
>> >>>> It's difficult to say without knowing more about your system. For
>> >>>> example, does your production system get multiple concurrent API
>> >>>> requests, or are they serialised? If 2 requests can come in at
>> >>>> approximately the same time and are handled by 2 different threads
>> >>>> (or processes), then it is easy to imagine that the first handler
>> >>>> will check the database, find that an entry doesn't exist, and
>> >>>> create it. But before it flushes the change to the database (or
>> >>>> even after it flushes, but before it commits, depending on your
>> >>>> transaction isolation), the second handler will check for the same
>> >>>> object, find it missing, and so create it.
>> >>>>
>> >>>> To track down problems like this, you could ensure that your
>> >>>> development environment has the same thread/process behaviour as the
>> >>>> production environment, then try submitting multiple concurrent
>> >>>> requests to it. If you add "time.sleep" statements somewhere between
>> >>>> the creation of the object and the commit of the transaction you will
>> >>>> probably find it easier to trigger.
>> >>>>
>> >>>> To actually fix the problem, you could choose to only handle a single
>> >>>> request at a time (fine if you don't expect a high volume of
>> >>>> requests). If that's not acceptable, you could catch the
>> >>>> IntegrityError and then re-process the request.
>> >>>>
>> >>>> Hope that helps,
>> >>>>
>> >>>> Simon
>> >>>

-- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.