Hi:

SQLAlchemy is used in my Python project, together with eventlet green threads. In some of my code a green thread gets killed right after a transaction has begun. After the thread is killed, the corresponding database connection stays open on the PostgreSQL server with status "idle in transaction", and the connections in my pool end up exhausted.

Traceback (most recent call last):
  File "/usr/lib/python2.7/dist-packages/eventlet/hubs/hub.py", line 457, in fire_timers
    timer()
  File "/usr/lib/python2.7/dist-packages/eventlet/hubs/timer.py", line 58, in __call__
    cb(*args, **kw)
  File "/usr/lib/python2.7/dist-packages/eventlet/greenthread.py", line 214, in main
    result = function(*args, **kwargs)
  File "/tmp/test.py", line 48, in operate_db
    query = session.query(Test).all()
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2588, in all
    return list(self)
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2736, in __iter__
    return self._execute_and_instances(context)
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2749, in _execute_and_instances
    close_with_result=True)
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/orm/query.py", line 2740, in _connection_from_session
    **kw)
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 905, in connection
    execution_options=execution_options)
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 910, in _connection_for_bind
    engine, execution_options)
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 334, in _connection_for_bind
    conn = bind.contextual_connect()
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2039, in contextual_connect
    self._wrap_pool_connect(self.pool.connect, None),
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 2074, in _wrap_pool_connect
    return fn()
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 376, in connect
    return _ConnectionFairy._checkout(self)
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 714, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 480, in checkout
    rec = pool._do_get()
  File "/usr/lib/python2.7/dist-packages/sqlalchemy/pool.py", line 1054, in _do_get
    (self.size(), self.overflow(), self._timeout))
TimeoutError: QueuePool limit of size 2 overflow 0 reached, connection timed out, timeout 30


Questions: 

1. When is a connection returned to the pool?

My understanding is that the connection is returned when the session performs a rollback or commit.
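My mental model is roughly the following (a minimal sketch with a plain, non-autocommit session; engine and Test are the same names as in the attached script):

# Sketch of my expectation: the connection is checked out lazily on the first
# query and returned to the pool when the transaction ends.
Session = sessionmaker(engine)
session = Session()
session.query(Test).all()   # connection checked out from the pool here
session.rollback()          # expectation: connection returned to the pool here
session.close()             # close() also releases any remaining connection

Is that the right model, or does something else have to happen before the pool slot is freed?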


2. Is the connection returned to the pool if my green thread is killed?

From the output, the connection object still seems to be checked out from the pool with an active status.

Also, there is no GreenletExit exception visible here if the green thread is just killed cleanly.
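If it is not returned, the only workaround I can think of is to make the worker release the session even when it is interrupted, assuming kill() does raise GreenletExit at the sleep point (a sketch, not tested; operate_db_safe is a hypothetical variant of the operate_db in the attached script):

def operate_db_safe(**kwargs):
    session = kwargs["session"]
    try:
        session.begin()
        session.query(Test).all()
        greenthread.sleep(20)
        session.rollback()
    finally:
        # close() hands the connection back to the pool (which rolls the open
        # transaction back on return), even if GreenletExit interrupts the sleep.
        session.close()

Would that be enough to get the connection back, or is more needed?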


3. Is the connection returned to the pool if the database transaction ("idle in transaction") is rolled back by the database itself (idle_in_transaction_session_timeout)? If not, how can we handle a connection whose status is "idle in transaction"?

I also killed the backend process for that transaction on the PG server, but the connection was still not returned to the pool.
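The closest thing I have found is the "pessimistic disconnect handling" recipe from the SQLAlchemy docs, which pings each connection at checkout and makes the pool replace dead ones (a sketch; I have not verified it covers the idle_in_transaction_session_timeout case):

from sqlalchemy import event, exc
from sqlalchemy.pool import Pool

@event.listens_for(Pool, "checkout")
def ping_connection(dbapi_connection, connection_record, connection_proxy):
    cursor = dbapi_connection.cursor()
    try:
        cursor.execute("SELECT 1")
    except Exception:
        # Tell the pool this connection is dead; checkout retries with a fresh one.
        raise exc.DisconnectionError()
    cursor.close()

As far as I can tell, though, this only helps connections that are actually back in the pool; a connection still checked out by a killed green thread would not be touched.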


The attached test script (below) uses eventlet monkey patching.

Thanks.


import eventlet
# Monkey patch first, so psycopg2 and threading cooperate with green threads.
eventlet.monkey_patch(psycopg=True, thread=True)

from eventlet import greenthread
from eventlet import greenpool

import logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('sqlalchemy.pool').setLevel(logging.DEBUG)
logging.getLogger('sqlalchemy.engine').setLevel(logging.DEBUG)
logging.getLogger('sqlalchemy.dialects').setLevel(logging.DEBUG)
logging.getLogger('sqlalchemy.orm').setLevel(logging.DEBUG)
logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.DEBUG)

from sqlalchemy import create_engine, Column, Integer, Table, MetaData
from sqlalchemy.orm import sessionmaker, mapper



class Test(object):
    def __init__(self, f1, f2, f3):
        self.f1 = f1
        self.f2 = f2
        self.f3 = f3 


def get_session(engine):
    # autocommit=True, so each transaction is started explicitly with session.begin().
    Session = sessionmaker(engine, autocommit=True)
    return Session()

def operate_db(**kwargs):
    id = kwargs["id"]
    print "in operate_db " + str(id)
    session = kwargs["session"]
    session.begin()
    query = session.query(Test).all()   # checks a connection out of the pool
    print engine.pool.status(), "before sleep, pool creator:", engine.pool._creator
    # Sleep long enough for the main code to kill this green thread mid-transaction.
    greenthread.sleep(20)
    print engine.pool.status(), "after sleep, pool creator:", engine.pool._creator
    session.rollback()
    print engine.pool.status(), "after rollback, pool creator:", engine.pool._creator
    print "out operate_db " + str(id)
    
def _thread_done(gt, *args, **kwargs):
    # no-op link callback for the green threads
    pass

# A pool of at most 2 connections with no overflow, to make exhaustion easy to reproduce.
engine = create_engine('postgres://xxx:x...@xxx.xxx.xxx.xxx:5433/test96',
                       pool_size=2, max_overflow=0, pool_recycle=500,
                       echo=True, echo_pool='debug')
metadata = MetaData()

table = Table('test', metadata,
                Column('f1', Integer, primary_key=True),
                Column('f2', Integer),
                Column('f3', Integer)
             )
metadata.create_all(engine)

mapper(Test, table)


thread_groups = greenpool.GreenPool(2)

tg1 = thread_groups.spawn(operate_db, id = 1, session = get_session(engine))
tg1.link(_thread_done,  group=thread_groups, thread=tg1)

tg2 = thread_groups.spawn(operate_db, id = 2, session = get_session(engine))
tg2.link(_thread_done,  group=thread_groups, thread=tg2)

greenthread.sleep(10)

# The green threads are killed here; their connections stay "idle in transaction" on the server.
greenthread.kill(tg1)
greenthread.kill(tg2)
print "kill success!"
greenthread.sleep(10)
# Spawn a new worker; it needs a third connection and raises the QueuePool limit error.
tg3 = thread_groups.spawn(operate_db, id = 3, session = get_session(engine))
tg3.link(_thread_done,  group=thread_groups, thread=tg3)
print engine.pool.status(), "in main dump_table1 connection object id ",engine.pool._creator
greenthread.sleep(40)
