[ https://issues.apache.org/jira/browse/FLINK-24151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Arvid Heise resolved FLINK-24151.
---------------------------------
    Resolution: Done

> KafkaSink fails with setMaxConcurrentCheckpoints being enabled
> --------------------------------------------------------------
>
>                 Key: FLINK-24151
>                 URL: https://issues.apache.org/jira/browse/FLINK-24151
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.0
>            Reporter: Matthias
>            Assignee: Arvid Heise
>            Priority: Blocker
>             Fix For: 1.14.0
>
>         Attachments: release-testing-run-6.tar.gz
>
>
> We experienced a {{RuntimeException}} in a test run for FLINK-23850:
> {code}
> java.lang.RuntimeException: Failed to send data to Kafka: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
>         at org.apache.flink.connector.kafka.sink.KafkaWriter.checkErroneous(KafkaWriter.java:263) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:178) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:161) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at StreamExecCalc$6.processElement(Unknown Source) ~[?:?]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:36) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:27) ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:141) ~[flink-table_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:341) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:490) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:789) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:741) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
> Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
> {code}
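> 
> The top frames run through the new {{KafkaSink}}'s {{KafkaWriter}}, i.e. the unified-sink path rather than the legacy {{FlinkKafkaProducer}}. For reference, a minimal DataStream-API sketch that exercises the same writer path under the same checkpoint settings might look as follows (bootstrap servers, topic name, and the transactional-id prefix are placeholders, not taken from the actual test setup):
> {code:java}
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.base.DeliveryGuarantee;
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
> import org.apache.flink.connector.kafka.sink.KafkaSink;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class Flink24151Sketch {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
>         // The setting from the issue title: allow two checkpoints in flight.
>         env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
> 
>         KafkaSink<String> sink =
>                 KafkaSink.<String>builder()
>                         .setBootstrapServers("localhost:9092") // placeholder
>                         .setRecordSerializer(
>                                 KafkaRecordSerializationSchema.builder()
>                                         .setTopic("flink-24151-out") // placeholder
>                                         .setValueSerializationSchema(new SimpleStringSchema())
>                                         .build())
>                         // Exactly-once selects the transactional producer path
>                         // that the stack trace above goes through.
>                         .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
>                         .setTransactionalIdPrefix("flink-24151") // placeholder
>                         .build();
> 
>         // Long-running source so that checkpoints actually fire.
>         env.fromSequence(0, Long.MAX_VALUE).map(String::valueOf).sinkTo(sink);
>         env.execute();
>     }
> }
> {code}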
> Test job executed:
> {code:java}
>         Configuration config = new Configuration();
>         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
>         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(20, 2000));
>         env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
>         env.setParallelism(6);
>         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>         tableEnv.createTable("T1",
>                 TableDescriptor.forConnector("kafka")
>                         .schema(Schema.newBuilder()
>                                 .column("pk", DataTypes.STRING().notNull())
>                                 .column("x", DataTypes.STRING().notNull())
>                                 .build())
>                         .option("topic", "flink-23850-in1")
>                         .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
>                         .option("value.format", "csv")
>                         .option("scan.startup.mode", "earliest-offset")
>                         .build());
>         final Table resultTable =
>                 tableEnv.sqlQuery(
>                         "SELECT "
>                                 + "T1.pk, "
>                                 + "'asd', "
>                                 + "'foo', "
>                                 + "'bar' "
>                                 + "FROM T1");
>         tableEnv.createTable("T4",
>                 TableDescriptor.forConnector("kafka")
>                         .schema(Schema.newBuilder()
>                                 .column("pk", DataTypes.STRING().notNull())
>                                 .column("some_calculated_value", DataTypes.STRING())
>                                 .column("pk1", DataTypes.STRING())
>                                 .column("pk2", DataTypes.STRING())
>                                 .build())
>                         .option("topic", "flink-23850-out")
>                         .option("properties.bootstrap.servers", FLINK23850Utils.BOOTSTRAP_SERVERS)
>                         .option("value.format", "csv")
>                         .option("sink.delivery-guarantee", "exactly-once")
>                         .option("sink.transactional-id-prefix", "flink-23850")
>                         .option("scan.startup.mode", "earliest-offset")
>                         .build());
>         resultTable.executeInsert("T4");
> {code}
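> 
> For completeness: the {{sink.delivery-guarantee}} = {{exactly-once}} and {{sink.transactional-id-prefix}} options on T4 should translate to {{DeliveryGuarantee.EXACTLY_ONCE}} and {{setTransactionalIdPrefix(...)}} on the underlying {{KafkaSink}}, so the table job and the DataStream sketch above end up in the same transactional {{KafkaWriter}} code path.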



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
