This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dedfed06f7a KAFKA-15510: Fix follower's lastFetchedEpoch when fetch
response has … (#14457)
dedfed06f7a is described below
commit dedfed06f7a472424080456c997f5200c6bef196
Author: chern <[email protected]>
AuthorDate: Thu Sep 28 06:14:42 2023 -0700
KAFKA-15510: Fix follower's lastFetchedEpoch when fetch response has …
(#14457)
When a fetch response has no record for a partition, validBytes is 0. We
shouldn't set the last fetched epoch to logAppendInfo.lastLeaderEpoch.asScala
since there is no record and it is Optional.empty. We should use
currentFetchState.lastFetchedEpoch instead.
Reviewers: Divij Vaidya <[email protected]>, Viktor Somogyi-Vass
<[email protected]>, Kamal
Chandraprakash<[email protected]>, Rajini Sivaram
<[email protected]>
---
core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 5 +++--
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala | 4 +++-
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 1dafb89ef0a..450fcfea461 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -364,10 +364,11 @@ abstract class AbstractFetcherThread(name: String,
// ReplicaDirAlterThread may have removed
topicPartition from the partitionStates after processing the partition data
if ((validBytes > 0 || currentFetchState.lag.isEmpty)
&& partitionStates.contains(topicPartition)) {
+ val lastFetchedEpoch =
+ if (logAppendInfo.lastLeaderEpoch.isPresent)
logAppendInfo.lastLeaderEpoch.asScala else currentFetchState.lastFetchedEpoch
// Update partitionStates only if there is no
exception during processPartitionData
val newFetchState =
PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
- currentFetchState.currentLeaderEpoch, state =
Fetching,
- logAppendInfo.lastLeaderEpoch.asScala)
+ currentFetchState.currentLeaderEpoch, state =
Fetching, lastFetchedEpoch)
partitionStates.updateAndMoveToEnd(topicPartition,
newFetchState)
if (validBytes > 0)
fetcherStats.byteRate.mark(validBytes)
}
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index e58532622e3..6a0feaa6456 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -749,9 +749,10 @@ class ReplicaFetcherThreadTest {
val log: UnifiedLog = mock(classOf[UnifiedLog])
val partition: Partition = mock(classOf[Partition])
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+ val lastFetchedEpoch = 2
when(log.highWatermark).thenReturn(0)
- when(log.latestEpoch).thenReturn(Some(0))
+ when(log.latestEpoch).thenReturn(Some(lastFetchedEpoch))
when(log.endOffsetForEpoch(0)).thenReturn(Some(new OffsetAndEpoch(0, 0)))
when(log.logEndOffset).thenReturn(0)
when(log.maybeUpdateHighWatermark(0)).thenReturn(None)
@@ -835,6 +836,7 @@ class ReplicaFetcherThreadTest {
// Lag is set to Some(0).
assertEquals(Some(0), thread.fetchState(t1p0).flatMap(_.lag))
+ assertEquals(Some(lastFetchedEpoch),
thread.fetchState(t1p0).flatMap(_.lastFetchedEpoch))
}
@Test