When using multiprocessing, the connection pool in each new child process must
be replaced with a new one. This is usually done by calling engine.dispose().
However, to keep the pool in the parent process working as well, replace only
the connection pool, without disposing of the old one:
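    from multiprocessing import Pool

    from sqlalchemy import and_, create_engine
    from sqlalchemy.orm import sessionmaker

    # RawDataTable, updated_ts and chunks come from the original question
    engine = create_engine(...)
    Session = sessionmaker(bind=engine)

    def job_update_rd(data_list):
        updated = []
        with Session() as session:
            for t, ts in data_list:
                rd = session.query(RawDataTable).filter(and_(
                    RawDataTable.timestamp == t,
                    RawDataTable.ts == ts)).one()
                rd.ts = updated_ts[ts]
                session.commit()
                updated.append(rd)
        return updated

    def initializer():
        # runs once in each child process: give the child its own fresh pool
        # without closing the connections the parent is still using
        engine.pool = engine.pool.recreate()

    with Pool(10, initializer=initializer) as p:
        upd_list = p.map(job_update_rd, chunks)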
For many years we've advised calling engine.dispose() here, as documented at
https://docs.sqlalchemy.org/en/14/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork.
It was recently pointed out that this closes the parent process's connections,
so SQLAlchemy 1.4.33 will add a parameter that lets you replace the
engine.pool.recreate() line above with engine.dispose(close=False).
On Thu, Mar 31, 2022, at 7:06 AM, Evgenii wrote:
> Hello!
> From time to time, I need to update data in tables, and multiprocessing can
> speed up this process. In my latest case, I'm trying to update 7M rows in a
> table.
>
> `SQLAlchemy 1.4.31`, `psycopg2 2.8.6`, `PostgreSQL`
>
>     def job_update_rd(data_list):
>         updated = []
>         with Session() as session:
>             for t, ts in data_list:
>                 rd = session.query(RawDataTable).filter(and_(
>                     RawDataTable.timestamp == t,
>                     RawDataTable.ts == ts)).one()
>
>                 rd.ts = updated_ts[ts]
>                 session.commit()
>
>                 updated.append(rd)
>
>         return updated
>
>     with Pool(10) as p:
>         upd_list = p.map(job_update_rd, chunks)
>
> The code is very simple, but it does not work. I get these errors at random:
>
> * `psycopg2.OperationalError: SSL error: sslv3 alert bad record mac`
> * `sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL SYSCALL
>   error: EOF detected`
>
> But this example works fine:
>
>     def other_job(data_list):
>         with Session() as s:
>             return [s.query(RawDataTable).filter(and_(
>                         RawDataTable.timestamp == t,
>                         RawDataTable.ts == ts)).all() for t, ts in data_list]
>
>     with Pool(10) as p:
>         res = p.map(other_job, chunks)
>
> Please help me solve this problem.
> Some people on our team also use multiprocessing and get these errors about
> once a week.
>
> --
> SQLAlchemy -
> The Python SQL Toolkit and Object Relational Mapper
>
> http://www.sqlalchemy.org/
>
> To post example code, please provide an MCVE: Minimal, Complete, and
> Verifiable Example. See http://stackoverflow.com/help/mcve for a full
> description.
> ---
> You received this message because you are subscribed to the Google Groups
> "sqlalchemy" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/sqlalchemy/4d61a93d-f83f-455c-ab2a-1cb28e154af8n%40googlegroups.com
To view this discussion on the web visit
https://groups.google.com/d/msgid/sqlalchemy/d0a631b1-8f44-448a-93a1-16c01acfaab4%40www.fastmail.com.