I worked up a test case that simulates your usage, and checks the number of
open MSSQL connections using a call to the system stored proc "sp_who", so
it can run in a more automated fashion.

I originally got mixed results on this: it would pass about 50% of the time
and fail about 50% of the time.

So I then added an option to force a GC collection (the idea being to force
any finalizers for the pyodbc sockets to close them), which increased the
percentage of the time the test would pass, but did not eliminate the
failures.

I then added a "wait" option which simply sleeps for a brief period after
closing the SA connections and then does the connection count check. With a
1/2-second delay between closing the SA connection pool and the check for
"all connections closed", I get pretty reliable results for closing all
connections.

Please try the attached test on your machine and see if you get similar
results.

Rick

# -*- coding: utf-8 -*-

"""
   This test checks that SQLAlchemy fully closes all pooled pyodbc connection when we dispose() the engine.
   For this test to be meaningful, this test should be the only process opening and closing connections on the server.
"""

# Import pyodbc first and force off ODBC connection pooling
import pyodbc
pyodbc.pooling = False
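# (pooling must be disabled before the first connection is made; otherwise the
# ODBC driver manager keeps closed connections alive in its own pool, which
# would skew the server-side connection counts below)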

import sqlalchemy as sa
import sqlalchemy.orm

user = ''
pwd = ''
dsn = ''
dbname = 'satest'
server = 'localhost'
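
# Test knobs (see the message above):
#   wait_time - seconds to sleep between disposing the SA engine and re-checking
#               the server-side connection count (0.5 gave reliable results here)
#   force_gc  - when True, run gc.collect() after dispose() so finalizers for any
#               unreferenced pyodbc connections get a chance to close their sockets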
wait_time = .5
force_gc = False


# establish a baseline # of connections
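# sp_who <loginame> returns one row per active session for that login; the row
# count before and after the test is what we compare.  This monitoring
# connection is raw pyodbc, so it never goes through SA's pool.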
c_pyodbc = pyodbc.connect('DRIVER={Sql Server}; SERVER=%s; DATABASE=%s; UID=%s; PWD=%s' %(server, dbname, user, pwd))
initial_connections = c_pyodbc.execute('sp_who %s' % user).fetchall()

# open the pooled connections from SA
dburi = 'mssql://%s:%s@%s' % (user, pwd, dsn)
sa_engine = sa.create_engine(dburi, echo=False, strategy='threadlocal')
sa_Session = sa.orm.scoped_session(sa.orm.sessionmaker(bind=sa_engine))
metadata = sa.MetaData(sa_engine)
sa_session = sa_Session()

t = sa.Table("closetest", metadata,
             sa.Column('id', sa.INT, primary_key=True),
             sa.Column('nm', sa.VARCHAR(20))
             )
# TODO: try with autoload table

# exercise the connection
metadata.create_all()
t.insert().execute(nm='test')
tlist = t.select().execute().fetchall()
assert len(tlist) == 1
metadata.drop_all()

# close the connection
sa_session.close()
sa_Session.close_all()
sa_engine.dispose()
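# dispose() replaces the engine's pool and closes its checked-in connections;
# connections still checked out are only closed once returned or garbage
# collected, which is why the force_gc option can matter.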
del sa_session
del sa_Session
del sa_engine

if wait_time:
    import time
    time.sleep(wait_time)

if force_gc:
    import gc
    gc.collect()

# ensure the number of connections has not grown
post_test_connections = c_pyodbc.execute('sp_who %s' % user).fetchall()
try:
    assert len(post_test_connections) == len(initial_connections)
    print 'Passed'
except AssertionError:
    print 'Open connections!: ', len(post_test_connections), len(initial_connections)
