Hi everybody, I am using Flink 1.12 and migrating my code from using FlinkKafkaConsumer to using the KafkaSourceBuilder.
FlinkKafkaConsumer has the following method:

> /**
>  * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
>  * This setting will only have effect if checkpointing is enabled for the job. If checkpointing isn't
>  * enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+) property
>  * settings will be used.
>  */
> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)

How do I set up that parameter when using the KafkaSourceBuilder? And if I already have checkpointing configured, is it necessary to set up "commit offsets on checkpoints" at all?

The Flink 1.12 documentation does not discuss this topic, and the Flink 1.14 documentation says little about it. For example, the Flink 1.14 documentation states:

> Additional Properties
> In addition to properties described above, you can set arbitrary
> properties for KafkaSource and KafkaConsumer by using
> setProperties(Properties) and setProperty(String, String). KafkaSource has
> following options for configuration:
> commit.offsets.on.checkpoint specifies whether to commit consuming offsets
> to Kafka brokers on checkpoint

And the 1.12 documentation states:

> With Flink's checkpointing enabled, the Flink Kafka Consumer will consume
> records from a topic and periodically checkpoint all its Kafka offsets,
> together with the state of other operations. In case of a job failure,
> Flink will restore the streaming program to the state of the latest
> checkpoint and re-consume the records from Kafka, starting from the offsets
> that were stored in the checkpoint.
> The interval of drawing checkpoints therefore defines how much the program
> may have to go back at most, in case of a failure. To use fault tolerant
> Kafka Consumers, checkpointing of the topology needs to be enabled in the
> job.
> If checkpointing is disabled, the Kafka consumer will periodically commit
> the offsets to Zookeeper.

Thank you.
Marco
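For reference, here is a minimal sketch of how the `commit.offsets.on.checkpoint` option from the 1.14 documentation could be passed through `setProperties(Properties)`. It assumes a connector version that recognizes that key. The key is documented for 1.14, so check whether a given 1.12.x release honors it. The class `KafkaSourceProps`, its `build` method, and the group, broker, and topic names are hypothetical. To keep the block runnable with only the JDK, the `KafkaSource` builder call appears as a comment rather than as live code.

```java
import java.util.Properties;

// Hypothetical helper: gathers KafkaSource / KafkaConsumer options in one Properties object.
// Assumes a Flink Kafka connector that understands "commit.offsets.on.checkpoint"
// (documented for Flink 1.14; verify support in the 1.12.x release actually in use).
class KafkaSourceProps {

    // Option key listed under "Additional Properties" in the Flink 1.14 Kafka connector docs.
    static final String COMMIT_ON_CHECKPOINT = "commit.offsets.on.checkpoint";

    static Properties build(String groupId, boolean commitOnCheckpoint) {
        Properties props = new Properties();
        // Consumer group under which offsets are committed to the brokers.
        props.setProperty("group.id", groupId);
        // Whether to commit consumed offsets to the Kafka brokers on checkpoint.
        // Checkpoints only happen when checkpointing is enabled for the job; on recovery
        // Flink restores from the offsets stored in the checkpoint itself.
        props.setProperty(COMMIT_ON_CHECKPOINT, Boolean.toString(commitOnCheckpoint));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("my-group", true);

        // With Flink on the classpath, the properties would be handed to the builder, e.g.:
        //   KafkaSource<String> source = KafkaSource.<String>builder()
        //       .setBootstrapServers("localhost:9092")
        //       .setTopics("input-topic")
        //       .setValueOnlyDeserializer(new SimpleStringSchema())
        //       .setProperties(props)
        //       .build();

        System.out.println(COMMIT_ON_CHECKPOINT + " = " + props.getProperty(COMMIT_ON_CHECKPOINT));
    }
}
```

The docs also list `setProperty(String, String)`, so the same key could be set directly on the builder instead. Collecting the options in a `Properties` object keeps them in one place when several are needed.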