Hi Jonathan, JdbcIO.write() just invokes this DoFn: https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L765
It establishes a connection in @StartBundle, and in @FinishBundle it commits the batch and closes the connection. If an error happens in @StartBundle or @ProcessElement, there will be a retry with a fresh instance of the DoFn, which establishes a new connection. As far as I can tell, neither @StartBundle nor @ProcessElement closes the connection when it fails, so I'm guessing the old connection sticks around because the worker process was not terminated. So the Beam runner and the Dataflow service are working as intended and this is an issue with JdbcIO, unless I've made a mistake in my reading or analysis. Would you mind reporting these details at https://issues.apache.org/jira/projects/BEAM/ ?

Kenn

On Mon, Jan 14, 2019 at 12:51 AM Jonathan Perron <[email protected]> wrote:

> Hello!
>
> My question may be mainly GCP-oriented, so I apologize if it is not
> fully related to the Beam community.
>
> We have a streaming pipeline running on Dataflow which writes data to a
> PostgreSQL instance hosted on Cloud SQL. This database suffers from
> connection leak spikes on a regular basis.
>
> The connections are kept alive until the pipeline is canceled/drained.
>
> We are writing to the database with:
>
> - individual DoFns where we open/close the connection using the standard
> JDBC try/catch (SQLException ex)/finally statements;
>
> - a Pipeline.apply(JdbcIO.<SessionData>write()) operation.
>
> I observed that these spikes happen most of the time after I/O errors
> with the database. Has anyone observed the same situation?
>
> I have several questions/observations; please correct me if I am wrong (I
> am not from the Java environment, so some may seem pretty trivial):
>
> - Does the apply method handle SQLException or I/O errors?
>
> - Would the use of a connection pool prevent such behaviours? If so, how
> would one implement it so that all workers can use it? Could it be
> implemented with JDBC connection pooling?
>
> I am worried about serialization if one passed a Connection object
> as an argument of a DoFn.
>
> Thank you in advance for your comments and reactions.
>
> Best regards,
>
> Jonathan
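The failure mode described in the reply can be modeled in a few lines of plain Java. This is a sketch under stated assumptions, not JdbcIO's code and not Beam's API: FakeConnection, BundleWriter and connectionsLeftOpen are hypothetical names, the fake connection only counts how many instances are still open, and the retry loop stands in for the runner retrying a bundle with a fresh instance after a transient error. As in the reply, it assumes nothing else ever closes the connection held by a failed instance.

```java
import java.util.List;

public class BundleLifecycleModel {

  /** Hypothetical stand-in for java.sql.Connection; it only tracks how many are open. */
  static final class FakeConnection implements AutoCloseable {
    static int openCount = 0;
    static int transientFailures = 0; // number of upcoming writes that will fail
    private boolean open = true;

    FakeConnection() {
      openCount++;
    }

    void write(String element) {
      if (transientFailures > 0) {
        transientFailures--;
        // Stands in for a transient I/O error or SQLException in the middle of a bundle.
        throw new IllegalStateException("simulated I/O error on " + element);
      }
    }

    @Override
    public void close() {
      if (open) {
        open = false;
        openCount--;
      }
    }
  }

  /** Hypothetical DoFn-shaped writer: opens a connection per bundle, closes it in finishBundle. */
  static final class BundleWriter {
    private final boolean closeOnError;
    private FakeConnection connection;

    BundleWriter(boolean closeOnError) {
      this.closeOnError = closeOnError;
    }

    void startBundle() {
      connection = new FakeConnection();
    }

    void processElement(String element) {
      try {
        connection.write(element);
      } catch (RuntimeException e) {
        if (closeOnError) {
          // Release the connection before the failure propagates to the runner.
          connection.close();
          connection = null;
        }
        throw e;
      }
    }

    void finishBundle() {
      // A real writer would execute and commit the batch here before closing.
      connection.close();
      connection = null;
    }
  }

  /** Processes one bundle, retrying with a fresh writer after each failure; returns connections left open. */
  static int connectionsLeftOpen(boolean closeOnError, int failures) {
    FakeConnection.openCount = 0;
    FakeConnection.transientFailures = failures;
    List<String> bundle = List.of("row-1", "row-2", "row-3");
    while (true) {
      BundleWriter writer = new BundleWriter(closeOnError); // fresh instance per attempt
      try {
        writer.startBundle();
        for (String element : bundle) {
          writer.processElement(element);
        }
        writer.finishBundle();
        return FakeConnection.openCount;
      } catch (IllegalStateException e) {
        // The failed instance is simply dropped; its connection is released
        // only if processElement closed it before rethrowing.
      }
    }
  }

  public static void main(String[] args) {
    System.out.println("without close-on-error: " + connectionsLeftOpen(false, 2));
    System.out.println("with close-on-error:    " + connectionsLeftOpen(true, 2));
  }
}
```

With two transient failures, main prints 2 for the writer that never closes on error (one leaked connection per failed attempt) and 0 for the one that closes before rethrowing. The remedy in the model is the same idea as the try/catch/finally pattern Jonathan already uses in his own DoFns: release the connection on the error path, not only in finishBundle.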
