mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r469467881



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1210,28 +1218,20 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def expandIsr(newIsr: Set[Int]): Unit = {
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
-    val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
-    maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
-  }
+  private def expandIsr(newInSyncReplica: Int): Unit = {
+    pendingInSyncReplicaIds += newInSyncReplica
+    info(s"Adding new in-sync replica $newInSyncReplica. Pending ISR updated 
to [${pendingInSyncReplicaIds.mkString(",")}]")
 
-  private[cluster] def shrinkIsr(newIsr: Set[Int]): Unit = {
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
-    val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
-    maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
pendingInSyncReplicaIds.toList, zkVersion)
+    alterIsrChannelManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, 
newLeaderAndIsr))
   }
 
-  private[cluster] def maybeUpdateIsrAndVersion(isr: Set[Int], zkVersionOpt: 
Option[Int]): Unit = {
-    zkVersionOpt match {
-      case Some(newVersion) =>
-        inSyncReplicaIds = isr
-        zkVersion = newVersion
-        info("ISR updated to [%s] and zkVersion updated to 
[%d]".format(isr.mkString(","), zkVersion))
+  private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
+    pendingInSyncReplicaIds --= outOfSyncReplicas

Review comment:
       I'm currently looking at the effective ISR to find new out of sync 
replicas. This can include new ISR members which haven't made it into the 
"true" ISR via LeaderAndIsr yet (like broker=3 in your example). Maybe we 
should only consider removing ISR members iff they are in the true ISR. IOW 
changing from
   
   ```scala
   val candidateReplicaIds = effectiveInSyncReplicaIds - localBrokerId
   ```
   to
   ```scala
   val candidateReplicaIds = inSyncReplicaIds - localBrokerId
   ```
   
   Also, I wonder if the batching that's happening in AlterIsrChannelManager 
violates the model. It sends the request asynchronously with a small delay, so 
multiple ISR changes can be batched into one AlterIsr.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to