[ https://issues.apache.org/jira/browse/FLINK-33484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784526#comment-17784526 ]

Darcy Lin commented on FLINK-33484:
-----------------------------------

[~martijnvisser] org.apache.flink:flink-connector-kafka:1.17.1

> Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33484
>                 URL: https://issues.apache.org/jira/browse/FLINK-33484
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.17.1
>         Environment: Flink 1.17.1
>                      Kafka 2.5.1
>            Reporter: Darcy Lin
>            Priority: Major
>
> We have encountered an issue with the Flink Kafka connector when consuming
> transactional data from Kafka with {{isolation.level}} set to
> {{read_committed}} ({{setProperty("isolation.level", "read_committed")}}).
> Even when all the data from a topic has been consumed, the offset lag is
> not 0, but 1. When the Kafka Java client is used to consume the same data,
> this issue does not occur.
>
> We suspect that this issue arises from the way the Flink Kafka connector
> calculates the offset. The problem seems to be in the
> {{KafkaRecordEmitter.java}} file, specifically in the {{emitRecord}} method.
> When saving the offset, the method calls
> {{splitState.setCurrentOffset(consumerRecord.offset() + 1);}}. While this
> statement works correctly in a regular Kafka scenario, it might not be
> accurate when the {{read_committed}} mode is used. We believe that it should
> be {{splitState.setCurrentOffset(consumerRecord.offset() + 2);}}, as
> transactional data in Kafka occupies an additional offset to store the
> transaction marker.
>
> We request the Flink team to investigate this issue and provide us with
> guidance on how to resolve it.
>
> Thank you for your attention and support.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
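
The offset arithmetic described in the report can be illustrated with a toy model. The sketch below is not Kafka or Flink code: the class {{OffsetLagSketch}} and its methods are hypothetical names, and the partition log is modeled as a boolean array in which the index is the offset and {{true}} marks a transaction control marker. It assumes a single committed transaction of three records followed by one commit marker.

```java
// Toy model of one partition's log; hypothetical names, not Kafka or Flink code.
final class OffsetLagSketch {

    // Offset of the last data record in the log, or -1 if there is none.
    // isMarker[i] == true means offset i holds a transaction control marker.
    static long lastDataOffset(boolean[] isMarker) {
        long last = -1;
        for (int offset = 0; offset < isMarker.length; offset++) {
            if (!isMarker[offset]) {
                last = offset;
            }
        }
        return last;
    }

    // Log end offset: the offset the next appended entry would receive.
    static long logEndOffset(boolean[] isMarker) {
        return isMarker.length;
    }

    // Lag in this model: log end offset minus the committed offset.
    static long lag(boolean[] isMarker, long committedOffset) {
        return logEndOffset(isMarker) - committedOffset;
    }

    public static void main(String[] args) {
        // Assumed layout: data records at offsets 0..2, commit marker at offset 3.
        boolean[] log = {false, false, false, true};

        // Mirrors the expression quoted in the report: consumerRecord.offset() + 1.
        long committed = lastDataOffset(log) + 1;

        System.out.println("committed offset = " + committed);
        System.out.println("log end offset   = " + logEndOffset(log));
        System.out.println("lag              = " + lag(log, committed));
    }
}
```

On Java 11 or later this can be run as a single file with {{java OffsetLagSketch.java}}. Under the stated assumptions the committed offset is 3, the log end offset is 4, and the lag is 1. In this model the marker follows the transaction as a whole rather than each record, so the gap appears only after the last record of a transaction.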