junrao commented on a change in pull request #10564:
URL: https://github.com/apache/kafka/pull/10564#discussion_r617093936
##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1119,6 +1053,61 @@ void validateManualPartitionAssignment(List<Integer> assignment,
         }
     }
 
+    void generateLeaderAndIsrUpdates(String context,
+                                     int brokerToRemoveFromIsr,
+                                     List<ApiMessageAndVersion> records,
+                                     Iterator<TopicIdPartition> iterator) {
+        int oldSize = records.size();
+        while (iterator.hasNext()) {
+            TopicIdPartition topicIdPart = iterator.next();
+            TopicControlInfo topic = topics.get(topicIdPart.topicId());
+            if (topic == null) {
+                throw new RuntimeException("Topic ID " + topicIdPart.topicId() + " existed in " +
+                    "isrMembers, but not in the topics map.");
+            }
+            PartitionControlInfo partition = topic.parts.get(topicIdPart.partitionId());
+            if (partition == null) {
+                throw new RuntimeException("Partition " + topicIdPart +
+                    " existed in isrMembers, but not in the partitions map.");
+            }
+            int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemoveFromIsr);
+            int newLeader = bestLeader(partition.replicas, newIsr, false);

Review comment:
One side effect of doing this is that it can force the leader to change unnecessarily. For example, the current leader could still be in newIsr but not be the first in the replica list. bestLeader() will cause the leader to change in this case.
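To illustrate the concern, here is a minimal standalone sketch, not the code from this PR. It assumes `bestLeader(replicas, isr, false)` behaves roughly like "pick the first replica in assignment order that is in the ISR", which is what the comment implies. The class `LeaderSelectionSketch`, the helpers `firstReplicaInIsr` and `chooseLeader`, and the `NO_LEADER` sentinel value of -1 are all hypothetical names introduced for illustration only.

```java
// Hypothetical illustration only; none of these names come from ReplicationControlManager.
final class LeaderSelectionSketch {
    // Assumed sentinel meaning "no eligible leader".
    static final int NO_LEADER = -1;

    // Returns true if target appears in values.
    static boolean contains(int[] values, int target) {
        for (int value : values) {
            if (value == target) {
                return true;
            }
        }
        return false;
    }

    // Simplified stand-in for the assumed bestLeader behavior:
    // the first replica, in assignment order, that is still in the ISR.
    static int firstReplicaInIsr(int[] replicas, int[] isr) {
        for (int replica : replicas) {
            if (contains(isr, replica)) {
                return replica;
            }
        }
        return NO_LEADER;
    }

    // One possible alternative: keep the current leader while it remains in the
    // new ISR, and fall back to the first in-ISR replica only when it does not.
    static int chooseLeader(int[] replicas, int[] newIsr, int currentLeader) {
        if (currentLeader != NO_LEADER && contains(newIsr, currentLeader)) {
            return currentLeader;
        }
        return firstReplicaInIsr(replicas, newIsr);
    }

    public static void main(String[] args) {
        int[] replicas = {1, 2, 3};
        int[] newIsr = {1, 2};   // broker 3 has just been removed from the ISR
        int currentLeader = 2;   // the leader is healthy but is not the first replica

        // Always re-electing moves leadership from 2 to 1 even though 2 is still in the ISR.
        System.out.println(firstReplicaInIsr(replicas, newIsr));           // prints 1
        // Preferring the current leader avoids the unnecessary change.
        System.out.println(chooseLeader(replicas, newIsr, currentLeader)); // prints 2
    }
}
```

With replicas `[1, 2, 3]`, current leader 2, and broker 3 removed from the ISR, the always-re-elect path returns 1 while the leader-preserving path returns 2. Whether the fix belongs at the call site or inside `bestLeader()` itself is left open here.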