There is no need to use a scoped_session() in a function that is just running a 
single query.  There’s no need to use a Session in a function that is not 
running an ORM query - session.execute() and engine.execute() are equivalent.  
Most of this code is unnecessary.

There is however a need to make a new Engine within each child process if you 
are using multiprocessing, the code as is isn’t safe for multiple processes.  
multiprocessing.Pool provides the “initializer” hook for this purpose.

A reasonable pattern for that would be:

engine = create_engine(url)

def setup_fork():
    engine.dispose()  # ensure all connections are gone

pool = multiprocessing.Pool(10, initializer=setup_fork)

Then as always, if the code is only opening 4 or 5 connections, that because 
you’re only asking it to do 4 or 5 things at once.  If you want it to use 100 
connections, you need to have it doing 100 things at a time, which means your 
multiprocessing.Pool would have to be of size 100 and you’d need to send at 
least 100 tasks into it.



On Apr 8, 2014, at 10:21 AM, Ni Wesley <[email protected]> wrote:

> The above procedure can be performed in a shorthand way by using the 
> execute() method of Engine itself:
> 
> result = engine.execute("select username from users")
> for row in result:
>     print "username:", row['username']
> So, seems engine.execute is shorthand of connection.execute.
> 
> Here is my db apis :
> #!/usr/bin/env python
> #coding: utf-8
> 
> from sqlalchemy.sql.expression import text, bindparam
> from sqlalchemy.sql import select,insert, delete, update
> from sqlalchemy.schema import Table
> from sqlalchemy.orm import sessionmaker,scoped_session
> from db import dba_logger,metadata,engine#session
> from datetime import datetime
> from exctrace import exctrace
> from sqlalchemy import and_
> 
> direct_engine = True
> use_raw = False
> 
> #import gevent  
> #from gevent import monkey
> #monkey.patch_all()
> import multiprocessing
> from db import tables
>     
> def tmp_dbwrite(tablename,**kwargs):
>     """
>     Used to insert exception info into database.
>     
>     Params:
>         module : module name, indicating who raises the exception, e.g. 
> android,ios,psg,adpns,db .etc
>         type : exception type, 0 means service level while 1 is system level.
>         message : exception description,length limit to 256 bytes
>     """
>     try:
>         #_table=Table(tablename, metadata, autoload=True)
>         _table = tables[tablename]
>         i=_table.insert().values(**kwargs) 
>         if direct_engine:
>             engine.execute(i)
>             #gevent.spawn(engine.execute,i)
>             #gevent.sleep(0)
>             #gevent.joinall([gevent.spawn(engine.execute,i)])
>         else:
>             session = scoped_session(sessionmaker(bind=engine))
>             session.execute(i)
>             session.commit()
>             session.close()
>     except Exception,e:
>         #dba_logger.log(40,'Exception when dbwriter:%s' % str(e))
>         #dba_logger.log(20,'Exception detail:%s' % str(kwargs))
>         exctrace('db','1','Error happened when writing 
> db',dba_logger,'Exception when dbwriter:%s' % str(e),'Exception detail:%s' % 
> str(kwargs))
>         if not direct_engine:
>             session.rollback()
>             session.close()
> 
> 
>     
> def tmp_dbupdate(_table,whereclause,**kwargs):
>     """
>     Used to insert exception info into database.
>     
>     Params:
>         module : module name, indicating who raises the exception, e.g. 
> android,ios,psg,adpns,db .etc
>         type : exception type, 0 means service level while 1 is system level.
>         message : exception description,length limit to 256 bytes
>     """
>     try:
>         #_table=Table(tablename, metadata, autoload=True)
>         #_table = tables[tablename]
>         i=_table.update().values(**kwargs).where(whereclause) 
>         if direct_engine:
>             engine.execute(i)
>             #gevent.spawn(engine.execute,i)
>         else:
>             session = scoped_session(sessionmaker(bind=engine))
>             session.execute(i)
>             session.commit()
>             session.close()
>     except Exception,e:
>         #dba_logger.log(40,'Exception when dbwriter:%s' % str(e))
>         #dba_logger.log(20,'Exception detail:%s' % str(kwargs))
>         exctrace('db','1','Error happened when updating 
> db',dba_logger,'Exception when dbupdate:%s' % str(e),'Exception detail:%s' % 
> str(kwargs))
>         if not direct_engine:
>             session.rollback()
>             session.close()
>         
> def dbquery(_table,whereclause):
>     try:
>         #_table=Table(tablename, metadata, autoload=True)
>         #_table = tables[tablename]
>         i=_table.select().where(whereclause) 
>         if direct_engine:
>             res = engine.execute(i)
>             return res
>         else:
>             session = scoped_session(sessionmaker(bind=engine))
>             res = session.execute(i)
>             return res
>             session.close()
>     except Exception,e:
>         #dba_logger.log(40,'Exception when dbwriter:%s' % str(e))
>         #dba_logger.log(20,'Exception detail:%s' % str(kwargs))
>         exctrace('db','1','Error happened when querying 
> db',dba_logger,'Exception when dbquery:%s' % str(e),'Exception detail:%s' % 
> str(whereclause))
>         #session.rollback()
>         if not direct_engine:
>             session.close()
>     #res = 
> session.execute(connection_writer._table.select().where(connection_writer._table.c.app_key==self.app_key).where(connection_writer._table.c.device_token==self._devicetoken))
> pool = multiprocessing.Pool(10)
> def dbwrite(tablename,**kwargs):
>     pool.apply_async(tmp_dbwrite, (tablename,), kwargs)
>     
> def dbupdate(tablename,whereclause,**kwargs):
>     pool.apply_async(tmp_dbupdate, (tablename,whereclause), kwargs)
> 
> You can see in the bottom that I use multiprocessing to do dbwrite and 
> dbupdate which are called by other modules.
> Based on the above code, mostly I only see 4-5 processlist from mysql server 
> side...
> That's means, I haven't reached the bottleneck of mysql server itself...
> 
> 在 2014年4月8日星期二UTC+8下午9时52分58秒,Michael Bayer写道:
> 
> On Apr 8, 2014, at 8:55 AM, Ni Wesley <[email protected]> wrote:
> 
>> For me, I just use engine.execute most of the time, any problem with this?
>> 
>> Or if I use session or connect, do I need to close the session or connection 
>> everytime? otherwise, it will trigger the pool size limit error as the 
>> connection is increasing, right?
> 
> if you use sessions or a Connection object then yes you need to make sure 
> those are closed when you’re done.    
> 
> The docs at http://docs.sqlalchemy.org/en/rel_0_9/core/connections.html have 
> background on the difference between engine.execute and connection.execute.
> 
> 
>> 
>> 
>> 在 2014年4月8日星期二UTC+8下午8时49分16秒,Michael Bayer写道:
>> To make more connections, call connect(), and/or use more session objects.  
>> Each session uses one connection.
>> 
>> If you see three connections, that means your script has only worked with 
>> three connections at once, such as, you opened three Session objects 
>> concurrently.
>> 
>> Sqlalchemy does not initiate any concurrent operations on its own.
>> 
>> 
>> Sent from my iPhone
>> 
>> On Apr 8, 2014, at 5:03 AM, Ni Wesley <[email protected]> wrote:
>> 
>>> Hi all,
>>>    I have a question here.
>>> 
>>> Here is my db init code snippet:
>>> 
>>> engine = create_engine(db_url, 
>>> pool_size=100,max_overflow=150,echo=engine_echo,pool_recycle=3600)
>>> session = scoped_session(sessionmaker(bind=engine))
>>> metadata = MetaData(bind=engine)
>>> 
>>> Then, I use engine to do update/insert operations.
>>> 
>>> There is no problem here, then I find during the performance test that, 
>>> it's too slow to insert/update mysql server.
>>> When my script running, on mysql server, use mysql 'show processlist;' 
>>> command, I see only 2 or 3 active connections kept.
>>> 
>>> Then,within my script, I use multiprocessing.pool(defaults to cpu core 
>>> size, which is 4 for me) to do mysql insert/update operations.
>>> 
>>> Finally, I see 4 or 5 connections kept from mysql server.
>>> 
>>> So, seems db operation is bottleneck because I cannot use more connections 
>>> yet.
>>> 
>>> How to push the conncurrent connections up? I am sure script generating db 
>>> tasks are fast enough.
>>> 
>>> Thanks.
>>> Wesley
>>> 
>>> 
>>> -- 
>>> You received this message because you are subscribed to the Google Groups 
>>> "sqlalchemy" group.
>>> To unsubscribe from this group and stop receiving emails from it, send an 
>>> email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at http://groups.google.com/group/sqlalchemy.
>>> For more options, visit https://groups.google.com/d/optout.
>> 
>> 
>> -- 
>> You received this message because you are subscribed to the Google Groups 
>> "sqlalchemy" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at http://groups.google.com/group/sqlalchemy.
>> For more options, visit https://groups.google.com/d/optout.
> 
> 
> -- 
> You received this message because you are subscribed to the Google Groups 
> "sqlalchemy" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/sqlalchemy.
> For more options, visit https://groups.google.com/d/optout.

-- 
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

Reply via email to