Hi team,

We are facing an issue while pushing data to an RDBMS (Oracle in our case). The pipeline runs fine for a small number of records, but when it is run on a bigger dataset it fails with this error:

    Error message from worker: org.apache.beam.sdk.util.UserCodeException: java.sql.BatchUpdateException: IO Error: Connection reset by peer
        org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
        org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
        org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
        org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
        org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
        org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
        org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
        org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
        org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
        org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
        org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
        org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
        org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
        org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
        java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.sql.BatchUpdateException: IO Error: Connection reset by peer
        oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10345)
        oracle.jdbc.driver.OraclePreparedStatement.executeBatchWithoutQueue(OraclePreparedStatement.java:10107)
        oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9987)
        oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9939)
        oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:261)
        org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
        org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
        org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2414)
        org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
        Suppressed: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [java.sql.SQLRecoverableException: Closed Connection]]
            at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
            at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2403)
            at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
            at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown Source)
            at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
            at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
            at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
            at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
            at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
            at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
            at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
            at org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
            at org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown Source)
            at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
            at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
            at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
            at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
            at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
            at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
            at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
            at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
            at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
            at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
            at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
            at java.base/java.lang.Thread.run(Thread.java:829)
        Caused by: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [java.sql.SQLRecoverableException: Closed Connection]
            at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
            at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
            ... 27 more
        Caused by: java.sql.SQLRecoverableException: Closed Connection
            at oracle.jdbc.driver.PhysicalConnection.requireOpenConnection(PhysicalConnection.java:11385)
            at oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:4056)
            at oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1828)
            at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1811)
            at oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:146)
            at oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:110)
            at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
            ... 28 more

Here is the code snippet we are using:

    import apache_beam as beam
    from apache_beam import coders
    from apache_beam.io.jdbc import WriteToJdbc

    # Register RowCoder so the NamedTuple rows carry a Beam schema,
    # which the cross-language WriteToJdbc transform needs.
    coders.registry.register_coder(self.ExampleRow, coders.RowCoder)

    data = (
        p_input
        | beam.ParDo(AddColumns(self.ExampleRow._fields))
        | beam.Map(lambda x: self.ExampleRow(**x)).with_output_types(self.ExampleRow)
        | "Write to RDBMS" >> WriteToJdbc(
            table_name=self.task["tablename"],
            driver_class_name=self.task["driver_class_name"],
            jdbc_url=self.task["jdbc_url"],
            username=self.task["username"],
            password=self.task["password"],
            classpath=["com.oracle.database.jdbc:ojdbc8:21.7.0.0"],
        )
    )

How can we use Dataflow to push bulk data to the database, in both batch and streaming mode?

Thanks,
Somnath Chouwdhury.
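One possible direction for the bulk-write question is to write from a plain Python DoFn instead of the cross-language WriteToJdbc. The idea is to group rows (for example with `beam.BatchElements`) and insert each group through a DB-API 2.0 driver such as python-oracledb, reopening the connection and retrying a batch when the connection drops. This is only a sketch under stated assumptions, not verified against Oracle or Dataflow. It shows just the driver-agnostic write loop. `chunked`, `write_in_batches`, and their defaults are hypothetical helpers, not part of Beam or any driver.

```python
import time
from itertools import islice
from typing import Any, Callable, Iterable, Iterator, List, Sequence, Tuple, Type


def chunked(rows: Iterable[Sequence[Any]], size: int) -> Iterator[List[Sequence[Any]]]:
    """Yield consecutive lists of at most `size` rows, preserving input order."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def write_in_batches(
    connect: Callable[[], Any],
    sql: str,
    rows: Iterable[Sequence[Any]],
    *,
    retry_on: Tuple[Type[BaseException], ...],
    batch_size: int = 500,
    max_attempts: int = 3,
    backoff_s: float = 1.0,
) -> int:
    """Hypothetical helper: one executemany + commit per batch (DB-API 2.0).

    On an exception listed in `retry_on`, the current connection is discarded,
    a fresh one is opened, and the same batch is retried with exponential
    backoff. Returns the number of rows committed.
    """
    written = 0
    conn = connect()
    try:
        for batch in chunked(rows, batch_size):
            for attempt in range(1, max_attempts + 1):
                try:
                    cur = conn.cursor()
                    cur.executemany(sql, batch)
                    conn.commit()
                    cur.close()
                    written += len(batch)
                    break
                except retry_on:
                    # The connection may already be dead (e.g. "connection reset"),
                    # so close it best-effort and open a new one before retrying.
                    try:
                        conn.close()
                    except Exception:
                        pass
                    if attempt == max_attempts:
                        raise
                    time.sleep(backoff_s * 2 ** (attempt - 1))
                    conn = connect()
    finally:
        # Best-effort cleanup; some drivers raise when closing twice.
        try:
            conn.close()
        except Exception:
            pass
    return written
```

Inside a DoFn that receives the lists produced by `beam.BatchElements`, `connect` could be something like `lambda: oracledb.connect(user=..., password=..., dsn=...)`, with Oracle-style binds such as `INSERT INTO t VALUES (:1, :2)`. `retry_on` should list whichever exception class the driver raises for dropped connections; the exact class is an assumption to check against the driver docs. A batch whose commit outcome is unknown may be written twice on retry, so an idempotent statement (for example a `MERGE` keyed on a primary key) is the safer choice.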