cmccabe commented on a change in pull request #10463: URL: https://github.com/apache/kafka/pull/10463#discussion_r612795358
########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ##########
@@ -832,13 +833,33 @@ void handleNodeActivated(int brokerId, List<ApiMessageAndVersion> records) {
                 throw new RuntimeException("Partition " + topicIdPartition +
                     " existed in isrMembers, but not in the partitions map.");
             }
-            // TODO: if this partition is configured for unclean leader election,
-            // check the replica set rather than the ISR.
-            if (Replicas.contains(partition.isr, brokerId)) {
-                records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
+            boolean brokerInIsr = Replicas.contains(partition.isr, brokerId);
+            boolean shouldBecomeLeader;
+            if (configurationControl.shouldUseUncleanLeaderElection(topic.name)) {
+                shouldBecomeLeader = Replicas.contains(partition.replicas, brokerId);
+            } else {
+                shouldBecomeLeader = brokerInIsr;
+            }
+            if (shouldBecomeLeader) {
+                if (brokerInIsr) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("The newly active node {} will be the leader for the " +
+                            "previously offline partition {}.",
+                            brokerId, topicIdPartition);
+                    }
+                } else {
+                    log.info("The newly active node {} will be the leader for the " +
+                        "previously offline partition {}, after an UNCLEAN leader election.",
+                        brokerId, topicIdPartition);
+                }
+                PartitionChangeRecord record = new PartitionChangeRecord().
                     setPartitionId(topicIdPartition.partitionId()).
                     setTopicId(topic.id).
-                    setLeader(brokerId), (short) 0));
+                    setLeader(brokerId);
+                if (!brokerInIsr) {
+                    record.setIsr(Replicas.toList(partition.isr, brokerId));

Review comment:
Good catch

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: users@infra.apache.org