Darcy Lin created FLINK-33484:
---------------------------------

Summary: Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level
Key: FLINK-33484
URL: https://issues.apache.org/jira/browse/FLINK-33484
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.17.1
Environment: Flink 1.17.1
Kafka 2.5.1
Reporter: Darcy Lin

We have encountered an issue with the Flink Kafka connector when consuming transactional data from Kafka with {{isolation.level}} set to {{read_committed}} ({{setProperty("isolation.level", "read_committed")}}). Even after all data in a topic has been consumed, the reported offset lag is 1 rather than 0. Consuming the same data with the plain Kafka Java client does not show this lag.

We suspect the cause is the way the Flink Kafka connector calculates the offset it saves. The problem seems to be in {{KafkaRecordEmitter.java}}, specifically in the {{emitRecord}} method. When saving the offset, the method calls {{splitState.setCurrentOffset(consumerRecord.offset() + 1);}}. This works correctly for non-transactional data, but it might not be accurate when {{read_committed}} mode is used. We believe that it should be {{splitState.setCurrentOffset(consumerRecord.offset() + 2);}}, because transactional data in Kafka occupies an additional offset to store the transaction marker.

We request that the Flink team investigate this issue and provide guidance on how to resolve it.

Thank you for your attention and support.
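
To make the arithmetic behind the reported lag of 1 concrete, here is a minimal sketch in plain Java. It is a toy model of a single partition log, not Flink or Kafka client code: the names {{OffsetLagSketch}}, {{Entry}}, {{committedTransaction}}, {{logEndOffset}}, {{offsetAfterLastRecord}} and {{lag}} are all hypothetical. It assumes one committed transaction whose commit marker directly follows the last data record, no open transaction, and lag measured against the log end offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one Kafka partition log; hypothetical, not Flink or Kafka client code. */
final class OffsetLagSketch {

    /** One log slot: either a data record or a transaction control marker. */
    static final class Entry {
        final long offset;
        final boolean controlMarker;

        Entry(long offset, boolean controlMarker) {
            this.offset = offset;
            this.controlMarker = controlMarker;
        }
    }

    /** Builds a log with `records` data records followed by a single commit marker (assumption). */
    static List<Entry> committedTransaction(int records) {
        List<Entry> log = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            log.add(new Entry(i, false));
        }
        log.add(new Entry(records, true)); // the marker occupies its own offset
        return log;
    }

    /** Offset that follows the last entry in the log. */
    static long logEndOffset(List<Entry> log) {
        return log.isEmpty() ? 0 : log.get(log.size() - 1).offset + 1;
    }

    /** Mirrors the bookkeeping described in the report: last emitted record's offset + 1. */
    static long offsetAfterLastRecord(List<Entry> log) {
        long next = 0;
        for (Entry e : log) {
            if (!e.controlMarker) { // markers are never delivered as records
                next = e.offset + 1;
            }
        }
        return next;
    }

    /** Lag as the distance between the log end offset and the stored offset. */
    static long lag(List<Entry> log) {
        return logEndOffset(log) - offsetAfterLastRecord(log);
    }

    public static void main(String[] args) {
        List<Entry> log = committedTransaction(3); // records at 0..2, marker at 3
        System.out.println("log end offset = " + logEndOffset(log));
        System.out.println("stored offset  = " + offsetAfterLastRecord(log));
        System.out.println("lag            = " + lag(log));
    }
}
```

In this model the stored offset stops at the marker's offset, so the lag stays at 1 however many records the transaction contains. The sketch covers only the single-marker case described above. Logs with aborted transactions or several consecutive markers are outside its scope, so it should not be read as confirming that a fixed {{+ 2}} is correct in general.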