On 10/19/2016 05:49 PM, Dave Vitek wrote:
Hi all,
I'm using sqlalchemy 0.9.7 (yes, I know we need to upgrade).
I've recently started to employ psycopg2's
psycopg2.extensions.set_wait_callback function to facilitate terminating
ongoing queries in response to activity on other sockets (such as an
HTTP client disconnect or shutdown message). I essentially have this
working, but there is a wrinkle.
Specifically, when it is time to abort the ongoing query, I arrange for
the wait_callback to raise an exception. psycopg2 dutifully closes the
dbapi connection when an exception occurs, which is what I want it to
do. The problem is, sqlalchemy still thinks the connection is open at
the Connection and ConnectionFairy layers and sometimes runs into a
variety of "connection closed" errors when sqlalchemy tries to do stuff
with the dbapi connection later because it thinks it is still open.
What I want to do is call Connection.invalidate() from inside the
wait_callback, in order to invalidate the Connection, ConnectionRecord,
and ConnectionFairy. This is difficult because the only parameter to
the wait_callback function is the DBAPI connection, not the associated
high level connection object(s). I have no obvious way of getting from
the DBAPI connection to the SA Connection object(s).
well you'd need to create that backwards association. Looks like
you've tried that a bit.
I've had a few ideas on how to solve this, one of which seems to work in
my limited testing, but the documentation gives me second thoughts. I
wonder if there is a better way.
Approach 1
The first thing I tried (which did not work at all) was to use:
event.listens_for(SomeEngine, 'handle_error')
in order that I could set the is_disconnect field on the
exception_context object, thereby notifying sqlalchemy of the
disconnect. Turns out that sqlalchemy doesn't wrap calls to fetchmany
such that exceptions from fetchmany get sent to handle_error, so I
abandoned this approach. I wonder if this is indicative of a more
general issue where network errors raised by fetchmany aren't noticed?
well you're in 0.9, so the error event hooks were still coming up to
speed at that point. But also it seems like you have code that is
killing off the connection long before Connection tries to use it again
so you definitely want to set up the Connection state at that moment as
well, instead of waiting for things to further break later.
Approach 2
The next thing I tried was using:
@event.listens_for(SomeEngine, 'engine_connect')
def receive_engine_connect(conn, branch):
raw = conn.connection.connection
if not hasattr(raw, 'sa_conns'):
raw.sa_conns = weakref.WeakSet()
raw.sa_conns.add(conn)
And then when I know the dbapi connection is about to be closed for
sure, I do something like this (but more carefully):
for x in getattr(dbapi_conn, 'sa_conns', ()):
x.invalidate()
I'm surprised the psycopg2 connection is even letting you stick a data
member onto it like that.
I would link psycopg2 connection to ConnectionFairy using a global
dictionary, probably a WeakKeyDictionary because in 0.9 I don't know
that you can catch when the psycopg2 connection itself is discarded
(should be possible as of 1.1).
ConnectionFairy and ConnectionRecord then share this handy .info
dictionary that you can put whatever you want inside of it, like your
current Connection object. There should be only one Connection object
you need to care about at a time per psycopg2 connection - the
additional "Connection" objects are either "branched" from the original
one (this is passed to the engine_connect() event) or have to do with
the "threadlocal" engine that nobody uses.
The .info dictionary is also emptied out whenever the ConnectionRecord
throws out the psycopg2 connection and re-connects.
The only hook you are missing is the "revalidate" equivalent of
engine_connect, which should be added to SQLAlchemy. But that wouldn't
help you in 0.9.
Approach 3
This is similar to approach 2, except that I use the "checkout" event
instead. This allows me to invoke ConnectionFairy.invalidate() or
ConnectionRecord.invalidate(), but that seems to not be as good as
invoking Connection.invalidate(), because the Connection still doesn't
realize it is closed.
Is there something better I can do to get a mapping from DBAPI
connection to SA Connection? I am inclined to do linear search over all
Connection objects for ones using the dying dbapi connection. I assume
the engine or pool must have a collection of all Connections, and there
aren't ever more than a few in my application.
Especially for an older version like 0.9 when I've had to do more
elaborate tracking for Openstack and related projects I found myself
adding a new feature to SQLA 1.1 and then building up monkeypatches that
work for older versions, but I'd hope you don't have to do that here.
The _revalidate_connection() method is only called within the context of
executing a new statement, so I think if you combine the use of:
PoolEvents.checkout()
PoolEvents.checkin()
ConnectionEvents.engine_connect()
ConnectionEvents.before_execute()
you should be able to intercept the lifespan of every ConnectionFairy as
well as the association of every ConnectionFairy with Connection, all
outside of where a cursor has been procured. Then I'd use dictionary
of psycopg2conn -> connectionfairy, and connectionfairy.info ->
Connection, so whenever you get psycopg2 invalidation event, you'd walk
up that way. I'd also be careful of thread synchronization if this
callback occurs in some concurrent way.
- Dave
--
SQLAlchemy -
The Python SQL Toolkit and Object Relational Mapper
http://www.sqlalchemy.org/
To post example code, please provide an MCVE: Minimal, Complete, and Verifiable
Example. See http://stackoverflow.com/help/mcve for a full description.
---
You received this message because you are subscribed to the Google Groups "sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.