Hii team,

We are facing an issue while trying to push data to RDBMS(oracle in our
case) while it runs for small amount of records but when is run through
bigger dataset it fails, throwing this error,

Error message from worker: org.apache.beam.sdk.util.UserCodeException:
> java.sql.BatchUpdateException: IO Error: Connection reset by peer
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
> org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
> org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:829) Caused by:
> java.sql.BatchUpdateException: IO Error: Connection reset by peer
> oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10345)
> oracle.jdbc.driver.OraclePreparedStatement.executeBatchWithoutQueue(OraclePreparedStatement.java:10107)
> oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9987)
> oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9939)
> oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:261)
> org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
> org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2414)
> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
> Suppressed: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions:
> [org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions:
> [java.sql.SQLRecoverableException: Closed Connection]] at
> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
> at
> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2403)
> at
> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
> at
> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
> Source) at
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
> at
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
> at org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634) at
> org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown
> Source) at
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
> at
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
> at
> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
> at
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
> at
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
> at
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
> at
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
> at
> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at
> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829) Caused by:
> org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions:
> [java.sql.SQLRecoverableException: Closed Connection] at
> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
> at
> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
> ... 27 more Caused by: java.sql.SQLRecoverableException: Closed Connection
> at
> oracle.jdbc.driver.PhysicalConnection.requireOpenConnection(PhysicalConnection.java:11385)
> at
> oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:4056)
> at
> oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1828)
> at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1811) at
> oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:146)
> at
> oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:110)
> at
> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
> ... 28 more


Here is the code snippet we are using

coders.registry.register_coder(self.ExampleRow, coders.RowCoder)

data = p_input | beam.ParDo(AddColumns(self.ExampleRow._fields)) | beam.Map(
    lambda x: self.ExampleRow(**x)).with_output_types(
    self.ExampleRow) \
       | f"Write to RDBMS" >> WriteToJdbc(
    table_name=self.task["tablename"],
    driver_class_name=self.task['driver_class_name'],
    jdbc_url=self.task['jdbc_url'],
    username=self.task['username'],
    password=self.task['password'],
    classpath=["com.oracle.database.jdbc:ojdbc8:21.7.0.0"])


How do we use dataflow to push bulk data in batch/streaming.

Thanks,
Somnath Chouwdhury.

Reply via email to