[sqlalchemy] Threading Queries

2014-05-08 Thread James Meneghello
A couple of questions:

I'm writing an application using concurrent.futures (process-based). The 
processes themselves are fairly involved, not simple functions. I'm using 
scoped_sessions and a context manager like so:

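# db.py
from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

# sqlalchemy_url is the application's database URL, defined elsewhere
engine = create_engine(sqlalchemy_url)
Session = scoped_session(sessionmaker(bind=engine))

@contextmanager
def db_session():
    session = Session()
    try:
        yield session
        session.commit()
    except:
        session.rollback()
        raise
    finally:
        # remove() lives on the scoped_session registry, not on the Session
        Session.remove()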

I'm using this context manager with code like the following:

def process():
    with db_session() as db:
        # the function is obviously more involved than this
        u = User(name='bob')
        db.add(u)

    return u

def main():
    with db_session() as db:
        g = Group(name='peeps')
        user = process()
        user.group = g

        # this line breaks
        db.add(g)

I'm guessing this is because the call to db_session() is nested inside 
another, meaning that the thread-local session is being closed inside 
process(), so by the time process() returns to main(), the session main() 
was using is gone. Is there a recommended way to handle this?

Along similar lines, the application (using the session/engine creation 
above) also has to use raw_connection() at a few points to access psycopg2's 
copy_expert() cursor method. I'm getting very strange errors out of the copy 
calls, which I suspect are due to multiple copies occurring at once (there are 
~4 processes running concurrently, though they rarely copy at the same time). 
The copy code looks like this:


from db import engine

# `ordering` (column names) and `s` (the CSV source) are defined elsewhere
conn = engine.raw_connection()
cur = conn.cursor()
cur.copy_expert("COPY parts ({}) FROM STDIN WITH CSV ESCAPE E''".format(', '.join(ordering)), s)
conn.commit()

Does raw_connection() still pull from a connection pool, or could two calls 
to it at once potentially destroy things?

Some of the errors are below (the data going in is clean; I've checked it 
manually).

Thanks!


---

Traceback (most recent call last):
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/base.py", line 940, in _execute_context
    context)
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/default.py", line 435, in do_execute
    cursor.execute(statement, parameters)
psycopg2.DatabaseError: insufficient data in D message
lost synchronization with server: got message type 5, length 808464640

...

sqlalchemy.exc.DatabaseError: (DatabaseError) insufficient data in D message
lost synchronization with server: got message type 5, length 808464640

...

psycopg2.InterfaceError: connection already closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/base.py", line 508, in _rollback_impl
    self._handle_dbapi_exception(e, None, None, None, None)
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/base.py", line 1108, in _handle_dbapi_exception
    exc_info
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/util/compat.py", line 174, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=exc_value)
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/util/compat.py", line 167, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/base.py", line 506, in _rollback_impl
    self.engine.dialect.do_rollback(self.connection)
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/default.py", line 405, in do_rollback
    dbapi_connection.rollback()
sqlalchemy.exc.InterfaceError: (InterfaceError) connection already closed None None

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/base.py", line 233, in connection
    return self.__connection
AttributeError: 'Connection' object has no attribute '_Connection__connection'

...

Traceback (most recent call last):
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/base.py", line 940, in _execute_context
    context)
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/default.py", line 435, in do_execute
    cursor.execute(statement, parameters)
psycopg2.DatabaseError: lost synchronization with server: got message type

...

Traceback (most recent call last):
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/base.py", line 506, in _rollback_impl
    self.engine.dialect.do_rollback(self.connection)
  File "/usr/local/lib/python3.3/dist-packages/sqlalchemy/engine/default.py", line 405, in do_rollback
    dbapi_connection.rollback()
psycopg2.InterfaceError: connection already closed

...
psycopg2.DatabaseError: error with no message from the libpq

-- 
You received this message because you are subscribed to the Google Groups 
sqlalchemy group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to 

Re: [sqlalchemy] Threading Queries

2014-05-08 Thread Michael Bayer

On May 8, 2014, at 5:06 AM, James Meneghello murod...@gmail.com wrote:

 A couple of questions:
 
 I'm writing an application using concurrent.futures (process-based). The 
 processes themselves are fairly involved, not simple functions. I'm using 
 scoped_sessions and a context manager like so:
 
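 # db.py
 from contextlib import contextmanager
 
 from sqlalchemy import create_engine
 from sqlalchemy.orm import scoped_session, sessionmaker
 
 # sqlalchemy_url is the application's database URL, defined elsewhere
 engine = create_engine(sqlalchemy_url)
 Session = scoped_session(sessionmaker(bind=engine))
 
 @contextmanager
 def db_session():
     session = Session()
     try:
         yield session
         session.commit()
     except:
         session.rollback()
         raise
     finally:
         # remove() lives on the scoped_session registry, not on the Session
         Session.remove()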
 
 I'm using this context manager with code like the following:
 
 def process():
     with db_session() as db:
         # the function is obviously more involved than this
         u = User(name='bob')
         db.add(u)
 
     return u
 
 def main():
     with db_session() as db:
         g = Group(name='peeps')
         user = process()
         user.group = g
 
         # this line breaks
         db.add(g)
 
 I'm guessing this is because the call to db_session() is nested inside 
 another, meaning that the thread-local session is being closed inside 
 process(), so by the time process() returns to main(), the session main() 
 was using is gone. Is there a recommended way to handle this?

If you want open-ended nesting of this style, you need to take out that 
session.remove() step, as the inner call will be blowing away your Session. 
Also, to nest Sessions like that, you usually want Session(autocommit=True), 
then use begin()/commit() pairs, passing the subtransactions=True flag to 
Session.begin(), which allows the nesting behavior (this is an advanced 
behavior, so the flag is there as a check that this is definitely what the 
user intends).

As for a recommendation, I'd structure the application so it doesn't have 
ad-hoc session blocks like that; instead there'd be one or a few well-known 
points where session scopes begin and end. As it stands, you never know where 
or when your app decides to start or end transactions.
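
To illustrate that suggestion, here is a sketch of the example restructured so 
that main() is the only place a session scope begins and ends, and process() 
receives the session as an argument instead of opening its own. The minimal 
User/Group models, the in-memory SQLite URL and the declarative_base() import 
from sqlalchemy.orm (SQLAlchemy 1.4+) are stand-ins assumed for illustration; 
db_session() is the helper from the question, with remove() called on the 
scoped_session registry.

```python
from contextlib import contextmanager

from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, relationship, scoped_session, sessionmaker

Base = declarative_base()


class Group(Base):  # hypothetical minimal model standing in for the real one
    __tablename__ = "groups"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))


class User(Base):  # hypothetical minimal model standing in for the real one
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    group_id = Column(Integer, ForeignKey("groups.id"))
    group = relationship(Group)


engine = create_engine("sqlite://")  # in-memory database, for illustration only
Base.metadata.create_all(engine)
Session = scoped_session(sessionmaker(bind=engine))


@contextmanager
def db_session():
    session = Session()
    try:
        yield session
        session.commit()
    except:
        session.rollback()
        raise
    finally:
        Session.remove()


def process(db):
    # Works inside whatever transaction the caller opened; no commit or remove here.
    u = User(name="bob")
    db.add(u)
    return u


def main():
    # The single well-known point where this unit of work begins and ends.
    with db_session() as db:
        g = Group(name="peeps")
        user = process(db)
        user.group = g
        db.add(g)  # g is already pending via user.group; adding it again is harmless
```

Because process() no longer commits or calls remove(), the User it returns is 
still attached to the same Session when main() links it to the Group, and 
everything is committed once, when main()'s block exits.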


 Along similar lines, the application (using the session/engine creation 
 above) also has to use raw_connection() at a few points to access psycopg2's 
 copy_expert() cursor method. I'm getting very strange errors out of the copy 
 calls, which I suspect are due to multiple copies occurring at once (there are 
 ~4 processes running concurrently, though they rarely copy at the same time). 
 The copy code looks like this:
 
 
 from db import engine
 
 # `ordering` (column names) and `s` (the CSV source) are defined elsewhere
 conn = engine.raw_connection()
 cur = conn.cursor()
 cur.copy_expert("COPY parts ({}) FROM STDIN WITH CSV ESCAPE E''".format(', '.join(ordering)), s)
 conn.commit()
 
 Does raw_connection() still pull from a connection pool, or could two calls 
 to it at once potentially destroy things?

It uses the connection pool, but that says nothing about an application that 
uses multiple processes. Each subprocess has its own connections and its own 
pool. If you are spawning a subprocess, you need to ensure that any DBAPI 
connection is created in that subprocess and not in the parent, as connections 
won't travel over subprocess boundaries. When using an Engine, the usual way to 
get this is to make sure create_engine() is called locally in the subprocess, 
and that's the engine used within that process. Or use NullPool, so that no 
pooled connections are held around to be carried into a subprocess.
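
A sketch of the create_engine()-per-subprocess approach with 
concurrent.futures, under a few assumptions: Python 3.7+ (for the initializer 
argument of ProcessPoolExecutor), and a placeholder SQLite URL and SELECT 
standing in for the real PostgreSQL URL and the copy_expert() call. Each worker 
builds its own Engine in init_worker(), so raw_connection() checks connections 
out of that process's own pool, and close() hands the connection back to that 
pool when the work is done. init_worker, work and DB_URL are hypothetical names.

```python
import os
from concurrent.futures import ProcessPoolExecutor

from sqlalchemy import create_engine

# Placeholder URL for illustration; the real app would pass its sqlalchemy_url.
DB_URL = "sqlite://"

_engine = None  # set separately in each worker process by init_worker()


def init_worker(url):
    """Runs once inside each child process, so its Engine and pool are created there."""
    global _engine
    _engine = create_engine(url)


def work(n):
    # raw_connection() checks a DBAPI connection out of *this* process's pool.
    conn = _engine.raw_connection()
    try:
        cur = conn.cursor()
        cur.execute("SELECT ?", (n,))  # the COPY / copy_expert() call would go here
        value = cur.fetchone()[0]
        conn.commit()
    finally:
        conn.close()  # returns the connection to the pool instead of leaking it
    return os.getpid(), value


if __name__ == "__main__":
    # The parent never creates an Engine, so there is nothing for workers to inherit.
    with ProcessPoolExecutor(max_workers=4, initializer=init_worker,
                             initargs=(DB_URL,)) as executor:
        for pid, value in executor.map(work, range(8)):
            print(pid, value)
```

To go the NullPool route instead, pass poolclass=NullPool (imported with 
`from sqlalchemy.pool import NullPool`) to create_engine(); each checkout then 
opens a new DBAPI connection and close() really closes it, rather than keeping 
it in a pool.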
