Works! Thank you! четверг, 31 марта 2022 г. в 15:42:25 UTC+3, Mike Bayer:
> when using multiprocessing, the connection pool in the new process must be > replaced with a new one. This is usually accomplished by calling > engine.dispose(). However, to maintain the pool in the parent process as > well, replace the connection pool alone without disposing the old one: > > engine = create_engine(...) > > Session = sessionmaker(bind=engine) > > def job_update_rd(data_list): > updated = [] > with Session() as session: > for t, ts in data_list: > rd = session.query(RawDataTable).filter(and_( > RawDataTable.timestamp == t, > RawDataTable.ts == ts)).one() > > rd.ts = updated_ts[ts] > session.commit() > > updated.append(rd) > > return updated > > > def initializer(): > engine.pool = engine.pool.recreate() > > with Pool(10, initializer=initializer) as p: > upd_list = p.map(job_update_rd, chunks) > > > For many years we've advised calling engine.dispose() here as documented > at > https://docs.sqlalchemy.org/en/14/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork > > . It was recently pointed out that this closes out the parent process' > connections, so in SQLAlchemy 1.4.33 there will be a parameter so you can > change the above code to engine.dispose(close=False). > > > > On Thu, Mar 31, 2022, at 7:06 AM, Evgenii wrote: > > > Hello! > From time to time, I need to update data in tables and multiprocessing can > speed up this process. Last example: I’m trying to update data 7M rows in > table > > > SQLAlchemy 1.4.31, psycopg2 2.8.6, PostgreSQL > > *def* *job_update_rd*(data_list): > updated = [] > *with* Session() *as* session: > *for* t, ts *in* data_list: > rd = session.query(RawDataTable).filter(and_( > RawDataTable.timestamp == t, > RawDataTable.ts == ts)).one() > > rd.ts = updated_ts[ts] > session.commit() > > updated.append(rd) > > *return* updated > *with* Pool(10) *as* p: > upd_list = p.map(job_update_rd, chunks) > > Code is very simple, but it does not work. I get these errors randomly: > > - psycopg2.OperationalError: SSL error: sslv3 alert bad record mac > - sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL > SYSCALL error: EOF detected > > But this example works fine: > > *def* *other_job*(data_list): > *with* Session() *as* s: > *return* [s.query(RawDataTable).filter(and_( > RawDataTable.timestamp == t, > RawDataTable.ts == ts)).all() *for* t, ts *in* data_list] > *with* Pool(10) *as* p: > res = p.map(other_job, chunks) > > > Please, help to solve this problem. > Some people is our team also uses multiprocessing, and 1 time a week get > these errors. > > > > > -- > SQLAlchemy - > The Python SQL Toolkit and Object Relational Mapper > > http://www.sqlalchemy.org/ > > To post example code, please provide an MCVE: Minimal, Complete, and > Verifiable Example. See http://stackoverflow.com/help/mcve for a full > description. > --- > You received this message because you are subscribed to the Google Groups > "sqlalchemy" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to sqlalchemy+...@googlegroups.com. > To view this discussion on the web visit > https://groups.google.com/d/msgid/sqlalchemy/4d61a93d-f83f-455c-ab2a-1cb28e154af8n%40googlegroups.com > > <https://groups.google.com/d/msgid/sqlalchemy/4d61a93d-f83f-455c-ab2a-1cb28e154af8n%40googlegroups.com?utm_medium=email&utm_source=footer> > . > > > -- SQLAlchemy - The Python SQL Toolkit and Object Relational Mapper http://www.sqlalchemy.org/ To post example code, please provide an MCVE: Minimal, Complete, and Verifiable Example. See http://stackoverflow.com/help/mcve for a full description. --- You received this message because you are subscribed to the Google Groups "sqlalchemy" group. To unsubscribe from this group and stop receiving emails from it, send an email to sqlalchemy+unsubscr...@googlegroups.com. To view this discussion on the web visit https://groups.google.com/d/msgid/sqlalchemy/26fb15e2-7ab8-420f-a48a-4bcc42ddd906n%40googlegroups.com.