[ 
https://issues.apache.org/jira/browse/FLINK-35506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X updated FLINK-35506:
---------------------------
    Description: 
When I use KafkaSource to consume topics with the Kafka property 
{{enable.auto.commit=true}} and also enable checkpointing for the job, I notice 
that both mechanisms commit offsets: the Kafka client auto-commits periodically, 
and the connector commits on checkpoint completion. Should Kafka's auto-commit 
be disabled when Flink checkpointing is enabled, similar to how it's done with 
FlinkKafkaConsumer?
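
A possible workaround is to turn the Kafka client's auto-commit off on the source and rely only on the connector's checkpoint-based commit. A minimal sketch, assuming the connector option {{commit.offsets.on.checkpoint}} (default {{true}}) is what drives the checkpoint commits; {{bootStrap}}, {{expoGroupId}} and {{expoTopic}} are the same values as in the reproduction below:

{code:java}
// Workaround sketch (not a connector fix): disable the Kafka client's periodic
// auto-commit and keep only the connector's commit-on-checkpoint behavior.
KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers(bootStrap)
        .setGroupId(expoGroupId)
        .setTopics(Arrays.asList(expoTopic))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setStartingOffsets(OffsetsInitializer.latest())
        .setProperty("enable.auto.commit", "false")           // suppress Kafka's own commits
        .setProperty("commit.offsets.on.checkpoint", "true")  // explicit; true is the default
        .build();
{code}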

 

*How to reproduce*

 
{code:java}
import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// bootStrap, expoGroupId, expoTopic, username and password are cluster-specific
// values defined elsewhere.
Properties kafkaParams = new Properties();
kafkaParams.put("enable.auto.commit", "true");      // Kafka client's periodic auto-commit
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("fetch.min.bytes", "4096");
kafkaParams.put("sasl.mechanism", "PLAIN");
kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
kafkaParams.put("bootstrap.servers", bootStrap);
kafkaParams.put("group.id", expoGroupId);
kafkaParams.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required"
        + " username=\"" + username + "\" password=\"" + password + "\";");

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers(bootStrap)
        .setProperties(kafkaParams)
        .setGroupId(expoGroupId)
        .setTopics(Arrays.asList(expoTopic))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setStartingOffsets(OffsetsInitializer.latest())
        .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// No sink is needed to reproduce; the job just consumes and discards records.
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
        .filter(r -> true);

env.enableCheckpointing(3000 * 1000);                                  // checkpoint every 3,000,000 ms (50 min)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 1000);  // min pause 1,000,000 ms (~16.7 min)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(1000 * 300);            // timeout 300,000 ms (5 min)
env.execute("kafka-consumer");
{code}
 

 

The consumer log shows {{org.apache.kafka.clients.consumer.internals.ConsumerCoordinator}} 
continuously committing offsets (screenshot attached):

!image-2024-06-03-23-39-28-270.png!
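
What I am suggesting is an adjustment along the lines of what the legacy FlinkKafkaConsumer does: when checkpointing is enabled and offsets are committed on checkpoints, the connector overrides the user-supplied {{enable.auto.commit}} before the Kafka client is created. A rough, hypothetical sketch only; the method name and placement are illustrative, not actual connector code:

{code:java}
// Hypothetical sketch of the suggested behavior, modeled on the legacy
// FlinkKafkaConsumer: if offsets are committed on checkpoints, override the
// user-supplied enable.auto.commit so the Kafka client does not also commit.
static Properties adjustAutoCommit(
        Properties userProps, boolean checkpointingEnabled, boolean commitOffsetsOnCheckpoint) {
    Properties adjusted = new Properties();
    adjusted.putAll(userProps);
    if (checkpointingEnabled && commitOffsetsOnCheckpoint) {
        // Checkpoint-based commits take over; the client's periodic auto-commit is disabled.
        adjusted.setProperty("enable.auto.commit", "false");
    }
    return adjusted;
}
{code}

With something like this in place, jobs that carry over legacy configurations containing {{enable.auto.commit=true}} would no longer produce the duplicate commits seen above.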

  was:When I use KafkaSource for consuming topics and set the Kafka parameter 
{{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the 
task, I notice that both will commit offsets. Should Kafka's auto-commit be 
disabled when enabling Flink checkpointing, similar to how it's done with 
FlinkKafkaConsumer?


> disable kafka auto-commit and rely on flink’s checkpointing if both are 
> enabled
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-35506
>                 URL: https://issues.apache.org/jira/browse/FLINK-35506
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.16.1
>            Reporter: elon_X
>            Priority: Major
>         Attachments: image-2024-06-03-23-39-28-270.png
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
