There is no need to use a scoped_session() in a function that just runs a
single query, and no need to use a Session at all in a function that is not
running an ORM query; for statements like these, session.execute() and
engine.execute() are equivalent. Most of this code is unnecessary.
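To illustrate that point, here is a minimal sketch of a single-statement write and read with no Session involved. The exception_log table, its columns, and the in-memory SQLite URL are assumptions made so the example runs on its own; the thread itself uses MySQL and tables defined elsewhere. It goes through engine.begin() and engine.connect() rather than engine.execute(), because engine.execute() was removed in SQLAlchemy 2.0 while the context-manager form works on both older and newer releases.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine

# Assumption: in-memory SQLite stands in for the MySQL URL used in the thread.
engine = create_engine("sqlite://")
metadata = MetaData()

# Hypothetical table, loosely modelled on the docstring in the quoted code.
exception_log = Table(
    "exception_log",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("module", String(32)),
    Column("type", Integer),
    Column("message", String(256)),
)
metadata.create_all(engine)


def dbwrite(table, **values):
    """Run one INSERT; engine.begin() commits on success and rolls back on error."""
    with engine.begin() as conn:
        conn.execute(table.insert().values(**values))


def dbquery(table, whereclause):
    """Run one SELECT and return the rows as a list; the connection is released on exit."""
    with engine.connect() as conn:
        return conn.execute(table.select().where(whereclause)).fetchall()


dbwrite(exception_log, module="db", type=1, message="connection refused")
rows = dbquery(exception_log, exception_log.c.module == "db")
print(len(rows), rows[0].message)
```

Both helpers return the connection to the pool when the with block exits, so there is nothing to close or roll back by hand.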
There is, however, a need to make a new Engine within each child process if you
are using multiprocessing; the code as is isn’t safe for multiple processes,
because connections opened in the parent must not be shared with the children.
multiprocessing.Pool provides the “initializer” hook for this purpose.
A reasonable pattern for that would be:
    import multiprocessing
    from sqlalchemy import create_engine

    engine = create_engine(url)
    def setup_fork():
        engine.dispose()  # ensure all connections are gone
    pool = multiprocessing.Pool(10, initializer=setup_fork)
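A somewhat fuller sketch of that pattern, under stated assumptions: the events table, the write_event, count_events and run_tasks helpers, and the SQLite file in the temp directory are all made up here so that the example is self-contained; the thread itself targets MySQL. Only create_engine(), engine.dispose() and the Pool initializer hook come from the pattern above.

```python
import multiprocessing
import os
import tempfile

from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine

# Assumption: a SQLite file keeps the sketch self-contained; substitute the real URL.
DB_PATH = os.path.join(tempfile.gettempdir(), "sqlalchemy_fork_demo.sqlite")
engine = create_engine("sqlite:///" + DB_PATH)

metadata = MetaData()
# Hypothetical table used only for this illustration.
events = Table(
    "events",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("message", String(256)),
)
metadata.create_all(engine)  # create the table if it is missing


def setup_fork():
    """Pool initializer: runs once in each child process."""
    engine.dispose()  # drop any connections inherited from the parent


def write_event(message):
    """Task body: one INSERT on a connection owned by the current process."""
    with engine.begin() as conn:
        conn.execute(events.insert().values(message=message))


def count_events():
    """Return the number of rows currently in the events table."""
    with engine.connect() as conn:
        return len(conn.execute(events.select()).fetchall())


def run_tasks(messages, processes=4):
    """Send one task per message to a pool of worker processes and wait for them."""
    pool = multiprocessing.Pool(processes, initializer=setup_fork)
    try:
        pool.map(write_event, messages)
    finally:
        pool.close()
        pool.join()
```

Call run_tasks(["a", "b", "c"]) from under an `if __name__ == "__main__":` guard. Newer SQLAlchemy releases (1.4.33 and later) also document engine.dispose(close=False) for this kind of initializer, so that the child does not close connections the parent is still using.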
Then, as always, if the code is only opening 4 or 5 connections, that’s because
you’re only asking it to do 4 or 5 things at once. If you want it to use 100
connections, you need to have it doing 100 things at a time, which means your
multiprocessing.Pool would have to be of size 100 and you’d need to send at
least 100 tasks into it.
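The same bound can be seen in a small stdlib-only sketch. It swaps in a thread pool for multiprocessing.Pool and a sleep for the query, purely so that it runs anywhere; peak_in_flight and its parameters are hypothetical names. The number of tasks in progress at once, and so the number of connections that could be in use, never exceeds the smaller of the worker count and the task count.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_in_flight(n_workers, n_tasks, hold_seconds=0.05):
    """Run n_tasks on n_workers threads and return the most tasks seen running at once."""
    lock = threading.Lock()
    state = {"current": 0, "peak": 0}

    def task():
        with lock:
            state["current"] += 1
            state["peak"] = max(state["peak"], state["current"])
        time.sleep(hold_seconds)  # stand-in for the time a query holds a connection
        with lock:
            state["current"] -= 1

    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        futures = [executor.submit(task) for _ in range(n_tasks)]
        for future in futures:
            future.result()  # re-raise any error from a task
    return state["peak"]
```

peak_in_flight(4, 100) can never report more than 4, however many tasks are queued, and peak_in_flight(100, 5) can never report more than 5.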
On Apr 8, 2014, at 10:21 AM, Ni Wesley <[email protected]> wrote:
> The above procedure can be performed in a shorthand way by using the
> execute() method of Engine itself:
>
> result = engine.execute("select username from users")
> for row in result:
>     print "username:", row['username']
>
> So, it seems engine.execute is shorthand for connection.execute.
>
> Here is my db apis :
> #!/usr/bin/env python
> #coding: utf-8
>
> from sqlalchemy.sql.expression import text, bindparam
> from sqlalchemy.sql import select,insert, delete, update
> from sqlalchemy.schema import Table
> from sqlalchemy.orm import sessionmaker,scoped_session
> from db import dba_logger,metadata,engine#session
> from datetime import datetime
> from exctrace import exctrace
> from sqlalchemy import and_
>
> direct_engine = True
> use_raw = False
>
> #import gevent
> #from gevent import monkey
> #monkey.patch_all()
> import multiprocessing
> from db import tables
>
> def tmp_dbwrite(tablename,**kwargs):
>     """
>     Used to insert exception info into database.
>
>     Params:
>         module : module name, indicating who raises the exception, e.g.
>                  android,ios,psg,adpns,db .etc
>         type : exception type, 0 means service level while 1 is system level.
>         message : exception description,length limit to 256 bytes
>     """
>     try:
>         #_table=Table(tablename, metadata, autoload=True)
>         _table = tables[tablename]
>         i=_table.insert().values(**kwargs)
>         if direct_engine:
>             engine.execute(i)
>             #gevent.spawn(engine.execute,i)
>             #gevent.sleep(0)
>             #gevent.joinall([gevent.spawn(engine.execute,i)])
>         else:
>             session = scoped_session(sessionmaker(bind=engine))
>             session.execute(i)
>             session.commit()
>             session.close()
>     except Exception,e:
>         #dba_logger.log(40,'Exception when dbwriter:%s' % str(e))
>         #dba_logger.log(20,'Exception detail:%s' % str(kwargs))
>         exctrace('db','1','Error happened when writing db',dba_logger,
>                  'Exception when dbwriter:%s' % str(e),
>                  'Exception detail:%s' % str(kwargs))
>         if not direct_engine:
>             session.rollback()
>             session.close()
>
>
>
> def tmp_dbupdate(_table,whereclause,**kwargs):
>     """
>     Used to insert exception info into database.
>
>     Params:
>         module : module name, indicating who raises the exception, e.g.
>                  android,ios,psg,adpns,db .etc
>         type : exception type, 0 means service level while 1 is system level.
>         message : exception description,length limit to 256 bytes
>     """
>     try:
>         #_table=Table(tablename, metadata, autoload=True)
>         #_table = tables[tablename]
>         i=_table.update().values(**kwargs).where(whereclause)
>         if direct_engine:
>             engine.execute(i)
>             #gevent.spawn(engine.execute,i)
>         else:
>             session = scoped_session(sessionmaker(bind=engine))
>             session.execute(i)
>             session.commit()
>             session.close()
>     except Exception,e:
>         #dba_logger.log(40,'Exception when dbwriter:%s' % str(e))
>         #dba_logger.log(20,'Exception detail:%s' % str(kwargs))
>         exctrace('db','1','Error happened when updating db',dba_logger,
>                  'Exception when dbupdate:%s' % str(e),
>                  'Exception detail:%s' % str(kwargs))
>         if not direct_engine:
>             session.rollback()
>             session.close()
>
> def dbquery(_table,whereclause):
>     try:
>         #_table=Table(tablename, metadata, autoload=True)
>         #_table = tables[tablename]
>         i=_table.select().where(whereclause)
>         if direct_engine:
>             res = engine.execute(i)
>             return res
>         else:
>             session = scoped_session(sessionmaker(bind=engine))
>             res = session.execute(i)
>             return res
>             session.close()
>     except Exception,e:
>         #dba_logger.log(40,'Exception when dbwriter:%s' % str(e))
>         #dba_logger.log(20,'Exception detail:%s' % str(kwargs))
>         exctrace('db','1','Error happened when querying db',dba_logger,
>                  'Exception when dbquery:%s' % str(e),
>                  'Exception detail:%s' % str(whereclause))
>         #session.rollback()
>         if not direct_engine:
>             session.close()
> #res = session.execute(connection_writer._table.select()
> #    .where(connection_writer._table.c.app_key==self.app_key)
> #    .where(connection_writer._table.c.device_token==self._devicetoken))
> pool = multiprocessing.Pool(10)
> def dbwrite(tablename,**kwargs):
>     pool.apply_async(tmp_dbwrite, (tablename,), kwargs)
>
> def dbupdate(tablename,whereclause,**kwargs):
>     pool.apply_async(tmp_dbupdate, (tablename,whereclause), kwargs)
>
> You can see at the bottom that I use multiprocessing for dbwrite and
> dbupdate, which are called by other modules.
> With the above code, I mostly see only 4-5 entries in the processlist on the
> MySQL server side...
> That means I haven't reached the bottleneck of the MySQL server itself...
>
> On Tuesday, April 8, 2014 at 9:52:58 PM UTC+8, Michael Bayer wrote:
>
> On Apr 8, 2014, at 8:55 AM, Ni Wesley <[email protected]> wrote:
>
>> For me, I just use engine.execute most of the time. Is there any problem
>> with this?
>>
>> Or, if I use a session or a connection, do I need to close it every time?
>> Otherwise it will trigger the pool size limit error as the number of
>> connections keeps increasing, right?
>
> If you use sessions or a Connection object then yes, you need to make sure
> those are closed when you’re done.
>
> The docs at http://docs.sqlalchemy.org/en/rel_0_9/core/connections.html have
> background on the difference between engine.execute and connection.execute.
>
>
>>
>>
>> On Tuesday, April 8, 2014 at 8:49:16 PM UTC+8, Michael Bayer wrote:
>> To make more connections, call connect(), and/or use more session objects.
>> Each session uses one connection.
>>
>> If you see three connections, that means your script has only worked with
>> three connections at once, such as, you opened three Session objects
>> concurrently.
>>
>> SQLAlchemy does not initiate any concurrent operations on its own.
>>
>>
>> On Apr 8, 2014, at 5:03 AM, Ni Wesley <[email protected]> wrote:
>>
>>> Hi all,
>>> I have a question here.
>>>
>>> Here is my db init code snippet:
>>>
>>> engine = create_engine(db_url, pool_size=100, max_overflow=150,
>>>                        echo=engine_echo, pool_recycle=3600)
>>> session = scoped_session(sessionmaker(bind=engine))
>>> metadata = MetaData(bind=engine)
>>>
>>> Then I use the engine to do update/insert operations.
>>>
>>> This works, but during a performance test I found that inserts/updates
>>> to the MySQL server are too slow.
>>> While my script is running, the mysql 'show processlist;' command on the
>>> server shows only 2 or 3 active connections.
>>>
>>> Then, within my script, I used multiprocessing.Pool (which defaults to the
>>> CPU core count, 4 for me) to do the MySQL insert/update operations.
>>>
>>> Now I see 4 or 5 connections kept open on the MySQL server.
>>>
>>> So it seems the db operation is the bottleneck, because I cannot use more
>>> connections yet.
>>>
>>> How can I push the number of concurrent connections up? I am sure the
>>> script generating the db tasks is fast enough.
>>>
>>> Thanks.
>>> Wesley
>>>
>>>
>>> --
>>> You received this message because you are subscribed to the Google Groups
>>> "sqlalchemy" group.
>>> To unsubscribe from this group and stop receiving emails from it, send an
>>> email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at http://groups.google.com/group/sqlalchemy.
>>> For more options, visit https://groups.google.com/d/optout.