Sorry, I spell it wrong, which I mean the PR. Here it is
https://github.com/apache/flink/pull/17276 .

Marco Villalobos <mvillalo...@kineteque.com> 于2021年12月1日周三 下午9:18写道:

> Thank you. One last question.  What is an RP? Where can I read it?
>
> Marco
>
> On Nov 30, 2021, at 11:06 PM, Hang Ruan <ruanhang1...@gmail.com> wrote:
>
> Hi,
>
> In 1.12.0-1.12.5 versions, committing offset to kafka when the checkpoint
> is open is the default behavior in KafkaSourceBuilder. And it can not be
> changed in KafkaSourceBuilder.
>
> By this FLINK-24277 <https://issues.apache.org/jira/browse/FLINK-24277>,
> we could change the behavior. This problem will be fixed in 1.12.6. It
> seems not to be contained in your version.
>
> Reading the RP will be helpful for you to understand the behavior.
>
>
> Marco Villalobos <mvillalo...@kineteque.com> 于2021年12月1日周三 上午3:43写道:
>
>> Thanks!
>>
>> However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
>> does not exist in Flink 1.12.
>>
>> Is that property supported with the string
>> "commit.offsets.on.checkpoints"?
>>
>> How do I configure that behavior so that offsets get committed on
>> checkpoints in Flink 1.12 when using the KafkaSourceBuilder? Or is that the
>> default behavior with checkpoints?
>>
>>
>>
>>
>> On Mon, Nov 29, 2021 at 7:46 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Maybe you can write like this :
>>> builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(),
>>> "true");
>>>
>>> Other additional properties could be found here :
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
>>>
>>> Marco Villalobos <mvillalo...@kineteque.com> 于2021年11月30日周二 上午11:08写道:
>>>
>>>> Thank you for the information.  That still does not answer my question
>>>> though.  How do I configure Flink in 1.12 using the KafkaSourceBuilder so
>>>> that consumer should commit offsets back to Kafka on checkpoints?
>>>>
>>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean) has this
>>>> method.
>>>>
>>>> But now that I am using KafkaSourceBuilder, how do I configure that
>>>> behavior so that offsets get committed on checkpoints?  Or is that the
>>>> default behavior with checkpoints?
>>>>
>>>> -Marco
>>>>
>>>> On Mon, Nov 29, 2021 at 5:58 PM Caizhi Weng <tsreape...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> Flink 1.14 release note states about this. See [1].
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>>>>>
>>>>> Marco Villalobos <mvillalo...@kineteque.com> 于2021年11月30日周二 上午7:12写道:
>>>>>
>>>>>> Hi everybody,
>>>>>>
>>>>>> I am using Flink 1.12 and migrating my code from using
>>>>>> FlinkKafkaConsumer to using the KafkaSourceBuilder.
>>>>>>
>>>>>> FlinkKafkaConsumer has the method
>>>>>>
>>>>>> /**
>>>>>>>  * Specifies whether or not the consumer should commit offsets back
>>>>>>> to Kafka on checkpoints.
>>>>>>>  * This setting will only have effect if checkpointing is enabled
>>>>>>> for the job. If checkpointing isn't
>>>>>>>  * enabled, only the "auto.commit.enable" (for 0.8) /
>>>>>>> "enable.auto.commit" (for 0.9+) property
>>>>>>>  * settings will be used.
>>>>>>> */
>>>>>>> FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
>>>>>>
>>>>>>
>>>>>> How do I setup that parameter when using the KafkaSourceBuilder? If I
>>>>>> already have checkpointing configured, is it necessary to setup "commit
>>>>>> offsets on checkpoints"?
>>>>>>
>>>>>> The Flink 1.12 documentation does not discuss this topic, and the
>>>>>> Flink 1.14 documentation says little about it.
>>>>>>
>>>>>>  For example, the Flink 1.14 documentation states:
>>>>>>
>>>>>> Additional Properties
>>>>>>> In addition to properties described above, you can set arbitrary
>>>>>>> properties for KafkaSource and KafkaConsumer by using
>>>>>>> setProperties(Properties) and setProperty(String, String). KafkaSource 
>>>>>>> has
>>>>>>> following options for configuration:
>>>>>>> commit.offsets.on.checkpoint specifies whether to commit consuming
>>>>>>> offsets to Kafka brokers on checkpoint
>>>>>>
>>>>>>
>>>>>> And the 1.12 documentation states:
>>>>>>
>>>>>> With Flink’s checkpointing enabled, the Flink Kafka Consumer will
>>>>>>> consume records from a topic and periodically checkpoint all its Kafka
>>>>>>> offsets, together with the state of other operations. In case of a job
>>>>>>> failure, Flink will restore the streaming program to the state of the
>>>>>>> latest checkpoint and re-consume the records from Kafka, starting from 
>>>>>>> the
>>>>>>> offsets that were stored in the checkpoint.
>>>>>>> The interval of drawing checkpoints therefore defines how much the
>>>>>>> program may have to go back at most, in case of a failure. To use fault
>>>>>>> tolerant Kafka Consumers, checkpointing of the topology needs to be 
>>>>>>> enabled
>>>>>>> in the job.
>>>>>>> If checkpointing is disabled, the Kafka consumer will periodically
>>>>>>> commit the offsets to Zookeeper.
>>>>>>
>>>>>>
>>>>>> Thank you.
>>>>>>
>>>>>> Marco
>>>>>>
>>>>>>
>>>>>>
>

Reply via email to