This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 80d99ea2bad KAFKA-18991: FetcherThread should match leader epochs
between fetch request and fetch state (#19223)
80d99ea2bad is described below
commit 80d99ea2badc3decaa4f524f87914de759c72518
Author: TengYao Chi <[email protected]>
AuthorDate: Wed Mar 26 00:14:01 2025 +0800
KAFKA-18991: FetcherThread should match leader epochs between fetch request
and fetch state (#19223)
This PR fixes a potential issue where the `FetchResponse` returns
`divergingEndOffsets` with an older leader epoch. This can lead to
committed records being removed from the follower's log, potentially
causing data loss.
In detail:
`processFetchRequest` looks up the partition data sent in the fetch request
by `topicPartition` and compares its requested leader epoch with the leader
epoch of the current fetch state. If the request carried a leader epoch and
it does not match, the response for that partition is ignored.
Reviewers: Jun Rao <[email protected]>
---
.../scala/kafka/server/AbstractFetcherThread.scala | 12 ++++++----
.../kafka/server/AbstractFetcherThreadTest.scala | 26 +++++++++++++++++++++-
2 files changed, 33 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 25faf62a2cd..50436dda1cd 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -307,7 +307,8 @@ abstract class AbstractFetcherThread(name: String,
}
}
- private def processFetchRequest(sessionPartitions: util.Map[TopicPartition,
FetchRequest.PartitionData],
+ // visible for testing
+ private[server] def processFetchRequest(sessionPartitions:
util.Map[TopicPartition, FetchRequest.PartitionData],
fetchRequest: FetchRequest.Builder): Unit = {
val partitionsWithError = mutable.Set[TopicPartition]()
val divergingEndOffsets = mutable.Map.empty[TopicPartition, EpochEndOffset]
@@ -333,11 +334,14 @@ abstract class AbstractFetcherThread(name: String,
responseData.foreachEntry { (topicPartition, partitionData) =>
Option(partitionStates.stateValue(topicPartition)).foreach {
currentFetchState =>
// It's possible that a partition is removed and re-added or
truncated when there is a pending fetch request.
- // In this case, we only want to process the fetch response if the
partition state is ready for fetch and
- // the current offset is the same as the offset requested.
+ // In this case, we only want to process the fetch response if:
+ // - the partition state is ready for fetch
+ // - the current offset is the same as the offset requested
+ // - the current leader epoch is the same as the leader epoch
requested
val fetchPartitionData = sessionPartitions.get(topicPartition)
if (fetchPartitionData != null &&
fetchPartitionData.fetchOffset ==
currentFetchState.fetchOffset &&
+ fetchPartitionData.currentLeaderEpoch.map[Boolean](_ ==
currentFetchState.currentLeaderEpoch).orElse(true) &&
currentFetchState.isReadyForFetch) {
Errors.forCode(partitionData.errorCode) match {
case Errors.NONE =>
@@ -362,7 +366,7 @@ abstract class AbstractFetcherThread(name: String,
val logAppendInfoOpt = processPartitionData(
topicPartition,
currentFetchState.fetchOffset,
-
fetchPartitionData.currentLeaderEpoch.orElse(currentFetchState.currentLeaderEpoch),
+ currentFetchState.currentLeaderEpoch,
partitionData
)
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index d1abc97f8cd..ef06ffcc10a 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -1153,4 +1153,28 @@ class AbstractFetcherThreadTest {
assertTrue(fetcher.fetchState(unknownPartition).isEmpty)
}
-}
+ @Test
+ def testIgnoreFetchResponseWhenLeaderEpochChanged(): Unit = {
+ val newEpoch = 1
+ val initEpoch = 0
+
+ val partition = new TopicPartition("topic", 0)
+ val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+ val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+ val fetcher = new MockFetcherThread(mockLeaderEndpoint,
mockTierStateMachine)
+ val replicaState = PartitionState(leaderEpoch = newEpoch)
+ fetcher.setReplicaState(partition, replicaState)
+ val initFetchState = initialFetchState(topicIds.get(partition.topic), 0L,
leaderEpoch = newEpoch)
+ fetcher.addPartitions(Map(partition -> initFetchState))
+
+ val batch = mkBatch(baseOffset = 0L, leaderEpoch = initEpoch, new
SimpleRecord("a".getBytes))
+ val leaderState = PartitionState(Seq(batch), leaderEpoch = initEpoch,
highWatermark = 1L)
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
+ val partitionData = Map(partition -> new
FetchRequest.PartitionData(Uuid.randomUuid(), 0, 0, 1048576,
Optional.of(initEpoch), Optional.of(initEpoch))).asJava
+ val fetchRequestOpt = FetchRequest.Builder.forReplica(0, 0, initEpoch, 0,
Int.MaxValue, partitionData)
+
+ fetcher.processFetchRequest(partitionData, fetchRequestOpt)
+ assertEquals(0, replicaState.logEndOffset, "FetchResponse should be
ignored when leader epoch does not match")
+ }
+}
\ No newline at end of file