[ https://issues.apache.org/jira/browse/FLINK-22311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324171#comment-17324171 ]
Maciej Bryński edited comment on FLINK-22311 at 4/17/21, 8:34 AM:
------------------------------------------------------------------

This is the log. I have a few duplicate records with the 2021-04-15T22:15:42 timestamp.

{code:java}
1:052021-04-15T23:16:49.762722475+00:00 stdout F 2021-04-15 23:16:49,761 INFO org.apache.kafka.clients.FetchSessionHandler [] - [Consumer clientId=consumer-flink-ingestion-raw-2, groupId=flink-ingestion-raw] Error sending fetch request (sessionId=842995853, epoch=11339) to node 1: {}.
2021-04-15T22:16:04.498583738+00:00 stdout F at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
2021-04-15T22:16:04.498583738+00:00 stdout F at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) [flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
2021-04-15T22:16:04.498583738+00:00 stdout F at com.getindata.flink.ingestion.JdbcXaSinkDeserializationWrapper.invoke(JdbcXaSinkDeserializationWrapper.java:22) ~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at com.getindata.flink.ingestion.JdbcXaSinkDeserializationWrapper.invoke(JdbcXaSinkDeserializationWrapper.java:37) ~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.invoke(JdbcXaSinkFunction.java:287) ~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167) ~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184) ~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216) ~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at org.apache.flink.connector.jdbc.internal.executor.DynamicBatchStatementExecutor.executeBatch(DynamicBatchStatementExecutor.java:73) ~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237) ~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487) ~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447) ~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9711) ~[ingestion-1.2.1.jar:?]
2021-04-15T22:16:04.498583738+00:00 stdout F
2021-04-15T22:16:04.498583738+00:00 stdout F java.sql.BatchUpdateException: ORA-04021: timeout occurred while waiting to lock object
2021-04-15T22:16:04.498583738+00:00 stdout F 2021-04-15 22:16:04,490 ERROR org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat [] - JDBC executeBatch error, retry times = 0
{code}

> Flink JDBC XA connector need to set maxRetries to 0 to properly working
> -----------------------------------------------------------------------
>
> Key: FLINK-22311
> URL: https://issues.apache.org/jira/browse/FLINK-22311
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.13.0
> Reporter: Maciej Bryński
> Priority: Major
>
> Hi,
> We're using the XA connector from Flink 1.13 in one of our projects, and we
> were able to create duplicate records when writing to Oracle.
> The reason is that the default MAX_RETRIES in JdbcExecutionOptions is 3, which
> can cause duplicates in the DB.
> I think we should at least mention this in the docs, or even validate this
> option when creating the XA sink.
> The documentation currently uses the defaults:
> https://github.com/apache/flink/pull/10847/files#diff-a585e56c997756bb7517ebd2424e5fab5813cee67d8dee3eab6ddd0780aff627R88

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
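
To illustrate why a non-zero retry count can produce duplicates inside a single open transaction, here is a minimal, self-contained sketch. It does not use Flink or Oracle. `FakeTable`, `flushWithRetries`, and the "fail after the first row" behaviour are all hypothetical stand-ins, and the assumption that a failed batch leaves some rows applied is mine, not something the log above proves.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryDuplicatesSketch {

    /** Hypothetical stand-in for a table inside one open, uncommitted transaction. */
    static final class FakeTable {
        final List<String> rows = new ArrayList<>();
        private boolean failOnce = true;

        // Applies rows one by one. The first call throws after the first row
        // has been written, mimicking a batch that dies partway through.
        void executeBatch(List<String> batch) {
            for (int i = 0; i < batch.size(); i++) {
                rows.add(batch.get(i));
                if (failOnce && i == 0) {
                    failOnce = false;
                    throw new IllegalStateException("simulated timeout");
                }
            }
        }
    }

    // Resends the whole batch up to maxRetries extra times, without rolling
    // back what the failed attempt already wrote.
    public static List<String> flushWithRetries(List<String> batch, int maxRetries) {
        FakeTable table = new FakeTable();
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                table.executeBatch(batch);
                return table.rows;
            } catch (IllegalStateException e) {
                if (attempt == maxRetries) {
                    throw e; // out of retries: surface the failure to the caller
                }
            }
        }
        return table.rows; // only reached when maxRetries is negative
    }

    public static void main(String[] args) {
        // With retries, the row written before the failure is written again.
        System.out.println(flushWithRetries(List.of("a", "b", "c"), 3)); // prints [a, a, b, c]

        // With maxRetries = 0 the failure propagates, so the caller can abort
        // the transaction instead of silently duplicating rows.
        try {
            flushWithRetries(List.of("a", "b", "c"), 0);
        } catch (IllegalStateException e) {
            System.out.println("failed without retry: " + e.getMessage());
        }
    }
}
```

In a real job, the corresponding setting would be `JdbcExecutionOptions.builder().withMaxRetries(0).build()` passed to the XA sink, as the issue title suggests.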