[ https://issues.apache.org/jira/browse/FLINK-21674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301482#comment-17301482 ]
Roman Khachatryan commented on FLINK-21674: ------------------------------------------- Thanks for the clarification [~fuyaoli]! > JDBC sink can't get valid connection after 5 minutes using Oracle JDBC driver > ----------------------------------------------------------------------------- > > Key: FLINK-21674 > URL: https://issues.apache.org/jira/browse/FLINK-21674 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC > Affects Versions: 1.12.1 > Environment: Flink version: 1.12.1 Scala version: 2.11 Java version: > 1.11 Flink System parallelism: 1 JDBC Driver: Oracle ojdbc10 Database: Oracle > Autonomous Database on Oracle Cloud Infrastructure version 19c(You can regard > this as an cloud based Oracle Database) > > Flink user mailing list: > http://mail-archives.apache.org/mod_mbox/flink-user/202103.mbox/%3CCH2PR10MB402466373B33A3BBC5635A0AEC8C9%40CH2PR10MB4024.namprd10.prod.outlook.com%3E > Reporter: Fuyao > Priority: Blocker > > I use JDBCSink.sink() method to sink data to Oracle Autonomous Data Warehousr > with Oracle JDBC driver. I can sink data into Oracle Autonomous database > sucessfully. If there is IDLE time of over 5 minutes, then do a insertion, > the retry mechanism can't reestablish the JDBC and it will run into the error > below. I have set the retry to be 3 times, even after retry, it will still > fail. Only restart the application(an automatic process) could solve the > issue from checkpoint. > 11:41:04,872 ERROR > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat - JDBC > executeBatch error, retry times = 0 > java.sql.BatchUpdateException: IO Error: Broken pipe > It will fail the application and restart from checkpoint. After restarting > from checkpoint, the JDBC connection can be established correctly. > The connection timeout can be configured by > alter system set MAX_IDLE_TIME=1440; // Connection will get timeout after > 1440 minutes. > Such timeout parameter behavior change can be verified by SQL developer. > However, Flink still got connection error after 5 minutes configuring this. > I suspect this is some issues in reading some configuration problems from > Flink side to establish to sucessful connection. > Full log: > {code:java} > 11:41:04,872 ERROR > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat - JDBC > executeBatch error, retry times = 0 > java.sql.BatchUpdateException: IO Error: Broken pipe > at > oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9711) > at > oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447) > at > oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487) > at > oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237) > at > org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167) > at > org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) > at > org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75) > at > org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) > at > org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:613) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > at java.base/java.lang.Thread.run(Thread.java:834) > 11:41:05,725 ERROR > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat - JDBC > connection is not valid, and reestablish connection failed. > java.sql.SQLRecoverableException: Closed Connection > at > oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525) > at > oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478) > at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461) > at > oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122) > at > oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98) > at > org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167) > at > org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) > at > org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75) > at > org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) > at > org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:613) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > at java.base/java.lang.Thread.run(Thread.java:834) > 11:41:05,729 ERROR > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat - JDBC > executeBatch error, retry times = 0 > java.sql.SQLRecoverableException: Closed Connection > at > oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204) > at > oracle.jdbc.driver.OracleStatement.getQueryTimeout(OracleStatement.java:3093) > at > oracle.jdbc.driver.OracleStatementWrapper.getQueryTimeout(OracleStatementWrapper.java:183) > at > org.myorg.quickstart.boconstruction.stream.BusinessObjectConstruction.lambda$processFunction$16ee7a3d$1(BusinessObjectConstruction.java:103) > at > org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232) > at > org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67) > at > org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > at java.base/java.lang.Thread.run(Thread.java:834) > 11:41:05,770 ERROR > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat - JDBC > executeBatch error, retry times = 1 > java.sql.SQLRecoverableException: Closed Connection > at > oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204) > at > oracle.jdbc.driver.OracleStatement.getQueryTimeout(OracleStatement.java:3093) > at > oracle.jdbc.driver.OracleStatementWrapper.getQueryTimeout(OracleStatementWrapper.java:183) > at > org.myorg.quickstart.boconstruction.stream.BusinessObjectConstruction.lambda$processFunction$16ee7a3d$1(BusinessObjectConstruction.java:103) > at > org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232) > at > org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67) > at > org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > at java.base/java.lang.Thread.run(Thread.java:834) > 11:41:06,820 ERROR > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat - JDBC > executeBatch error, retry times = 2 > java.sql.SQLRecoverableException: Closed Connection > at > oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204) > at > oracle.jdbc.driver.OracleStatement.getQueryTimeout(OracleStatement.java:3093) > at > oracle.jdbc.driver.OracleStatementWrapper.getQueryTimeout(OracleStatementWrapper.java:183) > at > org.myorg.quickstart.boconstruction.stream.BusinessObjectConstruction.lambda$processFunction$16ee7a3d$1(BusinessObjectConstruction.java:103) > at > org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232) > at > org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67) > at > org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > at java.base/java.lang.Thread.run(Thread.java:834) > 11:41:08,865 ERROR > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat - JDBC > executeBatch error, retry times = 3 > java.sql.SQLRecoverableException: Closed Connection > at > oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204) > at > oracle.jdbc.driver.OracleStatement.getQueryTimeout(OracleStatement.java:3093) > at > oracle.jdbc.driver.OracleStatementWrapper.getQueryTimeout(OracleStatementWrapper.java:183) > at > org.myorg.quickstart.boconstruction.stream.BusinessObjectConstruction.lambda$processFunction$16ee7a3d$1(BusinessObjectConstruction.java:103) > at > org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232) > at > org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67) > at > org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > at java.base/java.lang.Thread.run(Thread.java:834) > 11:41:08,866 WARN > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat - Writing > records to JDBC failed. > java.io.IOException: java.sql.SQLRecoverableException: Closed Connection > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:190) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232) > at > org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67) > at > org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > at java.base/java.lang.Thread.run(Thread.java:834) > Caused by: java.sql.SQLRecoverableException: Closed Connection > at > oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204) > at > oracle.jdbc.driver.OracleStatement.getQueryTimeout(OracleStatement.java:3093) > at > oracle.jdbc.driver.OracleStatementWrapper.getQueryTimeout(OracleStatementWrapper.java:183) > at > org.myorg.quickstart.boconstruction.stream.BusinessObjectConstruction.lambda$processFunction$16ee7a3d$1(BusinessObjectConstruction.java:103) > at > org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184) > ... 11 more > 11:41:08,866 WARN > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Error > closing producer. > java.lang.NoSuchMethodError: > org.apache.kafka.clients.producer.KafkaProducer.close(Ljava/time/Duration;)V > at > org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.close(FlinkKafkaInternalProducer.java:172) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:949) > at > org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > at java.base/java.lang.Thread.run(Thread.java:834) > 11:41:08,868 WARN org.apache.flink.runtime.taskmanager.Task > - ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: > invoice-notification, Sink: Print to Std. Out, Sink: header-notification, > Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, > Sink: distributions-notification) (1/1)#0 (95b16d216f03759f0f0c131ba188b338) > switched from RUNNING to FAILED. > java.io.IOException: Writing records to JDBC failed. > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170) > at > org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) > at > org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75) > at > org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) > at > org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:613) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > at java.base/java.lang.Thread.run(Thread.java:834) > Caused by: java.io.IOException: Reestablish JDBC connection failed > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167) > ... 29 more > Caused by: java.sql.SQLRecoverableException: Closed Connection > at > oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525) > at > oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478) > at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461) > at > oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122) > at > oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98) > at > org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195) > ... 30 more > 11:41:08,869 INFO org.apache.flink.runtime.taskmanager.Task > - Freeing task resources for ProcessTableOutput -> (Sink: adwSink, Sink: > Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: > header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: > Print to Std. Out, Sink: distributions-notification) (1/1)#0 > (95b16d216f03759f0f0c131ba188b338). > 11:41:08,876 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor > - Un-registering task and sending final execution state FAILED to > JobManager for task ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. > Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink: > header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink: > Print to Std. Out, Sink: distributions-notification) (1/1)#0 > 95b16d216f03759f0f0c131ba188b338. > 11:41:08,880 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph > - ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink: > invoice-notification, Sink: Print to Std. Out, Sink: header-notification, > Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out, > Sink: distributions-notification) (1/1) (95b16d216f03759f0f0c131ba188b338) > switched from RUNNING to FAILED on ac2c0e70-42f9-4d5d-820a-19f6561a6297 @ > localhost (dataPort=-1). > java.io.IOException: Writing records to JDBC failed. > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170) > at > org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) > at > org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75) > at > org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) > at > org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:613) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206) > at > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > at java.base/java.lang.Thread.run(Thread.java:834) > Caused by: java.io.IOException: Reestablish JDBC connection failed > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167) > ... 29 more > Caused by: java.sql.SQLRecoverableException: Closed Connection > at > oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525) > at > oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478) > at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461) > at > oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122) > at > oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98) > at > org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81) > at > org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195) > ... 30 more > 11:41:08,886 INFO > org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy > - Calculating tasks to restart to recover the failed task > 91f695b4d2df74b06fe58043ee03541f_0. > 11:41:08,887 INFO > org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy > - 9 tasks should be restarted to recover the failed task > 91f695b4d2df74b06fe58043ee03541f_0. > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)