cmccabe commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r465708837



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1212,17 +1221,32 @@ class Partition(val topicPartition: TopicPartition,
 
   private def expandIsr(newIsr: Set[Int]): Unit = {
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
-    val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
-    maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+    //val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
+    //maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+    alterIsrChannelManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, 
newLeaderAndIsr, handleIsrUpdate))
+    alterIsrInFlight = true
+
+    info("ISR updated to [%s]".format(newIsr.mkString(",")))
+    inSyncReplicaIds = newIsr
   }
 
   private[cluster] def shrinkIsr(newIsr: Set[Int]): Unit = {
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
-    val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
-    maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+    //val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
+    //maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+    alterIsrChannelManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, 
newLeaderAndIsr, handleIsrUpdate))
+    alterIsrInFlight = true
   }
 
-  private[cluster] def maybeUpdateIsrAndVersion(isr: Set[Int], zkVersionOpt: 
Option[Int]): Unit = {
+  private def handleIsrUpdate(error: Errors): Unit = {
+    error match {
+      case NONE => println(s"Controller accepted ISR for $topicPartition")
+      case _ => println(s"Had an error sending ISR for $topicPartition : 
$error")
+    }
+    alterIsrInFlight = false
+  }

Review comment:
       It seems like we need to set `inSyncReplicaIds` here (in 
`handleIsrUpdate`), since we don't do it in `shrinkIsr`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to