Hi Kenneth,

Thank you for your reply. I found out that the leak was somehow coming from my individual DoFns. I replaced all the individual connections with a connection pool and I haven't seen connection leaks since. I will keep monitoring the pipeline state and, if I see new leaks, I will investigate further and provide more details on the JIRA board.

Best regards,

Jonathan

On 14/01/2019 21:37, Kenneth Knowles wrote:
Hi Jonathan,

JdbcIO.write() just invokes this DoFn: https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L765

It establishes a connection in @StartBundle and then in @FinishBundle it commits a batch and closes the connection. If an error happens in @StartBundle or @ProcessElement there will be a retry with a fresh instance of the DoFn, which will establish a new connection. It looks like neither @StartBundle nor @ProcessElement closes the connection, so I'm guessing that the old connection sticks around because the worker process was not terminated. So the Beam runner and Dataflow service are working as intended and this is an issue with JdbcIO, unless I've made a mistake in my reading or analysis.
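The lifecycle above can be simulated without Beam or JDBC at all. In this sketch, FakeConnection, LeakyWriteFn, and GuardedWriteFn are hypothetical names (not JdbcIO's actual classes): the leaky version mirrors the open-in-@StartBundle / close-in-@FinishBundle pattern, so a failure in @ProcessElement strands the connection, while the guarded version also closes it on the failure path before rethrowing:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for a JDBC Connection; counts how many are currently open.
class FakeConnection {
    static final AtomicInteger openCount = new AtomicInteger();
    FakeConnection() { openCount.incrementAndGet(); }
    void close() { openCount.decrementAndGet(); }
}

// Mirrors the lifecycle described above: a connection is opened in
// startBundle and only closed in finishBundle. If processElement
// throws, finishBundle never runs and the connection leaks until the
// worker process terminates.
class LeakyWriteFn {
    FakeConnection connection;
    void startBundle() { connection = new FakeConnection(); }
    void processElement(String element) {
        if (element.isEmpty()) throw new RuntimeException("simulated I/O error");
    }
    void finishBundle() { connection.close(); connection = null; }
}

// Same lifecycle, but the failure path also closes the connection
// before rethrowing, so a retried bundle does not strand the old one.
class GuardedWriteFn extends LeakyWriteFn {
    @Override
    void processElement(String element) {
        try {
            super.processElement(element);
        } catch (RuntimeException e) {
            if (connection != null) { connection.close(); connection = null; }
            throw e;
        }
    }
}
```

After a failed bundle, the leaky version leaves its connection open (the retry then opens another one on top of it), while the guarded version returns to zero open connections before the retry starts.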

Would you mind reporting these details to https://issues.apache.org/jira/projects/BEAM/ ?

Kenn

On Mon, Jan 14, 2019 at 12:51 AM Jonathan Perron <[email protected]> wrote:

    Hello !

    My question is maybe mainly GCP-oriented, so I apologize if it is
    not fully related to the Beam community.

    We have a streaming pipeline running on Dataflow which writes data
    to a PostgreSQL instance hosted on Cloud SQL. This database suffers
    from connection leak spikes on a regular basis, and the leaked
    connections are kept alive until the pipeline is canceled/drained.

    We are writing to the database with:

    - individual DoFns where we open/close the connection using the
    standard JDBC try/catch (SQLException ex)/finally blocks;

    - a Pipeline.apply(JdbcIO.<SessionData>write()) operation.

    I observed that these spikes happen most of the time after I/O
    errors with the database. Has anyone observed the same situation?

    I have several questions/observations; please correct me if I am
    wrong (I do not come from the Java world, so some may seem pretty
    trivial):

    - Does the apply method handle SQLException or other I/O errors?

    - Would the use of a connection pool prevent such behaviour? If
    so, how would one implement it so that all workers can use it?
    Could it be implemented with JDBC connection pooling?

    I am worried about serialization if one were to pass a
    Connection object as an argument to a DoFn.
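    One common way around that serialization worry is to keep the pool out of the serialized DoFn entirely: mark the field transient and build the pool lazily on the worker. A minimal sketch, where PooledWriteFn and createPool are hypothetical names and the Object stands in for a real pooling javax.sql.DataSource (e.g. HikariCP or Apache commons-dbcp):

```java
import java.io.Serializable;

// Sketch: the function stays Serializable, but the pool itself is
// never serialized -- it is rebuilt lazily on whichever worker
// deserializes the function.
class PooledWriteFn implements Serializable {
    // transient: skipped by Java serialization, so no attempt is made
    // to ship live sockets across the wire.
    transient Object pool;

    // Lazily initialize the pool on first use on each worker.
    Object pool() {
        if (pool == null) {
            pool = createPool();
        }
        return pool;
    }

    static Object createPool() {
        // Real code would build a pooling DataSource here and hand
        // connections out per bundle, returning them on finishBundle.
        return new Object();
    }
}
```

    Serializing the function and deserializing it elsewhere leaves the transient field null; the first call to pool() on the "worker" side then rebuilds it, and repeated calls reuse the same instance.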

    Thank you in advance for your comments and reactions.

    Best regards,

    Jonathan
