Sorry for the incomplete email.

Here is the error log for the broken pipeline. The failed SQL is executed again
after the job automatically recovers from the checkpoint. Please share any
ideas on this issue; I would really appreciate it. Thanks!

09:20:02,868 ERROR 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - JDBC 
executeBatch error, retry times = 3
java.sql.SQLRecoverableException: Closed Connection
                at 
oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
                at 
oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)
                at 
oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)
                at 
oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)
                at 
oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)
                at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)
                at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
                at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
                at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
                at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
                at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
                at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
                at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
                at java.base/java.lang.Thread.run(Thread.java:834)
09:20:02,869 WARN  
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat  - Writing 
records to JDBC failed.
java.io.IOException: java.sql.SQLRecoverableException: Closed Connection
                at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:190)
                at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
                at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
                at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
                at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
                at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
                at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLRecoverableException: Closed Connection
                at 
oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
                at 
oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9515)
                at 
oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)
                at 
oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)
                at 
oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)
                at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)
                at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
                at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
                ... 11 more
09:20:02,869 WARN  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Error closing 
producer.
java.lang.NoSuchMethodError: 
org.apache.kafka.clients.producer.KafkaProducer.close(Ljava/time/Duration;)V
                at 
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.close(FlinkKafkaInternalProducer.java:172)
                at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:949)
                at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
                at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
                at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
                at java.base/java.lang.Thread.run(Thread.java:834)
09:20:02,871 WARN  org.apache.flink.runtime.taskmanager.Task                    
 - ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: 
invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: 
Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: 
distributions-notification) (1/1)#0 (b57e84496b38c77e8201536e7d0e3723) switched 
from RUNNING to FAILED.
java.io.IOException: Writing records to JDBC failed.
                at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170)
                at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)
                at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
                at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
                at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
                at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
                at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
                at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
                at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
                at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
                at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
                at 
org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:504)
                at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
                at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
                at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
                at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
                at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
                at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)
                at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
                at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
                at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
                at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
                at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
                at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
                at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
                at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Reestablish JDBC connection failed
                at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)
                at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
                ... 29 more
Caused by: java.sql.SQLRecoverableException: Closed Connection
                at 
oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)
                at 
oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)
                at 
oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)
                at 
oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)
                at 
oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)
                at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)
                at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)
                ... 30 more
09:20:02,872 INFO  org.apache.flink.runtime.taskmanager.Task                    
 - Freeing task resources for ProcessTableOutput -> (Sink: adwSink, Sink: Print 
to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: 
header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: 
Print to Std. Out, Sink: distributions-notification) (1/1)#0 
(b57e84496b38c77e8201536e7d0e3723).
09:20:02,878 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           
 - Un-registering task and sending final execution state FAILED to JobManager 
for task ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: 
invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: 
Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: 
distributions-notification) (1/1)#0 b57e84496b38c77e8201536e7d0e3723.
09:20:02,880 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       
 - ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: 
invoice-notification, Sink: Print to Std. Out, Sink: header-notification, Sink: 
Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, Sink: 
distributions-notification) (1/1) (b57e84496b38c77e8201536e7d0e3723) switched 
from RUNNING to FAILED on bc6eadd0-19bf-4924-9e6e-62ba5f239d1e @ localhost 
(dataPort=-1).
java.io.IOException: Writing records to JDBC failed.
                at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170)
                at 
org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)
                at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
                at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
                at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
                at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
                at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
                at 
org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
                at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
                at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
                at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
                at 
org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:504)
                at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
                at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
                at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
                at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
                at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
                at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)
                at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
                at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
                at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
                at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
                at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
                at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
                at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
                at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
                at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Reestablish JDBC connection failed
                at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)
                at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
                ... 29 more
Caused by: java.sql.SQLRecoverableException: Closed Connection
                at 
oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)
                at 
oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)
                at 
oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)
                at 
oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)
                at 
oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)
                at 
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)
                at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)
                ... 30 more


Thanks,

Best regards,
Fuyao

From: Fuyao Li <fuyao...@oracle.com>
Date: Tuesday, March 2, 2021 at 10:33
To: user <user@flink.apache.org>, Timo Walther <twal...@apache.org>
Subject: Need help with JDBC Broken Pipeline Issue after some idle time

Hi Flink Community,

I need some help with the JDBC sink in the DataStream API. I can produce some
records and sink them to the database correctly. However, if I wait for 5
minutes between insertions, I run into a broken pipeline issue. After that, the
Flink application restarts, recovers from the checkpoint, and executes the
failed SQL query again. I tried hard to find resources explaining why such a
broken pipeline happens, but I still can’t understand it.

The interesting thing is that, if the idle time is around 3 minutes, everything 
seems to be fine.

It seems to be a timeout-related issue, but I don’t know what I should do to
fix it. I have shared the sink code below. Could anyone share some ideas?
Thank you so much!

My environment settings:
Flink version: 1.12.1
Scala version: 2.11
Java version: 11
Flink System parallelism: 1
JDBC Driver: Oracle ojdbc10
Database: Oracle Autonomous Database on Oracle Cloud Infrastructure (you can
regard this as a cloud-based Oracle Database)

The code for the sink:
        boDataStream
            .addSink(
                JdbcSink.sink(
                    "INSERT INTO TEST_ADW (invoice_id, json_doc) values(?, ?)",
                    // Statement builder: bind the invoice id and its JSON form.
                    (preparedStatement, testInvoiceBo) -> {
                      try {
                        Gson gson = new GsonBuilder()
                            .excludeFieldsWithoutExposeAnnotation()
                            .create();
                        String invoiceId = testInvoiceBo.getINVOICE_ID();
                        String json = gson.toJson(testInvoiceBo);
                        log.info("insertion information: {}", json);
                        preparedStatement.setString(1, invoiceId);
                        preparedStatement.setString(2, json);
                      } catch (JsonIOException e) {
                        // toJson serializes the object, so this is a
                        // serialization failure rather than a parse failure.
                        log.error("Failed to serialize invoice to JSON", e);
                      }
                    },
                    // Batch size 1 flushes each record as it arrives;
                    // a failed flush is retried up to 3 times.
                    new JdbcExecutionOptions.Builder()
                        .withBatchIntervalMs(0)
                        .withBatchSize(1)
                        .withMaxRetries(3)
                        .build(),
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(DB_URL)
                        .withDriverName("oracle.jdbc.driver.OracleDriver")
                        .withUsername("admin")
                        .withPassword("password")
                        .build()))
            .name("adwSink")
            .uid("adwSink")
            .setParallelism(1);
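
For reference, here is a minimal sketch of the validate-then-reopen pattern
that an idle timeout like this usually calls for, in plain Java with no Flink
or Oracle dependency. SinkConnection, DroppableConnection, ReconnectingWriter
and StaleConnectionDemo are hypothetical names made up for illustration, and
this is not how the Flink JDBC connector is implemented. It only shows the
idea: check the connection before each attempt and, if the server has dropped
it, replace the stale handle without calling cleanup methods on it. In the log,
the "Reestablish JDBC connection failed" error is raised while closing
statements on the already closed connection, which is the step this sketch
avoids.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for a JDBC connection; not a Flink or Oracle type. */
interface SinkConnection {
    boolean isValid();
    void write(String record) throws IOException;
}

/** Fake connection that can be marked as dropped, as a server may do after an idle timeout. */
final class DroppableConnection implements SinkConnection {
    final List<String> written = new ArrayList<>();
    boolean dropped = false;

    @Override
    public boolean isValid() {
        return !dropped;
    }

    @Override
    public void write(String record) throws IOException {
        if (dropped) {
            throw new IOException("Closed Connection");
        }
        written.add(record);
    }
}

/** Checks the connection before each attempt and replaces it when it is stale. */
final class ReconnectingWriter {
    private final Supplier<SinkConnection> factory;
    private final int maxRetries;
    private SinkConnection connection;

    ReconnectingWriter(Supplier<SinkConnection> factory, int maxRetries) {
        this.factory = factory;
        this.maxRetries = maxRetries;
        this.connection = factory.get();
    }

    void write(String record) {
        IOException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (!connection.isValid()) {
                // Discard the stale handle instead of closing it: cleanup
                // calls on a dead connection can fail too.
                connection = factory.get();
            }
            try {
                connection.write(record);
                return;
            } catch (IOException e) {
                last = e; // remember the failure; the next attempt re-validates
            }
        }
        throw new IllegalStateException("write failed after retries", last);
    }
}

final class StaleConnectionDemo {
    public static void main(String[] args) {
        List<DroppableConnection> opened = new ArrayList<>();
        ReconnectingWriter writer = new ReconnectingWriter(() -> {
            DroppableConnection c = new DroppableConnection();
            opened.add(c);
            return c;
        }, 3);

        writer.write("invoice-1");
        opened.get(0).dropped = true; // simulate the idle session being closed
        writer.write("invoice-2");    // triggers one reconnect, then succeeds

        System.out.println("connections opened: " + opened.size()); // prints "connections opened: 2"
    }
}
```

With a real java.sql.Connection, the validity check would presumably be
Connection.isValid(timeoutSeconds), and the factory would open a new connection
through the driver. Whether the sink in Flink 1.12.1 can be configured to
behave this way is exactly the open question of this thread.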

The JDBC broken pipeline log:

