[ 
https://issues.apache.org/jira/browse/FLINK-32038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17773719#comment-17773719
 ] 

Martijn Visser commented on FLINK-32038:
----------------------------------------

[~pritam.agarwala] [~tzulitai] What is the conclusion of this ticket?

> OffsetCommitMode.Kafka_periodic with checkpointing enabled 
> -----------------------------------------------------------
>
>                 Key: FLINK-32038
>                 URL: https://issues.apache.org/jira/browse/FLINK-32038
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka, Runtime / Checkpointing
>    Affects Versions: 1.14.6
>            Reporter: Pritam Agarwala
>            Priority: Major
>
> I need the Kafka consumer lag to build a graph, and the lag depends on the 
> offsets committed to Kafka. Flink commits offsets only after checkpointing, 
> to keep them consistent.
> Default behaviour as per the docs:
> If checkpointing is enabled but {{consumer.setCommitOffsetsOnCheckpoints}} is 
> set to false, offsets are not committed at all, even if 
> {{enable.auto.commit}} is set to true.
> So, when {{consumer.setCommitOffsetsOnCheckpoints}} is set to false, 
> *shouldn't it fall back on {{enable.auto.commit}} to commit offsets 
> periodically, since Flink does not use the consumer's committed offsets for 
> recovery in any case?*
>  
> OffsetCommitModes class :
>   
> {code:java}
> public class OffsetCommitModes {
>     /**
>      * Determine the offset commit mode using several configuration values.
>      *
>      * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka
>      *     properties.
>      * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
>      * @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
>      * @return the offset commit mode to use, based on the configuration values.
>      */
>     public static OffsetCommitMode fromConfiguration(
>             boolean enableAutoCommit,
>             boolean enableCommitOnCheckpoint,
>             boolean enableCheckpointing) {
>         if (enableCheckpointing) {
>             // if checkpointing is enabled, the mode depends only on whether committing on
>             // checkpoints is enabled
>             return (enableCommitOnCheckpoint)
>                     ? OffsetCommitMode.ON_CHECKPOINTS
>                     : OffsetCommitMode.DISABLED;
>         } else {
>             // else, the mode depends only on whether auto committing is enabled in the provided
>             // Kafka properties
>             return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
>         }
>     }
> }
> {code}
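
For reference, the difference between the behaviour quoted above and the fallback the reporter asks about can be sketched as a small standalone program. This is only an illustration under stated assumptions: the class OffsetCommitModeSketch, its nested Mode enum and the proposed(...) method are hypothetical names made up here, and proposed(...) is one possible reading of the request, not anything that exists in Flink. Only current(...) mirrors the quoted fromConfiguration logic.

```java
public class OffsetCommitModeSketch {

    /** Local stand-in for Flink's OffsetCommitMode so the sketch compiles on its own. */
    public enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    /** Mirrors the logic quoted in the issue description. */
    public static Mode current(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {
        if (enableCheckpointing) {
            // With checkpointing on, enable.auto.commit is ignored entirely.
            return enableCommitOnCheckpoint ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        return enableAutoCommit ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    /**
     * Hypothetical variant (assumption, not Flink code): commit on checkpoints
     * when that is enabled, otherwise fall back to enable.auto.commit.
     */
    public static Mode proposed(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {
        if (enableCheckpointing && enableCommitOnCheckpoint) {
            return Mode.ON_CHECKPOINTS;
        }
        return enableAutoCommit ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        // Print both resolutions for every combination of the three switches.
        for (boolean checkpointing : flags) {
            for (boolean commitOnCheckpoint : flags) {
                for (boolean autoCommit : flags) {
                    System.out.printf(
                            "checkpointing=%b commitOnCheckpoints=%b autoCommit=%b"
                                    + " -> current=%s proposed=%s%n",
                            checkpointing,
                            commitOnCheckpoint,
                            autoCommit,
                            current(autoCommit, commitOnCheckpoint, checkpointing),
                            proposed(autoCommit, commitOnCheckpoint, checkpointing));
                }
            }
        }
    }
}
```

The two methods differ in exactly one case: checkpointing enabled, committing on checkpoints disabled, and enable.auto.commit set to true. Whether that fallback is acceptable is the open question of this ticket.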



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
