[ https://issues.apache.org/jira/browse/FLINK-35506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
elon_X closed FLINK-35506. -------------------------- Resolution: Not A Problem > disable kafka auto-commit and rely on flink’s checkpointing if both are > enabled > ------------------------------------------------------------------------------- > > Key: FLINK-35506 > URL: https://issues.apache.org/jira/browse/FLINK-35506 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka > Affects Versions: 1.16.1 > Reporter: elon_X > Priority: Major > Attachments: image-2024-06-03-23-39-28-270.png > > > When I use KafkaSource for consuming topics and set the Kafka parameter > {{{}enable.auto.commit=true{}}}, while also enabling checkpointing for the > task, I notice that both will commit offsets. Should Kafka's auto-commit be > disabled when enabling Flink checkpointing, similar to how it's done with > FlinkKafkaConsumer? > > *How to reproduce* > > {code:java} > // code placeholder > Properties kafkaParams = new Properties(); > kafkaParams.put("enable.auto.commit", "true"); > kafkaParams.put("auto.offset.reset", "latest"); > kafkaParams.put("fetch.min.bytes", "4096"); > kafkaParams.put("sasl.mechanism", "PLAIN"); > kafkaParams.put("security.protocol", "SASL_PLAINTEXT"); > kafkaParams.put("bootstrap.servers", bootStrap); > kafkaParams.put("group.id", expoGroupId); > kafkaParams.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule > required username=\"" + username + "\" password=\"" + password + "\";"); > KafkaSource<String> source = KafkaSource > .<String>builder() > .setBootstrapServers(bootStrap) > .setProperties(kafkaParams) > .setGroupId(expoGroupId) > .setTopics(Arrays.asList(expoTopic)) > .setValueOnlyDeserializer(new SimpleStringSchema()) > .setStartingOffsets(OffsetsInitializer.latest()) > .build(); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source") > .filter(r -> true); > env.enableCheckpointing(3000 * 1000); > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 1000); > env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); > env.getCheckpointConfig().setCheckpointTimeout(1000 * 300); > env.execute("kafka-consumer"); {code} > > > the kafka client's > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator continuously > committing offsets. > !image-2024-06-03-23-39-28-270.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)