cmccabe commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r612795358



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -832,13 +833,33 @@ void handleNodeActivated(int brokerId, List<ApiMessageAndVersion> records) {
                 throw new RuntimeException("Partition " + topicIdPartition +
                     " existed in isrMembers, but not in the partitions map.");
             }
-            // TODO: if this partition is configured for unclean leader election,
-            // check the replica set rather than the ISR.
-            if (Replicas.contains(partition.isr, brokerId)) {
-                records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
+            boolean brokerInIsr = Replicas.contains(partition.isr, brokerId);
+            boolean shouldBecomeLeader;
+            if (configurationControl.shouldUseUncleanLeaderElection(topic.name)) {
+                shouldBecomeLeader = Replicas.contains(partition.replicas, brokerId);
+            } else {
+                shouldBecomeLeader = brokerInIsr;
+            }
+            if (shouldBecomeLeader) {
+                if (brokerInIsr) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("The newly active node {} will be the leader for the " +
+                            "previously offline partition {}.",
+                            brokerId, topicIdPartition);
+                    }
+                } else {
+                    log.info("The newly active node {} will be the leader for the " +
+                        "previously offline partition {}, after an UNCLEAN leader election.",
+                        brokerId, topicIdPartition);
+                }
+                PartitionChangeRecord record = new PartitionChangeRecord().
                     setPartitionId(topicIdPartition.partitionId()).
                     setTopicId(topic.id).
-                    setLeader(brokerId), (short) 0));
+                    setLeader(brokerId);
+                if (!brokerInIsr) {
+                    record.setIsr(Replicas.toList(partition.isr, brokerId));

Review comment:
       Good catch



