This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new c69d5f3  KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136)
c69d5f3 is described below
commit c69d5f30f9ecac7e0074b21d9170de4837be6067
Author: Rajini Sivaram <[email protected]>
AuthorDate: Wed Jul 28 17:27:26 2021 +0100
KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136)
Reviewers: Jason Gustafson <[email protected]>
---
.../main/scala/kafka/server/ReplicaManager.scala | 7 ++++++-
.../unit/kafka/server/ReplicaManagerTest.scala | 22 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 1b819d3..7d40110 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1709,7 +1709,8 @@ class ReplicaManager(val config: KafkaConfig,
   * records in fetch response. Log start/end offset and high watermark may change not only due to
   * this fetch request, e.g., rolling new log segment and removing old log segment may move log
   * start offset further than the last offset in the fetched records. The followers will get the
- * updated leader's state in the next fetch response.
+  * updated leader's state in the next fetch response. If follower has a diverging epoch or if read
+  * fails with any error, follower fetch state is not updated.
*/
  private def updateFollowerFetchState(followerId: Int,
                                       readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {
@@ -1718,6 +1719,10 @@ class ReplicaManager(val config: KafkaConfig,
        debug(s"Skipping update of fetch state for follower $followerId since the " +
          s"log read returned error ${readResult.error}")
readResult
+ } else if (readResult.divergingEpoch.nonEmpty) {
+        debug(s"Skipping update of fetch state for follower $followerId since the " +
+          s"log read returned diverging epoch ${readResult.divergingEpoch}")
+ readResult
} else {
nonOfflinePartition(topicPartition) match {
case Some(partition) =>
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 6a0f94c..127aff5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -671,6 +671,28 @@ class ReplicaManagerTest {
assertEquals(0L, followerReplica.logStartOffset)
assertEquals(0L, followerReplica.logEndOffset)
+      // Next we receive an invalid request with a higher fetch offset, but a diverging epoch.
+      // We expect that the replica state does not get updated.
+      val divergingFetchPartitionData = new FetchRequest.PartitionData(3L, 0L, maxFetchBytes,
+        Optional.of(leaderEpoch), Optional.of(leaderEpoch - 1))
+
+ replicaManager.fetchMessages(
+ timeout = 0L,
+ replicaId = 1,
+ fetchMinBytes = 1,
+ fetchMaxBytes = maxFetchBytes,
+ hardMaxBytesLimit = false,
+ fetchInfos = Seq(tp -> divergingFetchPartitionData),
+ quota = UnboundedQuota,
+ isolationLevel = IsolationLevel.READ_UNCOMMITTED,
+ responseCallback = callback,
+ clientMetadata = None
+ )
+
+ assertTrue(successfulFetch.isDefined)
+ assertEquals(0L, followerReplica.logStartOffset)
+ assertEquals(0L, followerReplica.logEndOffset)
+
} finally {
replicaManager.shutdown(checkpointHW = false)
}