Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-12-01 Thread Hang Ruan
Sorry, I misspelled it; I meant the PR. Here it is:
https://github.com/apache/flink/pull/17276



Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-12-01 Thread Marco Villalobos
Thank you. One last question.  What is an RP? Where can I read it?

Marco




Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-30 Thread Hang Ruan
Hi,

In versions 1.12.0 through 1.12.5, committing offsets to Kafka when
checkpointing is enabled is the default behavior of KafkaSourceBuilder, and
it cannot be changed through KafkaSourceBuilder.

FLINK-24277 makes this behavior configurable. The fix will be included in
1.12.6. It does not seem to be contained in your version.

Reading the RP will help you understand the behavior.




Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-30 Thread Marco Villalobos
Thanks!

However, I noticed that KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT
does not exist in Flink 1.12.

Is that property supported with the string "commit.offsets.on.checkpoints"?

How do I configure that behavior so that offsets get committed on
checkpoints in Flink 1.12 when using the KafkaSourceBuilder? Or is that the
default behavior with checkpoints?






Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-29 Thread Hang Ruan
Hi,

Maybe you can write it like this:
builder.setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true");

Other additional properties can be found here:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#additional-properties
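
Where the KafkaSourceOptions constant is not available, one possible approach is to use the plain string key. The sketch below only builds a java.util.Properties object. The key "commit.offsets.on.checkpoint" is the one named in the Flink 1.14 documentation quoted in this thread; whether a given 1.12.x release honors it is not confirmed here. The class and method names are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink.
class KafkaSourceProps {
    // Key as named in the Flink 1.14 "Additional Properties" documentation.
    static final String COMMIT_OFFSETS_ON_CHECKPOINT = "commit.offsets.on.checkpoint";

    // Builds the extra properties intended for the Kafka source.
    static Properties buildSourceProperties(boolean commitOnCheckpoint) {
        Properties props = new Properties();
        props.setProperty(COMMIT_OFFSETS_ON_CHECKPOINT, Boolean.toString(commitOnCheckpoint));
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildSourceProperties(true);
        // Prints the value stored under the key.
        System.out.println(props.getProperty(COMMIT_OFFSETS_ON_CHECKPOINT));
    }
}
```

The resulting Properties object could then be passed to the builder through setProperties(Properties), which the 1.14 documentation lists alongside setProperty(String, String).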



Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-29 Thread Marco Villalobos
Thank you for the information. That still does not answer my question,
though. How do I configure Flink 1.12 using the KafkaSourceBuilder so
that the consumer commits offsets back to Kafka on checkpoints?

FlinkKafkaConsumer has the method setCommitOffsetsOnCheckpoints(boolean) for
this.

But now that I am using KafkaSourceBuilder, how do I configure that
behavior so that offsets get committed on checkpoints? Or is that the
default behavior with checkpoints?

-Marco



Re: How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-29 Thread Caizhi Weng
Hi!

The Flink 1.14 release notes cover this. See [1].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer



How do you configure setCommitOffsetsOnCheckpoints in Flink 1.12 when using KafkaSourceBuilder?

2021-11-29 Thread Marco Villalobos
Hi everybody,

I am using Flink 1.12 and migrating my code from using FlinkKafkaConsumer
to using the KafkaSourceBuilder.

FlinkKafkaConsumer has the method

/**
 * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
 * This setting will only have effect if checkpointing is enabled for the job. If checkpointing isn't
 * enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+) property
 * settings will be used.
 */
FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints(boolean)
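
The rule in this Javadoc can be read as a small decision function. The following is a minimal sketch of that reading, not Flink's actual implementation: the class, enum, and method names are hypothetical, and treating "checkpointing enabled but the flag set to false" as "no commits" is an assumption drawn from the Javadoc wording.

```java
// Hypothetical illustration of the Javadoc above; not Flink code.
class CommitModeSketch {
    enum CommitMode { ON_CHECKPOINTS, KAFKA_AUTO_COMMIT, DISABLED }

    static CommitMode resolve(boolean checkpointingEnabled,
                              boolean commitOffsetsOnCheckpoints,
                              boolean enableAutoCommit) {
        if (checkpointingEnabled) {
            // With checkpointing on, only the commit-on-checkpoints flag is consulted.
            return commitOffsetsOnCheckpoints ? CommitMode.ON_CHECKPOINTS : CommitMode.DISABLED;
        }
        // Without checkpointing, only the Kafka auto-commit property is consulted.
        return enableAutoCommit ? CommitMode.KAFKA_AUTO_COMMIT : CommitMode.DISABLED;
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, true, false));  // ON_CHECKPOINTS
        System.out.println(resolve(false, true, true));  // KAFKA_AUTO_COMMIT
    }
}
```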


How do I set up that parameter when using the KafkaSourceBuilder? If I
already have checkpointing configured, is it necessary to set up "commit
offsets on checkpoints"?

The Flink 1.12 documentation does not discuss this topic, and the Flink
1.14 documentation says little about it.

For example, the Flink 1.14 documentation states:

> Additional Properties
> In addition to properties described above, you can set arbitrary
> properties for KafkaSource and KafkaConsumer by using
> setProperties(Properties) and setProperty(String, String). KafkaSource has
> following options for configuration:
> commit.offsets.on.checkpoint specifies whether to commit consuming offsets
> to Kafka brokers on checkpoint


And the 1.12 documentation states:

> With Flink’s checkpointing enabled, the Flink Kafka Consumer will consume
> records from a topic and periodically checkpoint all its Kafka offsets,
> together with the state of other operations. In case of a job failure,
> Flink will restore the streaming program to the state of the latest
> checkpoint and re-consume the records from Kafka, starting from the offsets
> that were stored in the checkpoint.
> The interval of drawing checkpoints therefore defines how much the program
> may have to go back at most, in case of a failure. To use fault tolerant
> Kafka Consumers, checkpointing of the topology needs to be enabled in the
> job.
> If checkpointing is disabled, the Kafka consumer will periodically commit
> the offsets to Zookeeper.


Thank you.

Marco