junrao commented on code in PR #18852:
URL: https://github.com/apache/kafka/pull/18852#discussion_r1987826859


##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -333,7 +336,9 @@ abstract class AbstractFetcherThread(name: String,
             // In this case, we only want to process the fetch response if the partition state is ready for fetch and
             // the current offset is the same as the offset requested.
             val fetchPartitionData = sessionPartitions.get(topicPartition)
-            if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
+            if (fetchPartitionData != null &&

Review Comment:
   @jsancio : With the approach in this PR, we still have a gap. It's possible 
that a fetch response (for an old leader epoch) returns divergingEndOffsets. In 
this case, we will take the divergingEndOffsets in the fetch response and 
truncate the local log based on that. This could cause a committed record to 
be removed from the follower's log, potentially leading to data loss.
   
   We can fix this issue by ignoring the fetch response if the leader epoch in 
the fetch request doesn't match the leader epoch in the current fetch state.
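   
   A minimal sketch of that check, assuming a simplified model rather than the 
actual Kafka classes: `FetchRequestState`, `PartitionFetchState`, and 
`FetchResponseGuard.shouldProcess` are hypothetical names used only for 
illustration, and the real condition in `AbstractFetcherThread` may be shaped 
differently.

```scala
// Hypothetical, simplified stand-ins for the data sent in the fetch request
// and the follower's current fetch state. These are not the real Kafka types.
final case class FetchRequestState(fetchOffset: Long, currentLeaderEpoch: Int)

final case class PartitionFetchState(
  fetchOffset: Long,
  currentLeaderEpoch: Int,
  isReadyForFetch: Boolean
)

object FetchResponseGuard {
  // Process the response only if the partition was part of the request and
  // both the offset and the leader epoch in the request still match the
  // current fetch state. A response for an old leader epoch is ignored, so
  // its divergingEndOffsets are never used to truncate the local log.
  def shouldProcess(
    requested: Option[FetchRequestState],
    current: PartitionFetchState
  ): Boolean =
    requested.exists { req =>
      req.fetchOffset == current.fetchOffset &&
      req.currentLeaderEpoch == current.currentLeaderEpoch &&
      current.isReadyForFetch
    }
}
```

   With this guard, a response to a request sent at leader epoch 4 is dropped 
once the current fetch state has moved to leader epoch 5, even if the fetch 
offsets happen to match.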


