mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r486535868



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1210,19 +1243,66 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def expandIsr(newIsr: Set[Int]): Unit = {
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
+  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
+    if (useAlterIsr) {
+      expandIsrWithAlterIsr(newInSyncReplica)
+    } else {
+      expandIsrWithZk(newInSyncReplica)
+    }
+  }
+
+  private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = {
+    // This is called from maybeExpandIsr which holds the ISR write lock
+    if (pendingInSyncReplicaIds.isEmpty) {
+      // When expanding the ISR, we can safely assume the new replica will 
make it into the ISR since this puts us in
+      // a more constrained state for advancing the HW.
+      val newIsr = inSyncReplicaIds + newInSyncReplica
+      pendingInSyncReplicaIds = Some(newIsr)
+      debug(s"Adding new in-sync replica $newInSyncReplica. Pending ISR 
updated to [${newIsr.mkString(",")}] for $topicPartition")
+      alterIsr(newIsr)
+    } else {
+      debug(s"ISR update in-flight, not adding new in-sync replica 
$newInSyncReplica for $topicPartition")
+    }
+  }
+
+  private def expandIsrWithZk(newInSyncReplica: Int): Unit = {
+    val newInSyncReplicaIds = inSyncReplicaIds + newInSyncReplica
+    info(s"Expanding ISR from ${inSyncReplicaIds.mkString(",")} to 
${newInSyncReplicaIds.mkString(",")}")
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newInSyncReplicaIds.toList, zkVersion)
     val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
-    maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+    maybeUpdateIsrAndVersionWithZk(newInSyncReplicaIds, zkVersionOpt)
   }
 
-  private[cluster] def shrinkIsr(newIsr: Set[Int]): Unit = {
+  private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
+    if (useAlterIsr) {
+      shrinkIsrWithAlterIsr(outOfSyncReplicas)
+    } else {
+      shrinkIsrWithZk(inSyncReplicaIds -- outOfSyncReplicas)
+    }
+  }
+
+  private def shrinkIsrWithAlterIsr(outOfSyncReplicas: Set[Int]): Unit = {
+    // This is called from maybeShrinkIsr which holds the ISR write lock
+    if (pendingInSyncReplicaIds.isEmpty) {
+      // When shrinking the ISR, we cannot assume that the update will succeed 
as this could erroneously advance the HW
+      // We update pendingInSyncReplicaIds here simply to prevent any further 
ISR updates from occurring until we get
+      // the next LeaderAndIsr
+      pendingInSyncReplicaIds = Some(inSyncReplicaIds)

Review comment:
       How about `uncommittedInSyncReplicaIds`?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -278,10 +284,10 @@ class KafkaController(val config: KafkaConfig,
   private def onControllerResignation(): Unit = {
     debug("Resigning")
     // de-register listeners
-    
zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
     zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
     zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
     
zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
+    zkClient.unregisterStateChangeHandler(isrChangeNotificationHandler.path)

Review comment:
       nope, will revert




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to