mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r469467881
##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1210,28 +1218,20 @@ class Partition(val topicPartition: TopicPartition,
}
}
- private def expandIsr(newIsr: Set[Int]): Unit = {
- val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion)
- val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
- maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
- }
+ private def expandIsr(newInSyncReplica: Int): Unit = {
+ pendingInSyncReplicaIds += newInSyncReplica
+ info(s"Adding new in-sync replica $newInSyncReplica. Pending ISR updated to [${pendingInSyncReplicaIds.mkString(",")}]")
- private[cluster] def shrinkIsr(newIsr: Set[Int]): Unit = {
- val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion)
- val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
- maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+ val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, pendingInSyncReplicaIds.toList, zkVersion)
+ alterIsrChannelManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, newLeaderAndIsr))
}
- private[cluster] def maybeUpdateIsrAndVersion(isr: Set[Int], zkVersionOpt: Option[Int]): Unit = {
- zkVersionOpt match {
- case Some(newVersion) =>
- inSyncReplicaIds = isr
- zkVersion = newVersion
- info("ISR updated to [%s] and zkVersion updated to [%d]".format(isr.mkString(","), zkVersion))
+ private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
+ pendingInSyncReplicaIds --= outOfSyncReplicas
Review comment:
I'm currently looking at the effective ISR to find new out-of-sync
replicas. This can include new ISR members that haven't made it into the
"true" ISR via LeaderAndIsr yet (like broker=3 in your example). Maybe we
should only consider removing ISR members that are already in the true ISR.
In other words, changing from
```scala
val candidateReplicaIds = effectiveInSyncReplicaIds - localBrokerId
```
to
```scala
val candidateReplicaIds = inSyncReplicaIds - localBrokerId
```
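To make the difference concrete, here is a minimal sketch of the two
candidate sets. It is a simplified stand-in, not the actual Partition.scala
code: `IsrState` is a hypothetical holder, and it assumes the effective ISR
is the union of the committed ("true") ISR and the pending ISR.

```scala
// Hypothetical, simplified model of the two ISR views; not Kafka's real classes.
object IsrCandidateSketch {
  final case class IsrState(inSyncReplicaIds: Set[Int],
                            pendingInSyncReplicaIds: Set[Int]) {
    // Assumption: effective ISR = committed ISR plus anything still pending.
    def effectiveInSyncReplicaIds: Set[Int] =
      inSyncReplicaIds ++ pendingInSyncReplicaIds
  }

  // Current behavior described above: shrink candidates come from the effective ISR.
  def candidatesFromEffective(state: IsrState, localBrokerId: Int): Set[Int] =
    state.effectiveInSyncReplicaIds - localBrokerId

  // Proposed behavior: shrink candidates come only from the committed ISR.
  def candidatesFromTrue(state: IsrState, localBrokerId: Int): Set[Int] =
    state.inSyncReplicaIds - localBrokerId
}
```

With a committed ISR of {1, 2}, a pending ISR of {1, 2, 3}, and local broker
1, the first function also returns broker 3, which has not been committed
yet, while the second returns only broker 2.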
Also, I wonder if the batching that's happening in AlterIsrChannelManager
violates the model. It sends the request asynchronously after a small delay,
so multiple ISR changes can be batched into one AlterIsr request.
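
As a rough illustration of the batching concern, the sketch below buffers
updates and drains them in one go. `IsrUpdate` and `PendingUpdates` are
hypothetical names, and the real AlterIsrChannelManager may behave
differently; the only point is that everything enqueued before the delayed
send ends up in the same batch.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical sketch of delayed batching; not the real AlterIsrChannelManager.
object AlterIsrBatchingSketch {
  // Stand-in for an enqueued ISR change; topicPartition is a plain string here.
  final case class IsrUpdate(topicPartition: String, isr: List[Int])

  final class PendingUpdates {
    private val buffer = ListBuffer.empty[IsrUpdate]

    // Called whenever the ISR changes; nothing is sent yet.
    def enqueue(update: IsrUpdate): Unit = buffer += update

    // Called when the (assumed) delay elapses: everything buffered so far
    // is returned as one batch and the buffer is emptied.
    def drain(): List[IsrUpdate] = {
      val batch = buffer.toList
      buffer.clear()
      batch
    }
  }
}
```

If an expand and a shrink for the same partition are both enqueued before
the drain, they travel together in one batch in this sketch.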