Hello Cham,
The Runner in use is Dataflow Runner.
The last 28 lines of the stack trace aren't available in Cloud Logging either.
The code shared earlier in this thread works fine with 2-3 records but starts
to fail when we try a bigger source data payload.
Could this be multiple threads trying to acquire a write lock on the DB
table (an Oracle table)?
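
To test the write-lock theory, one thing we could try is capping how many
writers hit the table at once by assigning every record to one of a fixed
number of shards before the write step. Below is a minimal sketch of just the
sharding logic in plain Python. The names NUM_SHARDS, shard_key and
group_by_shard are hypothetical and are not part of Beam or JdbcIO; in the
pipeline the key function would presumably sit in a beam.Map ahead of a
GroupByKey, and whether that really limits concurrent connections depends on
how the runner fuses the stages, which we have not verified.

```python
import zlib

# Hypothetical cap on the number of concurrent writers; not a Beam setting.
NUM_SHARDS = 4


def shard_key(record_id: str, num_shards: int = NUM_SHARDS) -> int:
    """Map a record id to a stable shard number in [0, num_shards)."""
    # crc32 is deterministic across processes, unlike the built-in hash().
    return zlib.crc32(record_id.encode("utf-8")) % num_shards


def group_by_shard(record_ids, num_shards: int = NUM_SHARDS):
    """Group record ids by shard, mimicking what a GroupByKey would produce."""
    shards = {i: [] for i in range(num_shards)}
    for rid in record_ids:
        shards[shard_key(rid, num_shards)].append(rid)
    return shards
```

With this, at most NUM_SHARDS groups exist no matter how large the input is,
so the write step would be fed by a bounded number of keys.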
*Thanks and Regards,*
*Varun Rauthan*
On Wed, Feb 22, 2023 at 1:23 AM Chamikara Jayalath via user <
[email protected]> wrote:
> Which runner are you using?
>
> Also, do you have the bottom of the stack trace? The failure is possibly due
> to Docker containers running the Java SDK not having access to your database,
> but I'm not sure based on the information provided.
>
> Thanks,
> Cham
>
> On Tue, Feb 21, 2023 at 11:32 AM Somnath Chouwdhury <
> [email protected]> wrote:
>
>> Hi team,
>>
>> We are facing an issue while trying to push data to an RDBMS (Oracle in our
>> case). The job runs fine for a small number of records, but when it is run
>> against a bigger dataset it fails with this error:
>>
>> Error message from worker: org.apache.beam.sdk.util.UserCodeException:
>>> java.sql.BatchUpdateException: IO Error: Connection reset by peer
>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
>>> Source)
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown
>>> Source)
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>>> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> java.base/java.lang.Thread.run(Thread.java:829) Caused by:
>>> java.sql.BatchUpdateException: IO Error: Connection reset by peer
>>> oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10345)
>>> oracle.jdbc.driver.OraclePreparedStatement.executeBatchWithoutQueue(OraclePreparedStatement.java:10107)
>>> oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9987)
>>> oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9939)
>>> oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:261)
>>> org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
>>> org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2414)
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
>>> Suppressed: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions:
>>> [org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions:
>>> [java.sql.SQLRecoverableException: Closed Connection]] at
>>> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
>>> at
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2403)
>>> at
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
>>> at
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown
>>> Source) at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>> at
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>> at
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>> at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>>> at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
>>> at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
>>> at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
>>> at org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634) at
>>> org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown
>>> Source) at
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>> at
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>> at
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>> at
>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>>> at
>>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>>> at
>>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
>>> at
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
>>> at
>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>>> at
>>> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>>> at
>>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at
>>> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>>> at
>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>> at
>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> at java.base/java.lang.Thread.run(Thread.java:829) Caused by:
>>> org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions:
>>> [java.sql.SQLRecoverableException: Closed Connection] at
>>> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
>>> at
>>> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
>>> ... 27 more Caused by: java.sql.SQLRecoverableException: Closed Connection
>>> at
>>> oracle.jdbc.driver.PhysicalConnection.requireOpenConnection(PhysicalConnection.java:11385)
>>> at
>>> oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:4056)
>>> at
>>> oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1828)
>>> at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1811) at
>>> oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:146)
>>> at
>>> oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:110)
>>> at
>>> org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
>>> ... 28 more
>>
>>
>> Here is the code snippet we are using:
>>
>> coders.registry.register_coder(self.ExampleRow, coders.RowCoder)
>>
>> data = (
>>     p_input
>>     | beam.ParDo(AddColumns(self.ExampleRow._fields))
>>     | beam.Map(lambda x: self.ExampleRow(**x)).with_output_types(self.ExampleRow)
>>     | "Write to RDBMS" >> WriteToJdbc(
>>         table_name=self.task["tablename"],
>>         driver_class_name=self.task["driver_class_name"],
>>         jdbc_url=self.task["jdbc_url"],
>>         username=self.task["username"],
>>         password=self.task["password"],
>>         classpath=["com.oracle.database.jdbc:ojdbc8:21.7.0.0"])
>> )
>>
>>
>> How do we use Dataflow to push bulk data in batch or streaming mode?
>>
>> Thanks,
>> Somnath Chouwdhury.
>>
>
Error message from worker: org.apache.beam.sdk.util.UserCodeException:
java.sql.BatchUpdateException: IO Error: Connection reset by peer
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.sql.BatchUpdateException: IO Error: Connection reset by peer
oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10345)
oracle.jdbc.driver.OraclePreparedStatement.executeBatchWithoutQueue(OraclePreparedStatement.java:10107)
oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9987)
oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9939)
oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:261)
org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2414)
org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
Suppressed: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [java.sql.SQLRecoverableException: Closed Connection]]
at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.executeBatch(JdbcIO.java:2403)
at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn.processElement(JdbcIO.java:2363)
at org.apache.beam.sdk.io.jdbc.JdbcIO$WriteFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
at org.apache.beam.sdk.io.jdbc.JdbcIO$1.process(JdbcIO.java:1634)
at org.apache.beam.sdk.io.jdbc.JdbcIO$1$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:123)
at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:546)
at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.commons.dbcp2.SQLExceptionList: 1 exceptions: [java.sql.SQLRecoverableException: Closed Connection]
at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:174)
at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
... 27 more
Caused by: java.sql.SQLRecoverableException: Closed Connection
at oracle.jdbc.driver.PhysicalConnection.requireOpenConnection(PhysicalConnection.java:11385)
at oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:4056)
at oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1828)
at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1811)
at oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:146)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:110)
at org.apache.commons.dbcp2.DelegatingStatement.close(DelegatingStatement.java:161)
... 28 more