On 09/28/2016 04:51 PM, Viktor Roytman wrote:
I wrote a script with this sort of logic in order to insert many records
into a PostgreSQL table as they are generated.

#!/usr/bin/env python3
import asyncio
from concurrent.futures import ProcessPoolExecutor as pool
from functools import partial

import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base


metadata = sa.MetaData(schema='stackoverflow')
Base = declarative_base(metadata=metadata)


class Example(Base):
    __tablename__ = 'example'
    pk = sa.Column(sa.Integer, primary_key=True)
    text = sa.Column(sa.Text)


sa.event.listen(Base.metadata, 'before_create',
    sa.DDL('CREATE SCHEMA IF NOT EXISTS stackoverflow'))

engine = sa.create_engine(
    'postgresql+psycopg2://postgres:password@localhost:5432/stackoverflow'
)
Base.metadata.create_all(engine)
session = sa.orm.sessionmaker(bind=engine, autocommit=True)()


def task(value):
    engine.dispose()
    with session.begin():
        session.add(Example(text=value))


async def infinite_task(loop):
    spawn_task = partial(loop.run_in_executor, None, task)
    while True:
        await asyncio.wait([spawn_task(value) for value in range(10000)])


def main():
    loop = asyncio.get_event_loop()
    with pool() as executor:
        loop.set_default_executor(executor)
        asyncio.ensure_future(infinite_task(loop))
        loop.run_forever()
        loop.close()


if __name__ == '__main__':
    main()

This code works just fine, creating a pool of as many processes as I
have CPU cores, and happily chugging along forever. I wanted to see how
threads would compare to processes, but I could not get a working
example. Here are the changes I made:

from concurrent.futures import ThreadPoolExecutor as pool

session_maker = sa.orm.sessionmaker(bind=engine, autocommit=True)
Session = sa.orm.scoped_session(session_maker)


def task(value):
    engine.dispose()
    # create new session per thread
    session = Session()
    with session.begin():
        session.add(Example(text=value))
    # remove session once the work is done
    Session.remove()

This version runs for a while before a flood of "too many clients"
exceptions:

    sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL: sorry, too many clients already


What's causing the problem?

It would appear that either the ThreadPoolExecutor is starting more threads than your PostgreSQL database has available connections, or your engine.dispose() call is leaving PG connections lying open until they are garbage collected. A single Engine should be all that's necessary in a single process.
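
To make the two possibilities above concrete, here is a toy model that uses only the standard library. It is not SQLAlchemy's pool implementation: the semaphore standing in for a connection pool, the `ConnectionCounter` class, and the `peak_connections` function are all hypothetical names invented for this sketch. It compares one pool shared by every thread against a pool that is replaced on every task, which is roughly what calling engine.dispose() inside each task amounts to.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ConnectionCounter:
    """Tracks how many simulated connections are open at the same time."""

    def __init__(self):
        self._lock = threading.Lock()
        self.current = 0
        self.peak = 0

    def __enter__(self):
        with self._lock:
            self.current += 1
            self.peak = max(self.peak, self.current)
        return self

    def __exit__(self, *exc_info):
        with self._lock:
            self.current -= 1
        return False


def peak_connections(threads, pool_size, tasks, shared_pool):
    """Run `tasks` fake inserts and return the peak number of open connections."""
    counter = ConnectionCounter()
    # Hypothetical stand-in for a connection pool: at most `pool_size` holders.
    shared = threading.BoundedSemaphore(pool_size)

    def task(_):
        # A fresh semaphore per task models replacing the pool on every call,
        # so the cap no longer applies across tasks.
        slots = shared if shared_pool else threading.BoundedSemaphore(pool_size)
        with slots:        # wait for a free connection slot
            with counter:  # the connection is open while the "insert" runs
                time.sleep(0.005)

    with ThreadPoolExecutor(max_workers=threads) as executor:
        # list() forces completion and re-raises any exception from a task.
        list(executor.map(task, range(tasks)))
    return counter.peak


if __name__ == '__main__':
    print('shared pool:  ', peak_connections(20, 5, 100, shared_pool=True))
    print('pool per task:', peak_connections(20, 5, 100, shared_pool=False))
```

With the shared pool the peak can never exceed the pool size, however many threads the executor starts. With a pool per task the only remaining limit is the number of threads. In SQLAlchemy terms, the analogous setup would be one Engine created once per process, with the pool_size and max_overflow arguments to create_engine() chosen so that their sum stays below the server's max_connections.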





--
You received this message because you are subscribed to the Google
Groups "sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send
an email to sqlalchemy+unsubscr...@googlegroups.com
<mailto:sqlalchemy+unsubscr...@googlegroups.com>.
To post to this group, send email to sqlalchemy@googlegroups.com
<mailto:sqlalchemy@googlegroups.com>.
Visit this group at https://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.
