On 10/19/2016 05:49 PM, Dave Vitek wrote:
Hi all,

I'm using sqlalchemy 0.9.7 (yes, I know we need to upgrade).

I've recently started to employ psycopg2's
psycopg2.extensions.set_wait_callback function to facilitate terminating
ongoing queries in response to activity on other sockets (such as an
HTTP client disconnect or shutdown message).  I essentially have this
working, but there is a wrinkle.

Specifically, when it is time to abort the ongoing query, I arrange for
the wait_callback to raise an exception.  psycopg2 dutifully closes the
dbapi connection when an exception occurs, which is what I want it to
do.  The problem is, sqlalchemy still thinks the connection is open at
the Connection and ConnectionFairy layers and sometimes runs into a
variety of "connection closed" errors when sqlalchemy tries to do stuff
with the dbapi connection later because it thinks it is still open.

What I want to do is call Connection.invalidate() from inside the
wait_callback, in order to invalidate the Connection, ConnectionRecord,
and ConnectionFairy.  This is difficult because the only parameter to
the wait_callback function is the DBAPI connection, not the associated
high level connection object(s).  I have no obvious way of getting from
the DBAPI connection to the SA Connection object(s).

well you'd need to create that backwards association. Looks like you've tried that a bit.



I've had a few ideas on how to solve this, one of which seems to work in
my limited testing, but the documentation gives me second thoughts.  I
wonder if there is a better way.

Approach 1
The first thing I tried (which did not work at all) was to use:
event.listens_for(SomeEngine, 'handle_error')

in order that I could set the is_disconnect field on the
exception_context object, thereby notifying sqlalchemy of the
disconnect.  Turns out that sqlalchemy doesn't wrap calls to fetchmany
such that exceptions from fetchmany get sent to handle_error, so I
abandoned this approach.  I wonder if this is indicative of a more
general issue where network errors raised by fetchmany aren't noticed?

well you're in 0.9, so the error event hooks were still coming up to speed at that point. But also it seems like you have code that is killing off the connection long before Connection tries to use it again so you definitely want to set up the Connection state at that moment as well, instead of waiting for things to further break later.



Approach 2
The next thing I tried was using:
@event.listens_for(SomeEngine, 'engine_connect')
def receive_engine_connect(conn, branch):
    raw = conn.connection.connection
    if not hasattr(raw, 'sa_conns'):
        raw.sa_conns = weakref.WeakSet()
    raw.sa_conns.add(conn)

And then when I know the dbapi connection is about to be closed for
sure, I do something like this (but more carefully):
for x in getattr(dbapi_conn, 'sa_conns', ()):
    x.invalidate()

I'm surprised the psycopg2 connection is even letting you stick a data member onto it like that.

I would link psycopg2 connection to ConnectionFairy using a global dictionary, probably a WeakKeyDictionary because in 0.9 I don't know that you can catch when the psycopg2 connection itself is discarded (should be possible as of 1.1).

ConnectionFairy and ConnectionRecord then share this handy .info dictionary that you can put whatever you want inside of it, like your current Connection object. There should be only one Connection object you need to care about at a time per psycopg2 connection - the additional "Connection" objects are either "branched" from the original one (this is passed to the engine_connect() event) or have to do with the "threadlocal" engine that nobody uses.

The .info dictionary is also emptied out whenever the ConnectionRecord throws out the psycopg2 connection and re-connects.

The only hook you are missing is the "revalidate" equivalent of engine_connect, which should be added to SQLAlchemy. But that wouldn't help you in 0.9.




Approach 3
This is similar to approach 2, except that I use the "checkout" event
instead.  This allows me to invoke ConnectionFairy.invalidate() or
ConnectionRecord.invalidate(), but that seems to not be as good as
invoking Connection.invalidate(), because the Connection still doesn't
realize it is closed.





Is there something better I can do to get a mapping from DBAPI
connection to SA Connection?  I am inclined to do linear search over all
Connection objects for ones using the dying dbapi connection.  I assume
the engine or pool must have a collection of all Connections, and there
aren't ever more than a few in my application.

Especially for an older version like 0.9 when I've had to do more elaborate tracking for Openstack and related projects I found myself adding a new feature to SQLA 1.1 and then building up monkeypatches that work for older versions, but I'd hope you don't have to do that here. The _revalidate_connection() method is only called within the context of executing a new statement, so I think if you combine the use of:

PoolEvents.checkout()
PoolEvents.checkin()
ConnectionEvents.engine_connect()
ConnectionEvents.before_execute()

you should be able to intercept the lifespan of every ConnectionFairy as well as the association of every ConnectionFairy with Connection, all outside of where a cursor has been procured. Then I'd use dictionary of psycopg2conn -> connectionfairy, and connectionfairy.info -> Connection, so whenever you get psycopg2 invalidation event, you'd walk up that way. I'd also be careful of thread synchronization if this callback occurs in some concurrent way.















- Dave


--
SQLAlchemy - The Python SQL Toolkit and Object Relational Mapper

http://www.sqlalchemy.org/

To post example code, please provide an MCVE: Minimal, Complete, and Verifiable 
Example.  See  http://stackoverflow.com/help/mcve for a full description.
--- You received this message because you are subscribed to the Google Groups "sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

Reply via email to