On 09/28/2016 04:51 PM, Viktor Roytman wrote:
I wrote a script with this sort of logic in order to insert many records into a PostgreSQL table as they are generated.

    #!/usr/bin/env python3

    import asyncio
    from concurrent.futures import ProcessPoolExecutor as pool
    from functools import partial

    import sqlalchemy as sa
    from sqlalchemy.ext.declarative import declarative_base

    metadata = sa.MetaData(schema='stackoverflow')
    Base = declarative_base(metadata=metadata)


    class Example(Base):
        __tablename__ = 'example'
        pk = sa.Column(sa.Integer, primary_key=True)
        text = sa.Column(sa.Text)


    sa.event.listen(Base.metadata, 'before_create',
                    sa.DDL('CREATE SCHEMA IF NOT EXISTS stackoverflow'))

    engine = sa.create_engine(
        'postgresql+psycopg2://postgres:password@localhost:5432/stackoverflow'
    )
    Base.metadata.create_all(engine)
    session = sa.orm.sessionmaker(bind=engine, autocommit=True)()


    def task(value):
        engine.dispose()
        with session.begin():
            session.add(Example(text=value))


    async def infinite_task(loop):
        spawn_task = partial(loop.run_in_executor, None, task)
        while True:
            await asyncio.wait([spawn_task(value) for value in range(10000)])


    def main():
        loop = asyncio.get_event_loop()
        with pool() as executor:
            loop.set_default_executor(executor)
            asyncio.ensure_future(infinite_task(loop))
            loop.run_forever()
            loop.close()


    if __name__ == '__main__':
        main()

This code works just fine, creating a pool of as many processes as I have CPU cores, and happily chugging along forever. I wanted to see how threads would compare to processes, but I could not get a working example.
Here are the changes I made:

    from concurrent.futures import ThreadPoolExecutor as pool

    session_maker = sa.orm.sessionmaker(bind=engine, autocommit=True)
    Session = sa.orm.scoped_session(session_maker)


    def task(value):
        engine.dispose()
        # create new session per thread
        session = Session()
        with session.begin():
            session.add(Example(text=value))
        # remove session once the work is done
        Session.remove()

This version runs for a while before a flood of "too many clients" exceptions:

    sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL: sorry, too many clients already

What's causing the problem?
It would appear that either the ThreadPoolExecutor is starting more threads than your PostgreSQL database has available connections, or your engine.dispose() is leaving PG connections lying open to be garbage collected. A single Engine should be all that's necessary in a single process.
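
To check the first possibility, the thread pool can be given an explicit upper bound instead of relying on the default worker count, which is derived from the CPU count and can be larger than the number of cores. What follows is only a minimal sketch using the standard library: MAX_WORKERS is an assumed value, run_batch is a hypothetical helper, and task here is a stand-in that counts how many calls run at once rather than touching the database. In the real script, task would keep using the one module-level engine and would not call engine.dispose().

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Assumed value: keep it at or below the connections the database can spare.
MAX_WORKERS = 8

_lock = threading.Lock()
_active = 0  # tasks currently running (stand-in for connections in use)
_peak = 0    # highest number of tasks seen running at the same time


def task(value):
    """Stand-in for the real insert; it only tracks concurrency."""
    global _active, _peak
    with _lock:
        _active += 1
        _peak = max(_peak, _active)
    time.sleep(0.001)  # pretend to hold a connection briefly
    with _lock:
        _active -= 1
    return value


def run_batch(n_tasks=200, max_workers=MAX_WORKERS):
    """Run n_tasks on a capped thread pool; return (results, peak concurrency)."""
    global _active, _peak
    _active = _peak = 0
    # max_workers bounds how many threads (and so how many tasks) run at once.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(task, range(n_tasks)))
    return results, _peak


if __name__ == '__main__':
    results, peak = run_batch()
    print('tasks completed:', len(results), 'peak concurrency:', peak)
```

If the "too many clients" errors persist with a small cap like this, the thread count is probably not the cause, and the engine.dispose() call in each task becomes the more likely suspect.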
-- You received this message because you are subscribed to the Google Groups "sqlalchemy" group. To unsubscribe from this group and stop receiving emails from it, send an email to sqlalchemy+unsubscr...@googlegroups.com <mailto:sqlalchemy+unsubscr...@googlegroups.com>. To post to this group, send email to sqlalchemy@googlegroups.com <mailto:sqlalchemy@googlegroups.com>. Visit this group at https://groups.google.com/group/sqlalchemy. For more options, visit https://groups.google.com/d/optout.