Hi Jonathan,

JdbcIO.write() just invokes this DoFn:
https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L765

It establishes a connection in @StartBundle, and in @FinishBundle it
commits a batch and closes the connection. If an error happens
in @StartBundle or @ProcessElement, the bundle is retried with a fresh
instance of the DoFn, which establishes a new connection. Neither
@StartBundle nor @ProcessElement appears to close the connection on
failure, so I'm guessing that the old connection sticks around because the
worker process was not terminated. So the Beam runner and the Dataflow
service are working as intended, and this is an issue with JdbcIO, unless
I've made a mistake in my reading or analysis.
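The failure mode described above can be illustrated with a small plain-Java simulation (no Beam dependency). The methods only mirror the @StartBundle / @ProcessElement / @FinishBundle / @Teardown lifecycle. FakeConnection, LeakyWriteFn, SaferWriteFn and BundleRetrySimulation are hypothetical names, and the simulated runner calls teardown() on each discarded instance, which in a real runner is only best effort.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for java.sql.Connection: it only counts open connections.
class FakeConnection implements AutoCloseable {
    static final AtomicInteger OPEN = new AtomicInteger();
    private boolean closed = false;

    FakeConnection() { OPEN.incrementAndGet(); }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            OPEN.decrementAndGet();
        }
    }
}

// Mirrors the pattern described above: open in startBundle, close only in finishBundle.
class LeakyWriteFn {
    protected FakeConnection connection;

    void startBundle() { connection = new FakeConnection(); }

    void processElement(String element, boolean fail) {
        if (fail) throw new RuntimeException("simulated I/O error on " + element);
        // ... add the element to the batch ...
    }

    void finishBundle() {
        // ... commit the batch ...
        connection.close();
        connection = null;
    }

    void teardown() { /* nothing: a connection left open by a failed bundle is never closed */ }
}

// Same lifecycle, but teardown closes any connection left open by a failed bundle.
class SaferWriteFn extends LeakyWriteFn {
    @Override
    void teardown() {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }
}

class BundleRetrySimulation {
    // Runs one bundle that fails on its first `failures` attempts. Each retry uses a
    // fresh instance; the failed instance gets teardown() and is then discarded.
    // Returns the number of connections still open after the bundle finally succeeds.
    static int run(Supplier<LeakyWriteFn> factory, List<String> elements, int failures) {
        FakeConnection.OPEN.set(0);
        for (int attempt = 0; ; attempt++) {
            LeakyWriteFn fn = factory.get();
            try {
                fn.startBundle();
                for (String e : elements) fn.processElement(e, attempt < failures);
                fn.finishBundle();
                return FakeConnection.OPEN.get();
            } catch (RuntimeException ex) {
                fn.teardown(); // best effort in a real runner; may not run if the worker dies
            }
        }
    }
}
```

With two simulated failures, the leaky version ends with two open connections (one per failed attempt), while the version that closes in teardown ends with none. In real Beam code, the analogous fix would be to close the connection in a @Teardown method and/or in a catch block around the work in @ProcessElement. Because @Teardown is not guaranteed to run (for example if the worker crashes), a database-side idle-connection timeout may still be useful.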

Would you mind reporting these details to
https://issues.apache.org/jira/projects/BEAM/ ?

Kenn

On Mon, Jan 14, 2019 at 12:51 AM Jonathan Perron wrote:

> Hello!
>
> My question may be mainly GCP-oriented, so I apologize if it is not
> entirely relevant to the Beam community.
>
> We have a streaming pipeline running on Dataflow that writes data to a
> PostgreSQL instance hosted on Cloud SQL. This database regularly suffers
> from spikes of leaked connections:
>
> The connections are kept alive until the pipeline is canceled or drained:
>
> We are writing to the database with:
>
> - individual DoFns in which we open and close the connection using the
> standard JDBC try/catch (SQLException ex)/finally pattern;
>
> - a Pipeline.apply(JdbcIO.<SessionData>write()) operation.
>
> I have observed that these spikes mostly happen after I/O errors
> with the database. Has anyone observed the same behavior?
>
> I have several questions and observations; please correct me if I am
> wrong (I do not come from a Java background, so some may seem trivial):
>
> - Does the apply method handle SQLException or I/O errors?
>
> - Would using a connection pool prevent such behavior? If so, how
> would one implement it so that all workers can use it? Could it be
> implemented with JDBC connection pooling?
>
> I am worried about serialization if one were to pass a Connection object
> as an argument to a DoFn.
>
> Thank you in advance for your comments and reactions.
>
> Best regards,
>
> Jonathan
>
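On the pooling and serialization questions in the quoted message, one common pattern is to serialize only configuration (such as the JDBC URL), mark the connection or pool field `transient`, and look up a pool shared by all DoFn instances in the same JVM. This is a sketch of that pattern, not JdbcIO's actual implementation. Each Dataflow worker is a separate JVM, so such a pool can be shared by DoFn instances on one worker but not across workers. FakePool, PoolRegistry and PooledWriter are hypothetical names. In practice the pool would be a DataSource from a pooling library such as HikariCP or Apache Commons DBCP, and setup() stands in for a @Setup method.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a pooled DataSource.
class FakePool {
    final String jdbcUrl;
    FakePool(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }
}

// One pool per JVM (i.e. per worker) and per URL, shared by every writer instance there.
final class PoolRegistry {
    private static final ConcurrentMap<String, FakePool> POOLS = new ConcurrentHashMap<>();
    private PoolRegistry() {}

    static FakePool forUrl(String jdbcUrl) {
        return POOLS.computeIfAbsent(jdbcUrl, FakePool::new);
    }
}

// Only the configuration (a String) is serialized; the pool handle is transient
// and is re-acquired on the worker after deserialization.
class PooledWriter implements Serializable {
    private static final long serialVersionUID = 1L;
    final String jdbcUrl;
    transient FakePool pool;

    PooledWriter(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }

    // Stands in for a @Setup method: look up (or lazily create) the shared pool.
    void setup() { pool = PoolRegistry.forUrl(jdbcUrl); }

    // Serializes and deserializes the writer, roughly as a runner does when shipping a DoFn.
    static PooledWriter roundTrip(PooledWriter w) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(w);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PooledWriter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

After the round trip, the `pool` field is null until setup() runs again, and a second writer on the same JVM gets the same shared pool instance. With a real pool, @StartBundle would borrow a connection and @FinishBundle (or a finally block) would close it. Closing a pooled connection returns it to the pool rather than tearing down the database connection.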
