This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new aee9edc KAFKA-12751: Reset AlterIsr in-flight state for duplicate
update requests (#10633)
aee9edc is described below
commit aee9edce6a39138d3a5ce87d4511040ccd40fa8f
Author: Rajini Sivaram <[email protected]>
AuthorDate: Mon May 17 19:31:39 2021 +0100
KAFKA-12751: Reset AlterIsr in-flight state for duplicate update requests
(#10633)
Reviewers: David Arthur <[email protected]>
---
core/src/main/scala/kafka/cluster/Partition.scala | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 4e06198..586afbf 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -1442,10 +1442,15 @@ class Partition(val topicPartition: TopicPartition,
if (leaderAndIsr.leaderEpoch != leaderEpoch) {
debug(s"Ignoring ISR from AlterIsr with ${leaderAndIsr} since we
have a stale leader epoch $leaderEpoch.")
isrChangeListener.markFailed()
- } else if (leaderAndIsr.zkVersion <= zkVersion) {
+ } else if (leaderAndIsr.zkVersion < zkVersion) {
debug(s"Ignoring ISR from AlterIsr with ${leaderAndIsr} since we
have a newer version $zkVersion.")
isrChangeListener.markFailed()
} else {
+ // This is one of two states:
+ // 1) leaderAndIsr.zkVersion > zkVersion: Controller updated to
new version with proposedIsrState.
+ // 2) leaderAndIsr.zkVersion == zkVersion: No update was
performed since proposed and actual state are the same.
+ // In both cases, we want to move from Pending to Committed state
to ensure new updates are processed.
+
isrState = CommittedIsr(leaderAndIsr.isr.toSet)
zkVersion = leaderAndIsr.zkVersion
info(s"ISR updated from AlterIsr to ${isrState.isr.mkString(",")}
and version updated to [$zkVersion]")