[
https://issues.apache.org/jira/browse/FLINK-21674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17299652#comment-17299652
]
Fuyao edited comment on FLINK-21674 at 3/11/21, 3:43 PM:
---------------------------------------------------------
[~roman_khachatryan] Hi Roman, I already set the max retry to be 3. I think the
major issue is that the retry mechanism is not working at all here. It will
always fail. However, restart the application will reestablish the connection.
You can check my logs for details. The code below is used to submit the query.
One more question, JDBC is not able to do select query, right? It is supposed
to do UPDATE or INSERT or DELETE, right?
[~jark]
{code:java}
businessObjectDataStream.addSink(
JdbcSink.sink(
"INSERT INTO ADW_DEMO_TABLE (invoice_id, last_update_time,
invoice_deleted_flag, json_doc) values(?, ?, ?, ?)",
(preparedStatement, testInvoiceBo) -> {
try {
Gson gson = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();
String invoiceId = testInvoiceBo.getINVOICE_ID();
Timestamp lastUpdateTime = testInvoiceBo.getHEADER().getOp_ts();
String invoiceDeletedFlag = "0";
String json = gson.toJson(testInvoiceBo);
log.info("insertion information: {}", invoiceId);
log.info("insertion information: {}", lastUpdateTime);
log.info("insertion information: {}", invoiceDeletedFlag);
log.info("insertion information: {}", json);
preparedStatement.setString(1, invoiceId);
preparedStatement.setTimestamp(2, lastUpdateTime);
preparedStatement.setString(3, invoiceDeletedFlag);
preparedStatement.setString(4, json); } catch (JsonIOException e)
{ log.error("Failed to parse JSON", e); }
},
new JdbcExecutionOptions.Builder()
.withBatchIntervalMs(0)
.withBatchSize(1)
.withMaxRetries(3)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(DBCredentials.DB_URL)
.withDriverName(DBCredentials.DB_DRIVER)
.withUsername(DBCredentials.DB_USER)
.withPassword(DBCredentials.DB_PASSWORD)
.build()))
.name("adwSink")
.uid("adwSink")
.setParallelism(1);
{code}
The jar libraries I used for JDBC connection. This is an oracle JDBC driver.
{code:java}
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc10</artifactId>
<version>19.3.0.0</version>
</dependency>
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ucp</artifactId>
<version>19.3.0.0</version>
</dependency>
<dependency>
<groupId>com.oracle.database.security</groupId>
<artifactId>osdt_core</artifactId>
<version>19.3.0.0</version>
</dependency>
<dependency>
<groupId>com.oracle.database.security</groupId>
<artifactId>oraclepki</artifactId>
<version>19.3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
{code}
was (Author: fuyaoli):
[~roman_khachatryan] Hi Roman, I already set the max retry to be 3. You can
check my logs for details. The code below is used to submit the query. One more
question, JDBC is not able to do select query, right? It is supposed to do
UPDATE or INSERT or DELETE, right?
{code:java}
businessObjectDataStream.addSink(
JdbcSink.sink(
"INSERT INTO ADW_DEMO_TABLE (invoice_id, last_update_time,
invoice_deleted_flag, json_doc) values(?, ?, ?, ?)",
(preparedStatement, testInvoiceBo) -> {
try {
Gson gson = new GsonBuilder()
.excludeFieldsWithoutExposeAnnotation()
.create();
String invoiceId = testInvoiceBo.getINVOICE_ID();
Timestamp lastUpdateTime = testInvoiceBo.getHEADER().getOp_ts();
String invoiceDeletedFlag = "0";
String json = gson.toJson(testInvoiceBo);
log.info("insertion information: {}", invoiceId);
log.info("insertion information: {}", lastUpdateTime);
log.info("insertion information: {}", invoiceDeletedFlag);
log.info("insertion information: {}", json);
preparedStatement.setString(1, invoiceId);
preparedStatement.setTimestamp(2, lastUpdateTime);
preparedStatement.setString(3, invoiceDeletedFlag);
preparedStatement.setString(4, json); } catch (JsonIOException e)
{ log.error("Failed to parse JSON", e); }
},
new JdbcExecutionOptions.Builder()
.withBatchIntervalMs(0)
.withBatchSize(1)
.withMaxRetries(3)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(DBCredentials.DB_URL)
.withDriverName(DBCredentials.DB_DRIVER)
.withUsername(DBCredentials.DB_USER)
.withPassword(DBCredentials.DB_PASSWORD)
.build()))
.name("adwSink")
.uid("adwSink")
.setParallelism(1);
{code}
The jar libraries I used for JDBC connection. This is an oracle JDBC driver.
{code:java}
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc10</artifactId>
<version>19.3.0.0</version>
</dependency>
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ucp</artifactId>
<version>19.3.0.0</version>
</dependency>
<dependency>
<groupId>com.oracle.database.security</groupId>
<artifactId>osdt_core</artifactId>
<version>19.3.0.0</version>
</dependency>
<dependency>
<groupId>com.oracle.database.security</groupId>
<artifactId>oraclepki</artifactId>
<version>19.3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
{code}
> JDBC sink can't get valid connection after 5 minutes using Oracle JDBC driver
> -----------------------------------------------------------------------------
>
> Key: FLINK-21674
> URL: https://issues.apache.org/jira/browse/FLINK-21674
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.12.1
> Environment: Flink version: 1.12.1 Scala version: 2.11 Java version:
> 1.11 Flink System parallelism: 1 JDBC Driver: Oracle ojdbc10 Database: Oracle
> Autonomous Database on Oracle Cloud Infrastructure version 19c(You can regard
> this as an cloud based Oracle Database)
>
> Flink user mailing list:
> http://mail-archives.apache.org/mod_mbox/flink-user/202103.mbox/%3CCH2PR10MB402466373B33A3BBC5635A0AEC8C9%40CH2PR10MB4024.namprd10.prod.outlook.com%3E
> Reporter: Fuyao
> Priority: Blocker
>
> I use JDBCSink.sink() method to sink data to Oracle Autonomous Data Warehousr
> with Oracle JDBC driver. I can sink data into Oracle Autonomous database
> sucessfully. If there is IDLE time of over 5 minutes, then do a insertion,
> the retry mechanism can't reestablish the JDBC and it will run into the error
> below.
> 11:41:04,872 ERROR
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat - JDBC
> executeBatch error, retry times = 0
> java.sql.BatchUpdateException: IO Error: Broken pipe
> It will fail the application and restart from checkpoint. After restarting
> from checkpoint, the JDBC connection can be established correctly.
> The connection timeout can be configured by
> alter system set MAX_IDLE_TIME=1440; // Connection will get timeout after
> 1440 minutes.
> Such timeout parameter behavior change can be verified by SQL developer.
> However, Flink still got connection error after 5 minutes configuring this.
> I suspect this is some issues in reading some configuration problems from
> Flink side to establish to sucessful connection.
> Full log:
> {code:java}
> 11:41:04,872 ERROR
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat - JDBC
> executeBatch error, retry times = 0
> java.sql.BatchUpdateException: IO Error: Broken pipe
> at
> oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9711)
> at
> oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)
> at
> oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)
> at
> oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)
> at
> org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:73)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
> at
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
> at
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
> at
> org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:613)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
> at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
> at
> org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.base/java.lang.Thread.run(Thread.java:834)
> 11:41:05,725 ERROR
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat - JDBC
> connection is not valid, and reestablish connection failed.
> java.sql.SQLRecoverableException: Closed Connection
> at
> oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)
> at
> oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)
> at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)
> at
> oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)
> at
> oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)
> at
> org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
> at
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
> at
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
> at
> org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:613)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
> at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
> at
> org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.base/java.lang.Thread.run(Thread.java:834)
> 11:41:05,729 ERROR
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat - JDBC
> executeBatch error, retry times = 0
> java.sql.SQLRecoverableException: Closed Connection
> at
> oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
> at
> oracle.jdbc.driver.OracleStatement.getQueryTimeout(OracleStatement.java:3093)
> at
> oracle.jdbc.driver.OracleStatementWrapper.getQueryTimeout(OracleStatementWrapper.java:183)
> at
> org.myorg.quickstart.boconstruction.stream.BusinessObjectConstruction.lambda$processFunction$16ee7a3d$1(BusinessObjectConstruction.java:103)
> at
> org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
> at
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.base/java.lang.Thread.run(Thread.java:834)
> 11:41:05,770 ERROR
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat - JDBC
> executeBatch error, retry times = 1
> java.sql.SQLRecoverableException: Closed Connection
> at
> oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
> at
> oracle.jdbc.driver.OracleStatement.getQueryTimeout(OracleStatement.java:3093)
> at
> oracle.jdbc.driver.OracleStatementWrapper.getQueryTimeout(OracleStatementWrapper.java:183)
> at
> org.myorg.quickstart.boconstruction.stream.BusinessObjectConstruction.lambda$processFunction$16ee7a3d$1(BusinessObjectConstruction.java:103)
> at
> org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
> at
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.base/java.lang.Thread.run(Thread.java:834)
> 11:41:06,820 ERROR
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat - JDBC
> executeBatch error, retry times = 2
> java.sql.SQLRecoverableException: Closed Connection
> at
> oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
> at
> oracle.jdbc.driver.OracleStatement.getQueryTimeout(OracleStatement.java:3093)
> at
> oracle.jdbc.driver.OracleStatementWrapper.getQueryTimeout(OracleStatementWrapper.java:183)
> at
> org.myorg.quickstart.boconstruction.stream.BusinessObjectConstruction.lambda$processFunction$16ee7a3d$1(BusinessObjectConstruction.java:103)
> at
> org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
> at
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.base/java.lang.Thread.run(Thread.java:834)
> 11:41:08,865 ERROR
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat - JDBC
> executeBatch error, retry times = 3
> java.sql.SQLRecoverableException: Closed Connection
> at
> oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
> at
> oracle.jdbc.driver.OracleStatement.getQueryTimeout(OracleStatement.java:3093)
> at
> oracle.jdbc.driver.OracleStatementWrapper.getQueryTimeout(OracleStatementWrapper.java:183)
> at
> org.myorg.quickstart.boconstruction.stream.BusinessObjectConstruction.lambda$processFunction$16ee7a3d$1(BusinessObjectConstruction.java:103)
> at
> org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
> at
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.base/java.lang.Thread.run(Thread.java:834)
> 11:41:08,866 WARN
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat - Writing
> records to JDBC failed.
> java.io.IOException: java.sql.SQLRecoverableException: Closed Connection
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:190)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.close(JdbcBatchingOutputFormat.java:232)
> at
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:67)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.sql.SQLRecoverableException: Closed Connection
> at
> oracle.jdbc.driver.OracleStatement.ensureOpen(OracleStatement.java:4204)
> at
> oracle.jdbc.driver.OracleStatement.getQueryTimeout(OracleStatement.java:3093)
> at
> oracle.jdbc.driver.OracleStatementWrapper.getQueryTimeout(OracleStatementWrapper.java:183)
> at
> org.myorg.quickstart.boconstruction.stream.BusinessObjectConstruction.lambda$processFunction$16ee7a3d$1(BusinessObjectConstruction.java:103)
> at
> org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:70)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
> ... 11 more
> 11:41:08,866 WARN
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Error
> closing producer.
> java.lang.NoSuchMethodError:
> org.apache.kafka.clients.producer.KafkaProducer.close(Ljava/time/Duration;)V
> at
> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.close(FlinkKafkaInternalProducer.java:172)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:949)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.base/java.lang.Thread.run(Thread.java:834)
> 11:41:08,868 WARN org.apache.flink.runtime.taskmanager.Task
> - ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink:
> invoice-notification, Sink: Print to Std. Out, Sink: header-notification,
> Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out,
> Sink: distributions-notification) (1/1)#0 (95b16d216f03759f0f0c131ba188b338)
> switched from RUNNING to FAILED.
> java.io.IOException: Writing records to JDBC failed.
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170)
> at
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
> at
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
> at
> org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:613)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
> at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
> at
> org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.io.IOException: Reestablish JDBC connection failed
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
> ... 29 more
> Caused by: java.sql.SQLRecoverableException: Closed Connection
> at
> oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)
> at
> oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)
> at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)
> at
> oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)
> at
> oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)
> at
> org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)
> ... 30 more
> 11:41:08,869 INFO org.apache.flink.runtime.taskmanager.Task
> - Freeing task resources for ProcessTableOutput -> (Sink: adwSink, Sink:
> Print to Std. Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink:
> header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink:
> Print to Std. Out, Sink: distributions-notification) (1/1)#0
> (95b16d216f03759f0f0c131ba188b338).
> 11:41:08,876 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor
> - Un-registering task and sending final execution state FAILED to
> JobManager for task ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std.
> Out, Sink: invoice-notification, Sink: Print to Std. Out, Sink:
> header-notification, Sink: Print to Std. Out, Sink: lines-notification, Sink:
> Print to Std. Out, Sink: distributions-notification) (1/1)#0
> 95b16d216f03759f0f0c131ba188b338.
> 11:41:08,880 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
> - ProcessTableOutput -> (Sink: adwSink, Sink: Print to Std. Out, Sink:
> invoice-notification, Sink: Print to Std. Out, Sink: header-notification,
> Sink: Print to Std. Out, Sink: lines-notification, Sink: Print to Std. Out,
> Sink: distributions-notification) (1/1) (95b16d216f03759f0f0c131ba188b338)
> switched from RUNNING to FAILED on ac2c0e70-42f9-4d5d-820a-19f6561a6297 @
> localhost (dataPort=-1).
> java.io.IOException: Writing records to JDBC failed.
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:170)
> at
> org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:54)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
> at
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
> at
> org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
> at
> org.myorg.quickstart.processor.InvoiceBoProcessFunction.onTimer(InvoiceBoProcessFunction.java:613)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
> at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
> at
> org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:193)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.io.IOException: Reestablish JDBC connection failed
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:202)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167)
> ... 29 more
> Caused by: java.sql.SQLRecoverableException: Closed Connection
> at
> oracle.jdbc.driver.PhysicalConnection.needLine(PhysicalConnection.java:3525)
> at
> oracle.jdbc.driver.OracleStatement.closeOrCache(OracleStatement.java:1478)
> at oracle.jdbc.driver.OracleStatement.close(OracleStatement.java:1461)
> at
> oracle.jdbc.driver.OracleStatementWrapper.close(OracleStatementWrapper.java:122)
> at
> oracle.jdbc.driver.OraclePreparedStatementWrapper.close(OraclePreparedStatementWrapper.java:98)
> at
> org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.closeStatements(SimpleBatchStatementExecutor.java:81)
> at
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:195)
> ... 30 more
> 11:41:08,886 INFO
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
> - Calculating tasks to restart to recover the failed task
> 91f695b4d2df74b06fe58043ee03541f_0.
> 11:41:08,887 INFO
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
> - 9 tasks should be restarted to recover the failed task
> 91f695b4d2df74b06fe58043ee03541f_0.
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)