Darcy Lin created FLINK-33484:
---------------------------------

             Summary: Flink Kafka Connector Offset Lag Issue with Transactional 
Data and Read Committed Isolation Level
                 Key: FLINK-33484
                 URL: https://issues.apache.org/jira/browse/FLINK-33484
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.17.1
         Environment: Flink 1.17.1, Kafka 2.5.1
            Reporter: Darcy Lin


We have encountered an issue with the Flink Kafka connector when consuming 
transactional data from Kafka with {{isolation.level}} set to 
{{read_committed}} ({{setProperty("isolation.level", "read_committed")}}). 
Even after all the data in a topic has been consumed, the reported offset lag 
is 1 rather than 0. When the same data is consumed with the Kafka Java client, 
this issue does not occur.
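
For reference, the consumer settings involved can be sketched with plain 
{{java.util.Properties}}. This is only an illustration: the class name, the 
broker address and the group id are placeholders and are not taken from our 
environment. The property keys themselves are standard Kafka consumer settings.

```java
import java.util.Properties;

class ReadCommittedProps {
    /** Builds consumer properties that only expose committed transactional data. */
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // placeholder address
        props.setProperty("group.id", groupId);                   // placeholder group
        // The setting this report is about: skip aborted and still-open transactions.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "lag-demo-group");
        System.out.println("isolation.level = " + props.getProperty("isolation.level"));
    }
}
```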

We suspect that the cause is the way the Flink Kafka connector calculates the 
offset it saves. In {{KafkaRecordEmitter.java}}, the {{emitRecord}} method 
saves the offset by calling 
{{splitState.setCurrentOffset(consumerRecord.offset() + 1);}}. This statement 
works correctly for non-transactional data, but it might not be accurate in 
{{read_committed}} mode. We believe that it should be 
{{splitState.setCurrentOffset(consumerRecord.offset() + 2);}}, as 
transactional data in Kafka occupies an additional offset to store the 
transaction marker.
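
The offset arithmetic behind this suspicion can be sketched with a toy model of 
a partition log. This is an illustration only, not Flink or Kafka code: the 
{{Entry}} class and the helper methods are hypothetical, the log is assumed to 
hold a single committed transaction, and lag is assumed to be the log end 
offset minus the saved offset.

```java
import java.util.ArrayList;
import java.util.List;

class OffsetLagSketch {
    /** One slot in the partition log: a data record or a transaction marker. */
    static final class Entry {
        final long offset;
        final boolean controlMarker;

        Entry(long offset, boolean controlMarker) {
            this.offset = offset;
            this.controlMarker = controlMarker;
        }
    }

    /** Log with one committed transaction: data records followed by a commit marker. */
    static List<Entry> singleTransactionLog(int dataRecords) {
        List<Entry> log = new ArrayList<>();
        for (int i = 0; i < dataRecords; i++) {
            log.add(new Entry(i, false));
        }
        log.add(new Entry(dataRecords, true)); // the marker takes the next offset
        return log;
    }

    /** Offset that the next appended entry would receive. */
    static long logEndOffset(List<Entry> log) {
        return log.isEmpty() ? 0 : log.get(log.size() - 1).offset + 1;
    }

    /** Offset saved by the "last delivered record + 1" rule; markers are never delivered. */
    static long offsetAfterLastRecord(List<Entry> log) {
        long next = 0;
        for (Entry e : log) {
            if (!e.controlMarker) {
                next = e.offset + 1;
            }
        }
        return next;
    }

    /** Lag as assumed in this sketch: log end offset minus saved offset. */
    static long lag(List<Entry> log, long savedOffset) {
        return logEndOffset(log) - savedOffset;
    }

    public static void main(String[] args) {
        List<Entry> log = singleTransactionLog(3);
        long saved = offsetAfterLastRecord(log);
        System.out.println("log end offset = " + logEndOffset(log));
        System.out.println("saved offset   = " + saved);
        System.out.println("lag            = " + lag(log, saved));
    }
}
```

With three records at offsets 0 to 2 and the commit marker at offset 3, the 
sketch reports a log end offset of 4, a saved offset of 3 and a lag of 1, which 
matches the lag we observe.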

We request the Flink team to investigate this issue and provide us with 
guidance on how to resolve it.

Thank you for your attention and support.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
