Hi Kenneth,
Thank you for your reply. I found out that the leak was somehow coming
from my individual DoFns. I replaced all the individual connections with
a connection pool and I haven't seen connection leaks since. I will keep
monitoring the pipeline state, and if I see new leaks, I will
investigate and provide more details on the JIRA board.
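For the archives, the pooling idea can be sketched in miniature. This is not Beam or JDBC code: TinyPool and its plain-string "connections" are toy stand-ins for a real pooled DataSource (in practice one would use a library such as HikariCP or Apache DBCP rather than hand-rolling a pool).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// A minimal bounded pool. Strings stand in for real JDBC connections;
// the point is that the pool caps the total number of connections and
// that failed work returns its connection instead of abandoning it.
class TinyPool {
    private final BlockingQueue<String> idle;

    TinyPool(int size) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            // Stand-in for DriverManager.getConnection(...)
            idle.add("conn-" + i);
        }
    }

    // Blocks (with a timeout) until a connection is free, so no caller
    // can push the database past the pool's size.
    String borrow() {
        try {
            String conn = idle.poll(5, TimeUnit.SECONDS);
            if (conn == null) {
                throw new RuntimeException("timed out waiting for a connection");
            }
            return conn;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while waiting", e);
        }
    }

    // Calling this from a finally block is what prevents leaks: even a
    // failed bundle hands its connection back.
    void release(String conn) {
        idle.offer(conn);
    }

    int available() {
        return idle.size();
    }
}
```

The key property is the bounded queue: a retry storm can exhaust the pool and block, but it can never multiply connections the way per-DoFn open/close does.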
Best regards,
Jonathan
On 14/01/2019 21:37, Kenneth Knowles wrote:
Hi Jonathan,
JdbcIO.write() just invokes this DoFn:
https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L765
It establishes a connection in @StartBundle and then in @FinishBundle
it commits a batch and closes the connection. If an error happens
in @StartBundle or @ProcessElement there will be a retry with a fresh
instance of the DoFn, which will establish a new connection. It looks
like neither @StartBundle nor @ProcessElement closes the connection,
so I'm guessing that the old connection sticks around because the
worker process was not terminated. So the Beam runner and Dataflow
service are working as intended and this is an issue with JdbcIO,
unless I've made a mistake in my reading or analysis.
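The failure mode I'm describing can be shown with a toy model (this is illustrative Java, not Beam API; FakeConnection, leakyBundle and safeBundle are made-up names). A resource opened in a "start bundle" step leaks whenever an exception skips the "finish bundle" step, and a try/finally closes that hole:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Counts live "connections" so a leak is observable.
class FakeConnection {
    static final AtomicInteger open = new AtomicInteger();
    FakeConnection() { open.incrementAndGet(); }
    void close() { open.decrementAndGet(); }
}

class LeakDemo {
    // Mirrors the pattern above: the connection is closed only on the
    // happy path, so a failure mid-bundle strands it.
    static void leakyBundle(boolean fail) {
        FakeConnection c = new FakeConnection();              // like @StartBundle
        if (fail) throw new RuntimeException("element failed"); // like @ProcessElement
        c.close();                                            // like @FinishBundle
    }

    // The fix: release the resource in a finally block so the failing
    // path closes it too before the runner retries with a fresh DoFn.
    static void safeBundle(boolean fail) {
        FakeConnection c = new FakeConnection();
        try {
            if (fail) throw new RuntimeException("element failed");
        } finally {
            c.close();
        }
    }
}
```

If the worker process survives the retry (as on Dataflow), the stranded connection from the leaky variant stays open until the process itself goes away.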
Would you mind reporting these details to
https://issues.apache.org/jira/projects/BEAM/ ?
Kenn
On Mon, Jan 14, 2019 at 12:51 AM Jonathan Perron
<[email protected] <mailto:[email protected]>> wrote:
Hello !
My question may be mainly GCP-oriented, so I apologize if it is not
fully related to the Beam community.
We have a streaming pipeline running on Dataflow which writes data
to a PostgreSQL instance hosted on Cloud SQL. This database is
suffering from connection leak spikes on a regular basis, and the
leaked connections are kept alive until the pipeline is canceled/drained.
We are writing to the database with:
- individual DoFns where we open/close the connection using the
standard JDBC try/catch (SQLException ex)/finally statements;
- a Pipeline.apply(JdbcIO.<SessionData>write()) operation.
I observed that these spikes happen most of the time after I/O
errors with the database. Has anyone observed the same situation?
I have several questions/observations, please correct me if I am
wrong (I am not from the Java environment, so some can seem pretty
trivial):
- Does the apply method handle SQLException or I/O errors?
- Would the use of a connection pool prevent such behaviours? If
so, how would one implement it to allow all workers to use it?
Could it be implemented with JDBC connection pooling?
I am worried about serialization if one were to pass a
Connection object as an argument to a DoFn.
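On the serialization worry: a java.sql.Connection is indeed not Serializable, so it cannot be shipped inside a DoFn. One common workaround (a sketch, not Beam-specific API; PoolHolder and its Object "pool" are made-up stand-ins for e.g. a pooled DataSource) is to keep the pool in a transient field and rebuild it lazily on each worker after deserialization:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// The surrounding object is Serializable (as a DoFn must be), but the
// pool itself is transient: it is not serialized, arrives as null on
// the worker, and is recreated on first use.
class PoolHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    private transient Object pool; // stand-in for a real DataSource

    // Lazily (re)build the pool, e.g. from a setup hook or first use.
    synchronized Object pool() {
        if (pool == null) {
            pool = new Object(); // stand-in for constructing the DataSource
        }
        return pool;
    }

    // Helper to demonstrate a serialize/deserialize round trip.
    static PoolHolder roundTrip(PoolHolder h) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(h);
        ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        return (PoolHolder) in.readObject();
    }
}
```

After the round trip the transient field is null, and the lazy getter rebuilds it, so each worker JVM ends up with its own pool rather than a serialized connection.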
Thank you in advance for your comments and reactions.
Best regards,
Jonathan