Sorry, I misspelled it. I meant the PR (pull request), not "RP". Here it is: https://github.com/apache/flink/pull/17276 .
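
For reference, here is a minimal sketch of preparing the option as a plain string property, which is what setProperty(String, String) and setProperties(Properties) on the builder accept according to the 1.14 documentation quoted below. The class name KafkaSourceProps and the group id are made up for illustration. The key is the singular "commit.offsets.on.checkpoint" from those docs, and as discussed in this thread it may not be honored by the builder in your 1.12 version. The sketch uses only java.util.Properties, so it runs without the Flink connector on the classpath.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink.
final class KafkaSourceProps {
    // Key as given in the Flink 1.14 docs: singular "checkpoint", not "checkpoints".
    static final String COMMIT_OFFSETS_ON_CHECKPOINT = "commit.offsets.on.checkpoint";

    // Builds the properties that would be handed to the KafkaSource builder.
    static Properties build(String groupId, boolean commitOnCheckpoint) {
        Properties props = new Properties();
        // Standard Kafka consumer property; committed offsets are stored per consumer group.
        props.setProperty("group.id", groupId);
        props.setProperty(COMMIT_OFFSETS_ON_CHECKPOINT, Boolean.toString(commitOnCheckpoint));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("my-consumer-group", true); // example group id
        // With the connector available, these would be passed along the lines of
        //   builder.setProperties(props);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Checkpointing still has to be enabled on the job for a commit on checkpoint to happen at all.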

Marco Villalobos <mvillalo...@kineteque.com> wrote on Wed, Dec 1, 2021 at 9:18 PM:

> Thank you. One last question. What is an RP? Where can I read it?
>
> Marco
>
> On Nov 30, 2021, at 11:06 PM, Hang Ruan <ruanhang1...@gmail.com> wrote:
>
> Hi,
>
> In 1.12.0-1.12.5 versions, committing offset to kafka when the checkpoint
> is open is the default behavior in KafkaSourceBuilder. And it can not be
> changed in KafkaSourceBuilder.
>
> By this FLINK-24277 <https://issues.apache.org/jira/browse/FLINK-24277>,
> we could change the behavior. This problem will be fixed in 1.12.6. It
> seems not to be contained in your version.
>
> Reading the RP will be helpful for you to understand the behavior.
>
>
> Marco Villalobos <mvillalo...@kineteque.com> wrote on Wed, Dec 1, 2021 at 3:43 AM:
>
>> Thanks!
>>
>> However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
>> does not exist in Flink 1.12.
>>
>> Is that property supported with the string
>> "commit.offsets.on.checkpoints"?
>>
>> How do I configure that behavior so that offsets get committed on
>> checkpoints in Flink 1.12 when using the KafkaSourceBuilder? Or is that the
>> default behavior with checkpoints?
>>
>>
>> On Mon, Nov 29, 2021 at 7:46 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Maybe you can write like this :
>>> builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(),
>>> "true");
>>>
>>> Other additional properties could be found here :
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
>>>
>>> Marco Villalobos <mvillalo...@kineteque.com> wrote on Tue, Nov 30, 2021 at 11:08 AM:
>>>
>>>> Thank you for the information. That still does not answer my question
>>>> though. How do I configure Flink in 1.12 using the KafkaSourceBuilder so
>>>> that consumer should commit offsets back to Kafka on checkpoints?
>>>>
>>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this
>>>> method.
>>>>
>>>> But now that I am using KafkaSourceBuilder, how do I configure that
>>>> behavior so that offsets get committed on checkpoints? Or is that the
>>>> default behavior with checkpoints?
>>>>
>>>> -Marco
>>>>
>>>> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng <tsreape...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> Flink 1.14 release note states about this. See [1].
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>>>>>
>>>>> Marco Villalobos <mvillalo...@kineteque.com> wrote on Tue, Nov 30, 2021 at 7:12 AM:
>>>>>
>>>>>> Hi everybody,
>>>>>>
>>>>>> I am using Flink 1.12 and migrating my code from using
>>>>>> FlinkKafkaConsumer to using the KafkaSourceBuilder.
>>>>>>
>>>>>> FlinkKafkaConsumer has the method
>>>>>>
>>>>>> /**
>>>>>>> * Specifies whether or not the consumer should commit offsets back
>>>>>>> to Kafka on checkpoints.
>>>>>>> * This setting will only have effect if checkpointing is enabled
>>>>>>> for the job. If checkpointing isn't
>>>>>>> * enabled, only the "auto.commit.enable" (for 0.8) /
>>>>>>> "enable.auto.commit" (for 0.9+) property
>>>>>>> * settings will be used.
>>>>>>> */
>>>>>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>>>>>>
>>>>>>
>>>>>> How do I setup that parameter when using the KafkaSourceBuilder? If I
>>>>>> already have checkpointing configured, is it necessary to setup "commit
>>>>>> offsets on checkpoints"?
>>>>>>
>>>>>> The Flink 1.12 documentation does not discuss this topic, and the
>>>>>> Flink 1.14 documentation says little about it.
>>>>>>
>>>>>> For example, the Flink 1.14 documentation states:
>>>>>>
>>>>>> Additional Properties
>>>>>>> In addition to properties described above, you can set arbitrary
>>>>>>> properties for KafkaSource and KafkaConsumer by using
>>>>>>> setProperties(Properties) and setProperty(String, String). KafkaSource
>>>>>>> has
>>>>>>> following options for configuration:
>>>>>>> commit.offsets.on.checkpoint specifies whether to commit consuming
>>>>>>> offsets to Kafka brokers on checkpoint
>>>>>>
>>>>>>
>>>>>> And the 1.12 documentation states:
>>>>>>
>>>>>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will
>>>>>>> consume records from a topic and periodically checkpoint all its Kafka
>>>>>>> offsets, together with the state of other operations. In case of a job
>>>>>>> failure, Flink will restore the streaming program to the state of the
>>>>>>> latest checkpoint and re-consume the records from Kafka, starting from
>>>>>>> the
>>>>>>> offsets that were stored in the checkpoint.
>>>>>>> The interval of drawing checkpoints therefore defines how much the
>>>>>>> program may have to go back at most, in case of a failure. To use fault
>>>>>>> tolerant Kafka Consumers, checkpointing of the topology needs to be
>>>>>>> enabled
>>>>>>> in the job.
>>>>>>> If checkpointing is disabled, the Kafka consumer will periodically
>>>>>>> commit the offsets to Zookeeper.
>>>>>>
>>>>>>
>>>>>> Thank you.
>>>>>>
>>>>>> Marco
>>>>>>