[ 
https://issues.apache.org/jira/browse/FLINK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17727581#comment-17727581
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-32038 at 5/30/23 3:26 PM:
----------------------------------------------------------------------

Hi [~pritam.agarwala], it does look like the behavior was actually altered / 
broken when the Kafka source connector was reimplemented on top of the new 
Source V2 interface.

That is, the desired behavior you are describing is exactly how things worked 
in the old {{FlinkKafkaConsumer}}. Things changed such that, in the new 
{{KafkaSource}}, offset committing is only ever done when checkpointing is 
enabled.

Since your ticket description doesn't explicitly mention it: could you clarify 
which version of the Kafka source connector you are using, 
{{FlinkKafkaConsumer}} or {{KafkaSource}}?

If it's the latter, I do think there's a case to fix this, as its current 
behavior conflicts with what the documentation describes. Since it seems to be 
a "bug", I wouldn't categorize this as "changing default behavior". Moreover, 
not adding this behavior back to {{KafkaSource}} would arguably break existing 
tooling / integrations when users migrate from {{FlinkKafkaConsumer}} to 
{{KafkaSource}}. cc [~martijnvisser] 


> OffsetCommitMode.Kafka_periodic with checkpointing enabled 
> -----------------------------------------------------------
>
>                 Key: FLINK-32038
>                 URL: https://issues.apache.org/jira/browse/FLINK-32038
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka, Runtime / Checkpointing
>    Affects Versions: 1.14.6
>            Reporter: Pritam Agarwala
>            Priority: Major
>
> I need the Kafka consumer lag to prepare a graph, and it depends on the 
> Kafka committed offsets. Flink updates the offsets only after checkpointing, 
> to keep them consistent.
> Default behaviour as per the docs:
> If checkpointing is enabled but {{consumer.setCommitOffsetsOnCheckpoints}} 
> is set to false, then offsets will not be committed at all, even if 
> {{enable.auto.commit}} is set to true.
> So, when {{consumer.setCommitOffsetsOnCheckpoints}} is set to false, 
> *shouldn't it fall back on {{enable.auto.commit}} to commit offsets 
> regularly, since in any case Flink doesn't use consumer committed offsets 
> for recovery?*
>  
> OffsetCommitModes class :
>   
> {code:java}
> public class OffsetCommitModes {
>     /**
>      * Determine the offset commit mode using several configuration values.
>      *
>      * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka
>      *     properties.
>      * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
>      * @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
>      * @return the offset commit mode to use, based on the configuration values.
>      */
>     public static OffsetCommitMode fromConfiguration(
>             boolean enableAutoCommit,
>             boolean enableCommitOnCheckpoint,
>             boolean enableCheckpointing) {
>         if (enableCheckpointing) {
>             // if checkpointing is enabled, the mode depends only on whether committing on
>             // checkpoints is enabled
>             return (enableCommitOnCheckpoint)
>                     ? OffsetCommitMode.ON_CHECKPOINTS
>                     : OffsetCommitMode.DISABLED;
>         } else {
>             // else, the mode depends only on whether auto committing is enabled in the provided
>             // Kafka properties
>             return (enableAutoCommit)
>                     ? OffsetCommitMode.KAFKA_PERIODIC
>                     : OffsetCommitMode.DISABLED;
>         }
>     }
> }
>  {code}
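
For illustration only, the fallback asked for in the ticket could be sketched 
as a small change to the decision logic quoted above. This is not the Flink 
implementation and not an agreed fix: the class name 
OffsetCommitFallbackSketch, the nested Mode enum (a local stand-in for 
Flink's OffsetCommitMode) and the method fromConfigurationWithFallback are 
all hypothetical names chosen so the example compiles on its own.

```java
public class OffsetCommitFallbackSketch {

    /** Local stand-in for Flink's OffsetCommitMode so the sketch is self-contained. */
    enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    /**
     * Hypothetical variant of fromConfiguration: if offsets are not committed on
     * checkpoints (checkpointing is off, or commit-on-checkpoint is off), fall back
     * to Kafka's periodic auto commit when enable.auto.commit is true.
     */
    static Mode fromConfigurationWithFallback(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {
        if (enableCheckpointing && enableCommitOnCheckpoint) {
            // unchanged from the quoted code: committing on checkpoints takes precedence
            return Mode.ON_CHECKPOINTS;
        }
        // proposed change: auto commit is honoured even when checkpointing is enabled
        return enableAutoCommit ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        // print the full decision table for all eight flag combinations
        for (boolean checkpointing : flags) {
            for (boolean commitOnCheckpoint : flags) {
                for (boolean autoCommit : flags) {
                    System.out.printf(
                            "checkpointing=%b commitOnCheckpoint=%b autoCommit=%b -> %s%n",
                            checkpointing,
                            commitOnCheckpoint,
                            autoCommit,
                            fromConfigurationWithFallback(
                                    autoCommit, commitOnCheckpoint, checkpointing));
                }
            }
        }
    }
}
```

The only combination whose result differs from the quoted code is 
checkpointing enabled, commit-on-checkpoint disabled and auto commit enabled, 
which yields KAFKA_PERIODIC here instead of DISABLED.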



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
