mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r486535166



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1210,19 +1243,66 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def expandIsr(newIsr: Set[Int]): Unit = {
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
+  private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
+    if (useAlterIsr) {
+      expandIsrWithAlterIsr(newInSyncReplica)
+    } else {
+      expandIsrWithZk(newInSyncReplica)
+    }
+  }
+
+  private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = {
+    // This is called from maybeExpandIsr which holds the ISR write lock
+    if (pendingInSyncReplicaIds.isEmpty) {

Review comment:
       Yea, good idea. I'll leave the check here since we actually acquire and 
release the lock when checking if we should expand/shrink. It's possible that 
pendingInSyncReplicaIds is cleared by a LeaderAndIsr between then and when we 
acquire the write lock to do the update




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to