mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r486535166
##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1210,19 +1243,66 @@ class Partition(val topicPartition: TopicPartition,
}
}
- private def expandIsr(newIsr: Set[Int]): Unit = {
- val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion)
+ private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
+ if (useAlterIsr) {
+ expandIsrWithAlterIsr(newInSyncReplica)
+ } else {
+ expandIsrWithZk(newInSyncReplica)
+ }
+ }
+
+ private def expandIsrWithAlterIsr(newInSyncReplica: Int): Unit = {
+ // This is called from maybeExpandIsr which holds the ISR write lock
+ if (pendingInSyncReplicaIds.isEmpty) {
Review comment:
Yeah, good idea. I'll leave the check here, since we actually acquire and
release the lock when checking whether we should expand/shrink. It's possible
that pendingInSyncReplicaIds is cleared by an update between that check and
the point where we acquire the write lock to do the update.
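
To illustrate why the check is repeated, here is a minimal sketch of the
check-then-recheck pattern described above. It is not the code from this PR:
IsrSketch, completePending and the Option-typed pendingInSyncReplicaIds field
are hypothetical stand-ins, and the sketch only assumes that the first check
runs under a read lock that is released before the write lock is taken.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical, simplified stand-in for Partition; not the Kafka implementation.
class IsrSketch(initialIsr: Set[Int]) {
  private val lock = new ReentrantReadWriteLock()
  private var inSyncReplicaIds: Set[Int] = initialIsr
  // Assumption: Some(...) while a proposed ISR change is in flight, None otherwise.
  private var pendingInSyncReplicaIds: Option[Set[Int]] = None

  private def inReadLock[T](body: => T): T = {
    lock.readLock().lock()
    try body finally lock.readLock().unlock()
  }

  private def inWriteLock[T](body: => T): T = {
    lock.writeLock().lock()
    try body finally lock.writeLock().unlock()
  }

  /** Returns true if an expansion was proposed for replicaId. */
  def maybeExpandIsr(replicaId: Int): Boolean = {
    // First check under the read lock, which is released before we continue.
    val shouldExpand = inReadLock {
      pendingInSyncReplicaIds.isEmpty && !inSyncReplicaIds.contains(replicaId)
    }
    if (!shouldExpand) false
    else inWriteLock {
      // Second check: another thread may have changed the state between
      // releasing the read lock and acquiring the write lock.
      if (pendingInSyncReplicaIds.isEmpty && !inSyncReplicaIds.contains(replicaId)) {
        pendingInSyncReplicaIds = Some(inSyncReplicaIds + replicaId)
        true
      } else false
    }
  }

  /** Hypothetical completion callback: commits and clears the pending ISR. */
  def completePending(): Unit = inWriteLock {
    pendingInSyncReplicaIds.foreach { isr => inSyncReplicaIds = isr }
    pendingInSyncReplicaIds = None
  }

  def isr: Set[Int] = inReadLock(inSyncReplicaIds)
}
```

In this sketch, dropping the second check would let two threads that both
passed the first check overwrite each other's pending ISR.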
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]