[ 
https://issues.apache.org/jira/browse/FLINK-35506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

elon_X closed FLINK-35506.
--------------------------
    Resolution: Not A Problem

> disable kafka auto-commit and rely on flink’s checkpointing if both are 
> enabled
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-35506
>                 URL: https://issues.apache.org/jira/browse/FLINK-35506
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.16.1
>            Reporter: elon_X
>            Priority: Major
>         Attachments: image-2024-06-03-23-39-28-270.png
>
>
> When I use KafkaSource to consume topics and set the Kafka parameter 
> {{enable.auto.commit=true}} while also enabling checkpointing for the 
> job, I notice that both the Kafka client and Flink commit offsets. Should 
> Kafka's auto-commit be disabled when Flink checkpointing is enabled, similar 
> to how it's done with FlinkKafkaConsumer?
>  
> *How to reproduce*
>  
> {code:java}
> Properties kafkaParams = new Properties();
> kafkaParams.put("enable.auto.commit", "true");
> kafkaParams.put("auto.offset.reset", "latest");
> kafkaParams.put("fetch.min.bytes", "4096");
> kafkaParams.put("sasl.mechanism", "PLAIN");
> kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
> kafkaParams.put("bootstrap.servers", bootStrap);
> kafkaParams.put("group.id", expoGroupId);
> kafkaParams.put("sasl.jaas.config",
>         "org.apache.kafka.common.security.plain.PlainLoginModule"
>         + " required username=\"" + username + "\" password=\"" + password + "\";");
> KafkaSource<String> source = KafkaSource
>         .<String>builder()
>         .setBootstrapServers(bootStrap)
>         .setProperties(kafkaParams)
>         .setGroupId(expoGroupId)
>         .setTopics(Arrays.asList(expoTopic))
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .build();
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
>         .filter(r ->  true);
> env.enableCheckpointing(3000 * 1000);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 1000);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(1000 * 300);
> env.execute("kafka-consumer");
> {code}
>  
>  
> The Kafka client's 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator continuously 
> commits offsets.
> !image-2024-06-03-23-39-28-270.png!
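
For reference, the double commit described above can be avoided on the job side. The sketch below is one possible approach, not a change proposed or confirmed in this issue. It copies the consumer properties, sets enable.auto.commit to false, and sets commit.offsets.on.checkpoint (the KafkaSource option that controls committing offsets when a checkpoint completes) to true. The class and method names are hypothetical, and only java.util.Properties is used so the snippet runs without Flink on the classpath.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink or of the reported job.
public class CheckpointOnlyCommitProps {

    // Returns a copy of the given consumer properties in which the Kafka
    // client's periodic auto-commit is switched off and the KafkaSource
    // option for committing offsets on checkpoint completion is switched on.
    static Properties checkpointOnlyCommit(Properties base) {
        Properties props = new Properties();
        props.putAll(base); // leave the caller's Properties untouched
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("commit.offsets.on.checkpoint", "true");
        return props;
    }

    public static void main(String[] args) {
        // Mirrors the relevant setting from the reproduction code.
        Properties kafkaParams = new Properties();
        kafkaParams.put("enable.auto.commit", "true");
        kafkaParams.put("auto.offset.reset", "latest");

        Properties adjusted = checkpointOnlyCommit(kafkaParams);
        System.out.println(adjusted.getProperty("enable.auto.commit"));
        System.out.println(adjusted.getProperty("commit.offsets.on.checkpoint"));
    }
}
```

The returned Properties could then be passed to KafkaSource.builder().setProperties(...) in place of kafkaParams.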



--
This message was sent by Atlassian Jira
(v8.20.10#820010)