[ https://issues.apache.org/jira/browse/FLINK-24151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Arvid Heise resolved FLINK-24151. --------------------------------- Resolution: Done > KafkaSink fails with setMaxConcurrentCheckpoints being enabled > -------------------------------------------------------------- > > Key: FLINK-24151 > URL: https://issues.apache.org/jira/browse/FLINK-24151 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka > Affects Versions: 1.14.0 > Reporter: Matthias > Assignee: Arvid Heise > Priority: Blocker > Fix For: 1.14.0 > > Attachments: release-testing-run-6.tar.gz > > > We experienced a {{RuntimeException}} in a test run for FLINK-23850: > {code} > java.lang.RuntimeException: Failed to send data to Kafka: This exception is > raised by the broker if it could not locate the producer metadata associated > with the producerId in question. This could happen if, for instance, the > producer's records were deleted because their retention time had elapsed. > Once the last records of the producerId are removed, the producer's metadata > is removed from the broker, and future appends by the producer will return > this exception. 
> at > org.apache.flink.connector.kafka.sink.KafkaWriter.checkErroneous(KafkaWriter.java:263) > ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:178) > ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:161) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at StreamExecCalc$6.processElement(Unknown Source) ~[?:?] > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:36) > ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:27) > ~[flink-sql-connector-kafka_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:141) > ~[flink-table_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:341) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > 
~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:490) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:789) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:741) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > ~[flink-dist_2.11-1.14-SNAPSHOT.jar:1.14-SNAPSHOT] > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265] > Caused by: > org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.UnknownProducerIdException: > This exception is raised by the broker if it could not locate the producer > metadata associated with the producerId in question. This could happen if, > for instance, the producer's records were deleted because their retention > time had elapsed. Once the last records of the producerId are removed, the > producer's metadata is removed from the broker, and future appends by the > producer will return this exception. 
> {code} > Test job executed: > {code:java} > Configuration config = new Configuration(); > > config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, > true); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(config); > env.setRuntimeMode(RuntimeExecutionMode.STREAMING); > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(20, 2000)); > env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE); > env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); > env.setParallelism(6); > final StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(env); > tableEnv.createTable("T1", > TableDescriptor.forConnector("kafka") > .schema(Schema.newBuilder() > .column("pk", DataTypes.STRING().notNull()) > .column("x", DataTypes.STRING().notNull()) > .build()) > .option("topic", "flink-23850-in1") > .option("properties.bootstrap.servers", > FLINK23850Utils.BOOTSTRAP_SERVERS) > .option("value.format", "csv") > .option("scan.startup.mode", "earliest-offset") > .build()); > final Table resultTable = > tableEnv.sqlQuery( > "SELECT " > + "T1.pk, " > + "'asd', " > + "'foo', " > + "'bar' " > + "FROM T1"); > tableEnv.createTable("T4", > TableDescriptor.forConnector("kafka") > .schema(Schema.newBuilder() > .column("pk", DataTypes.STRING().notNull()) > .column("some_calculated_value", > DataTypes.STRING()) > .column("pk1", DataTypes.STRING()) > .column("pk2", DataTypes.STRING()) > .build()) > .option("topic", "flink-23850-out") > .option("properties.bootstrap.servers", > FLINK23850Utils.BOOTSTRAP_SERVERS) > .option("value.format", "csv") > .option("sink.delivery-guarantee", "exactly-once") > .option("sink.transactional-id-prefix", "flink-23850") > .option("scan.startup.mode", "earliest-offset") > .build()); > resultTable.executeInsert("T4"); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)