I did a bit more digging.  It looks like utils.counter() is used in two 
places that are causing me hangs in multiprocess subprocesses. In 
instances() in order to assign a unique runid to the QueryContext, and in 
session instantiation in order to assign a unique hash_key to the session.

In both places the "counter" seems to simply be supplying a unique 
identifier.  So here is my monkey-patch that seems to be working:

import uuid
from sqlalchemy.orm import loading, session
loading._new_runid = uuid.uuid4
session._new_sessionid = uuid.uuid4

as far as I can see so far uuid appears to be both thread (as of 2.6.1 
http://legacy.python.org/download/releases/2.6.1/NEWS.txt) and multiprocess 
safe (purely anecdotal based on my experience of the last half hour).  I'll 
leave my test running for a few hours, but using the counter() method to 
generate unique IDs I was reliably able to get a hung process in under ten 
minutes, so it's looking good so far.

I'm not sure what the broader implications of such a change are 
(safety/performance/etc), but I'm wondering if you would consider merging 
something like this into sqlalchemy.

On Thursday, February 18, 2016 at 6:24:40 PM UTC-8, Mike Bayer wrote:
>
>
>
> On 02/18/2016 01:39 PM, Uri Okrent wrote: 
> > Looks like forking from a thread causes other issues.  I think I've 
> > resolved the hang in psycopg2 by creating a brand new engine in the 
> > forked subprocess, but now I'm getting occasional hangs here: 
> > 
> > #1 Waiting for a lock (e.g. GIL) 
> > #2 <built-in method acquire of thread.lock object at remote 
> 0x7fc061c17b58> 
> > #4 file 
> > '/usr/lib64/python2.6/site-packages/sqlalchemy/util/langhelpers.py', in 
> > '_next' 
> > #8 file '/usr/lib64/python2.6/site-packages/sqlalchemy/orm/loading.py', 
> > in 'instances' 
> > #16 file '/usr/lib64/python2.6/site-packages/sqlalchemy/orm/query.py', 
> > in '__getitem__' 
> > #24 file '/usr/lib64/python2.6/site-packages/sqlalchemy/orm/query.py', 
> > in 'first' 
> > 
> > At this point: 
> > | 
> > def counter(): 
> >      """Return a threadsafe counter function.""" 
> > 
> >      lock = compat.threading.Lock() 
> >      counter = itertools.count(1) 
> > 
> >      # avoid the 2to3 "next" transformation... 
> >      def _next(): 
> >          lock.acquire() 
> >          try: 
> >              return next(counter) 
> >          finally: 
> >              lock.release() 
> > 
> >      return _next 
> > | 
> > 
> > 
> > Not sure a process pool will solve the issue since I can't necessarily 
> > tell the volume of concurrent queries beforehand.  I'm wondering if 
> > there's a way to clear the lock manually in the forked process, or 
> > somehow protect this section when forking.  Either one is probably a 
> > monkey patch... 
>
> if you're using python multiprocessing, that thing has a lot of 
> deadlock-ish things in it especially in an older Python like 2.6 - it 
> uses threads in various ways to communicate with process pools and such. 
>    Can't really say why your counter is locking, would have to at least 
> see how you're using it and also I don't quite get how this counter 
> function is interacting with query.first().   But locks are not 
> "interruptable" unless you kill the whole thread in which it runs, so 
> you'd need to put a timeout on it. 
>
>
>
>
>
> > 
> > On Wednesday, February 17, 2016 at 10:41:00 AM UTC-8, Mike Bayer wrote: 
> > 
> > 
> > 
> >     On 02/17/2016 11:33 AM, Uri Okrent wrote: 
> >      > Maybe this is a psycopg question and if so please say so. 
> >      > 
> >      > I have a multi-threaded server which maintains a thread-pool (and 
> a 
> >      > corresponding connection pool) for servicing requests.  In order 
> to 
> >      > mitigate python's high-water-mark memory usage behavior for large 
> >      > queries, I'm attempting to handle queries in particular using a 
> >     forked 
> >      > subprocess from the request thread. 
> >      > 
> >      > I'm using the connection invalidation recipe described here (the 
> >     second 
> >      > one that adds listeners to the Pool): 
> >      > 
> >     
> http://docs.sqlalchemy.org/en/latest/core/pooling.html#using-connection-pools-with-multiprocessing
>  
> >     <
> http://docs.sqlalchemy.org/en/latest/core/pooling.html#using-connection-pools-with-multiprocessing>
>  
>
> > 
> >      > 
> >      > It seems to be working correctly -- that is, I can see that the 
> >     child 
> >      > process is indeed creating a new connection.  However, I'm still 
> >      > experiencing intermittent hangs in the child process during 
> >     connection 
> >      > creation.  I've gotten a stack trace using gdb, and I think I 
> >     understand 
> >      > what is going on but I'm not sure how to protect the critical 
> >     section. 
> >      > 
> >      > It looks like threads creating connections in the parent process 
> >     acquire 
> >      > some threading synchronization primitive inside psycopg's 
> _connect 
> >      > function (that's in c so I didn't see the actual source).  This 
> >      > apparently occurs occasionally at the same time as the fork, so 
> >     that the 
> >      > child process never sees the primitive release in the parent 
> >     process and 
> >      > hangs forever.  Interestingly, hangs stop after the server has 
> been 
> >      > running for a while, presumably because the parent process is 
> >     warmed up 
> >      > and has a full connection pool, and is no longer creating 
> >     connections. 
> >      > 
> >      > Here is my stack on a hung process: 
> >      > #17 <built-in function _connect> 
> >      > #19 file 
> >     '/usr/lib64/python2.6/site-packages/psycopg2/__init__.py', in 
> >      > 'connect' 
> >      > #24 file 
> >      > 
> >     '/usr/lib64/python2.6/site-packages/sqlalchemy/engine/default.py', 
> in 
> >      > 'connect' 
> >      > #29 file 
> >      > 
> >     
> '/usr/lib64/python2.6/site-packages/sqlalchemy/engine/strategies.py', in 
> > 
> >      > 'connect' 
> >      > #33 file '/usr/lib64/python2.6/site-packages/sqlalchemy/pool.py', 
> in 
> >      > '__connect' 
> >      > #36 file '/usr/lib64/python2.6/site-packages/sqlalchemy/pool.py', 
> in 
> >      > 'get_connection' 
> >      > #39 file '/usr/lib64/python2.6/site-packages/sqlalchemy/pool.py', 
> in 
> >      > 'checkout' 
> >      > #43 file '/usr/lib64/python2.6/site-packages/sqlalchemy/pool.py', 
> in 
> >      > '_checkout' 
> >      > #47 file '/usr/lib64/python2.6/site-packages/sqlalchemy/pool.py', 
> in 
> >      > 'connect' 
> >      > #50 file 
> >     '/usr/lib64/python2.6/site-packages/sqlalchemy/engine/base.py', 
> >      > in '_wrap_pool_connect' 
> >      > #54 file 
> >     '/usr/lib64/python2.6/site-packages/sqlalchemy/engine/base.py', 
> >      > in 'contextual_connect' 
> >      > #58 file 
> >     '/usr/lib64/python2.6/site-packages/sqlalchemy/orm/session.py', 
> >      > in '_connection_for_bind' 
> >      > #61 file 
> >     '/usr/lib64/python2.6/site-packages/sqlalchemy/orm/session.py', 
> >      > in '_connection_for_bind' 
> >      > #65 file 
> >     '/usr/lib64/python2.6/site-packages/sqlalchemy/orm/session.py', 
> >      > in 'connection' 
> >      > #70 file 
> >     '/usr/lib64/python2.6/site-packages/sqlalchemy/orm/query.py', 
> >      > in '_connection_from_session' 
> >      > #74 file 
> >     '/usr/lib64/python2.6/site-packages/sqlalchemy/orm/query.py', 
> >      > in '_execute_and_instances' 
> >      > #77 file 
> >     '/usr/lib64/python2.6/site-packages/sqlalchemy/orm/query.py', 
> >      > in '__iter__' 
> >      > #91 file 
> >     '/usr/lib64/python2.6/site-packages/sqlalchemy/orm/query.py', 
> >      > in '__getitem__' 
> >      > #99 file 
> >     '/usr/lib64/python2.6/site-packages/sqlalchemy/orm/query.py', 
> >      > in 'first' 
> >      > 
> >      > I'm using sqlalchemy 1.0.12 and psycopg 2.5.3 
> >      > 
> >      > My quick and dirty fix would be to fill the connection pool in 
> the 
> >      > parent process by force before servicing requests, 
> > 
> >     well I'd not want to transfer a psycopg2 connection from a parent to 
> a 
> >     child fork, because now that same filehandle is in both processes 
> and 
> >     you'll get unsafe concurrent access on it. 
> > 
> >     I've used multiprocessing with psycopg2 for years in a wide variety 
> of 
> >     scenarios and I've never seen it hanging on the actual 
> psycopg2.connect 
> >     call.   But perhaps that's because I've never called fork() from 
> inside 
> >     a thread that is not the main thread - if that is what's triggering 
> it 
> >     here, I'd use a pattern such as a process pool or similar where the 
> >     forking is done from the main thread ahead of time. 
> > 
> > 
> > 
> > 
> >     but that is a hack, 
> >      > and in case of an invalidated connection the server would be 
> >     susceptible 
> >      > to the issue again while recreating the invalid connection in the 
> >     parent 
> >      > process. 
> >      > I apparently need to synchronize my fork in one thread with 
> >     connections 
> >      > being created in others, but I'm not sure how to do that.  Any 
> >     pointers 
> >      > would be great. 
> >      > 
> >      > TIA, 
> >      > Uri 
> >      > 
> >      > -- 
> >      > You received this message because you are subscribed to the 
> Google 
> >      > Groups "sqlalchemy" group. 
> >      > To unsubscribe from this group and stop receiving emails from it, 
> >     send 
> >      > an email to sqlalchemy+...@googlegroups.com <javascript:> 
> >      > <mailto:sqlalchemy+unsubscr...@googlegroups.com <javascript:> 
> <javascript:>>. 
> >      > To post to this group, send email to sqlal...@googlegroups.com 
> >     <javascript:> 
> >      > <mailto:sqlal...@googlegroups.com <javascript:>>. 
> >      > Visit this group at https://groups.google.com/group/sqlalchemy 
> >     <https://groups.google.com/group/sqlalchemy>. 
> >      > For more options, visit https://groups.google.com/d/optout 
> >     <https://groups.google.com/d/optout>. 
> > 
> > -- 
> > You received this message because you are subscribed to the Google 
> > Groups "sqlalchemy" group. 
> > To unsubscribe from this group and stop receiving emails from it, send 
> > an email to sqlalchemy+...@googlegroups.com <javascript:> 
> > <mailto:sqlalchemy+unsubscr...@googlegroups.com <javascript:>>. 
> > To post to this group, send email to sqlal...@googlegroups.com 
> <javascript:> 
> > <mailto:sqlal...@googlegroups.com <javascript:>>. 
> > Visit this group at https://groups.google.com/group/sqlalchemy. 
> > For more options, visit https://groups.google.com/d/optout. 
>

-- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

Reply via email to