This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 047608f KAFKA-13141; Skip follower fetch offset update in leader if
diverging epoch is present (#11136)
047608f is described below
commit 047608fe5e6631139d890ce1ca045052daa0e43c
Author: Rajini Sivaram <[email protected]>
AuthorDate: Wed Jul 28 17:27:26 2021 +0100
KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch
is present (#11136)
Reviewers: Jason Gustafson <[email protected]>
---
.../main/scala/kafka/server/ReplicaManager.scala | 7 ++++++-
.../unit/kafka/server/ReplicaManagerTest.scala | 22 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6837d81..f03571c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1750,7 +1750,8 @@ class ReplicaManager(val config: KafkaConfig,
* records in fetch response. Log start/end offset and high watermark may change not only due to
* this fetch request, e.g., rolling new log segment and removing old log segment may move log
* start offset further than the last offset in the fetched records. The followers will get the
- * updated leader's state in the next fetch response.
+ * updated leader's state in the next fetch response. If follower has a diverging epoch or if read
+ * fails with any error, follower fetch state is not updated.
*/
private def updateFollowerFetchState(followerId: Int,
readResults: Seq[(TopicPartition,
LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {
@@ -1759,6 +1760,10 @@ class ReplicaManager(val config: KafkaConfig,
debug(s"Skipping update of fetch state for follower $followerId since
the " +
s"log read returned error ${readResult.error}")
readResult
+ } else if (readResult.divergingEpoch.nonEmpty) {
+ debug(s"Skipping update of fetch state for follower $followerId since
the " +
+ s"log read returned diverging epoch ${readResult.divergingEpoch}")
+ readResult
} else {
onlinePartition(topicPartition) match {
case Some(partition) =>
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 5e2563e..fd6ba75 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -698,6 +698,28 @@ class ReplicaManagerTest {
assertEquals(0L, followerReplica.logStartOffset)
assertEquals(0L, followerReplica.logEndOffset)
+ // Next we receive an invalid request with a higher fetch offset, but a diverging epoch.
+ // We expect that the replica state does not get updated.
+ val divergingFetchPartitionData = new FetchRequest.PartitionData(3L, 0L,
maxFetchBytes,
+ Optional.of(leaderEpoch), Optional.of(leaderEpoch - 1))
+
+ replicaManager.fetchMessages(
+ timeout = 0L,
+ replicaId = 1,
+ fetchMinBytes = 1,
+ fetchMaxBytes = maxFetchBytes,
+ hardMaxBytesLimit = false,
+ fetchInfos = Seq(tp -> divergingFetchPartitionData),
+ quota = UnboundedQuota,
+ isolationLevel = IsolationLevel.READ_UNCOMMITTED,
+ responseCallback = callback,
+ clientMetadata = None
+ )
+
+ assertTrue(successfulFetch.isDefined)
+ assertEquals(0L, followerReplica.logStartOffset)
+ assertEquals(0L, followerReplica.logEndOffset)
+
} finally {
replicaManager.shutdown(checkpointHW = false)
}