[ https://issues.apache.org/jira/browse/FLINK-33484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784526#comment-17784526 ]

Darcy Lin commented on FLINK-33484:
-----------------------------------

[~martijnvisser] org.apache.flink:flink-connector-kafka:1.17.1

> Flink Kafka Connector Offset Lag Issue with Transactional Data and Read 
> Committed Isolation Level
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33484
>                 URL: https://issues.apache.org/jira/browse/FLINK-33484
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.17.1
>         Environment: Flink 1.17.1
> kafka 2.5.1
>            Reporter: Darcy Lin
>            Priority: Major
>
> We have encountered an issue with the Flink Kafka connector when consuming
> transactional data from Kafka with {{isolation.level}} set to
> {{read_committed}} ({{setProperty("isolation.level", "read_committed")}}).
> Even after all the data in a topic has been consumed, the offset lag stays at
> 1 instead of dropping to 0. When the Kafka Java client consumes the same data,
> the lag does reach 0.
> We suspect that the issue comes from the way the Flink Kafka connector
> calculates the offset, specifically in the {{emitRecord}} method of
> {{KafkaRecordEmitter.java}}. When saving the offset, the method calls
> {{splitState.setCurrentOffset(consumerRecord.offset() + 1);}}. While this
> statement works correctly in a regular Kafka scenario, it might not be
> accurate when the {{read_committed}} mode is used. We believe that it should
> be {{splitState.setCurrentOffset(consumerRecord.offset() + 2);}}, because
> transactional data in Kafka occupies an additional offset to store the
> transaction marker.
> We ask the Flink team to investigate this issue and advise us on how to
> resolve it.
> Thank you for your attention and support.
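The offset arithmetic described in the report can be traced with a small, stdlib-only Java model. This is a sketch under stated assumptions, not Flink or Kafka code: one partition holding a single committed transaction (data records at offsets 0 to 2, commit marker at offset 3), lag taken as log end offset minus committed offset (as in the LAG column of {{kafka-consumer-groups.sh}}), and hypothetical names throughout ({{Entry}}, {{plusOne}}, {{nextOffsetSkippingMarkers}}, {{lag}}). It compares the {{offset() + 1}} rule with a variant that also steps over any control marker directly after the record.

```java
import java.util.List;

public class TransactionalOffsetModel {
    // Hypothetical model of one partition entry: a data record or a transaction control marker.
    record Entry(long offset, boolean isControlMarker) {}

    // Rule described for KafkaRecordEmitter.emitRecord: resume from the record's offset + 1.
    static long plusOne(long recordOffset) {
        return recordOffset + 1;
    }

    // Hypothetical alternative: also step over control markers directly after the record,
    // roughly what a consumer's fetch position does once the marker batch has been fetched.
    static long nextOffsetSkippingMarkers(List<Entry> log, long recordOffset) {
        long next = recordOffset + 1;
        for (Entry e : log) { // assumes the log is sorted by offset
            if (e.offset() == next && e.isControlMarker()) {
                next++;
            }
        }
        return next;
    }

    // Lag per partition: log end offset minus committed offset.
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        // Assumed layout: one committed transaction, data at offsets 0..2, commit marker at 3.
        List<Entry> log = List.of(
                new Entry(0, false), new Entry(1, false), new Entry(2, false), new Entry(3, true));
        long logEndOffset = log.get(log.size() - 1).offset() + 1; // 4

        for (Entry e : log) {
            if (e.isControlMarker()) {
                continue; // consumers never hand control markers to the application
            }
            long a = plusOne(e.offset());
            long b = nextOffsetSkippingMarkers(log, e.offset());
            System.out.printf("record %d: offset+1 -> %d (lag %d), skipping markers -> %d (lag %d)%n",
                    e.offset(), a, lag(logEndOffset, a), b, lag(logEndOffset, b));
        }
    }
}
```

After the last record, the {{offset() + 1}} rule stores offset 3, which points at the commit marker and leaves a lag of 1, while stepping over the marker stores 4 and gives a lag of 0. In this model the marker follows only the final record of the transaction, so the two rules differ only for that record.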



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
