junrao commented on a change in pull request #10564:
URL: https://github.com/apache/kafka/pull/10564#discussion_r617858509



##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -172,47 +174,54 @@ String diff(PartitionControlInfo prev) {
             StringBuilder builder = new StringBuilder();
             String prefix = "";
             if (!Arrays.equals(replicas, prev.replicas)) {
-                
builder.append(prefix).append("oldReplicas=").append(Arrays.toString(prev.replicas));
+                builder.append(prefix).append("replicas: ").
+                    append(Arrays.toString(prev.replicas)).
+                    append(" -> ").append(Arrays.toString(replicas));
                 prefix = ", ";
-                
builder.append(prefix).append("newReplicas=").append(Arrays.toString(replicas));
             }
             if (!Arrays.equals(isr, prev.isr)) {
-                
builder.append(prefix).append("oldIsr=").append(Arrays.toString(prev.isr));
+                builder.append(prefix).append("isr: ").
+                    append(Arrays.toString(prev.isr)).
+                    append(" -> ").append(Arrays.toString(isr));
                 prefix = ", ";
-                
builder.append(prefix).append("newIsr=").append(Arrays.toString(isr));
             }
             if (!Arrays.equals(removingReplicas, prev.removingReplicas)) {
-                builder.append(prefix).append("oldRemovingReplicas=").
-                    append(Arrays.toString(prev.removingReplicas));
+                builder.append(prefix).append("removingReplicas: ").
+                    append(Arrays.toString(prev.removingReplicas)).
+                    append(" -> ").append(Arrays.toString(removingReplicas));
                 prefix = ", ";
-                builder.append(prefix).append("newRemovingReplicas=").
-                    append(Arrays.toString(removingReplicas));
             }
             if (!Arrays.equals(addingReplicas, prev.addingReplicas)) {
-                builder.append(prefix).append("oldAddingReplicas=").
-                    append(Arrays.toString(prev.addingReplicas));
+                builder.append(prefix).append("addingReplicas: ").
+                    append(Arrays.toString(prev.addingReplicas)).
+                    append(" -> ").append(Arrays.toString(addingReplicas));
                 prefix = ", ";
-                builder.append(prefix).append("newAddingReplicas=").
-                    append(Arrays.toString(addingReplicas));
             }
             if (leader != prev.leader) {
-                
builder.append(prefix).append("oldLeader=").append(prev.leader);
+                builder.append(prefix).append("leader: ").
+                    append(prev.leader).append(" -> ").append(leader);
                 prefix = ", ";
-                builder.append(prefix).append("newLeader=").append(leader);
             }
             if (leaderEpoch != prev.leaderEpoch) {
-                
builder.append(prefix).append("oldLeaderEpoch=").append(prev.leaderEpoch);
+                builder.append(prefix).append("leaderEpoch: ").
+                    append(prev.leaderEpoch).append(" -> 
").append(leaderEpoch);
                 prefix = ", ";
-                
builder.append(prefix).append("newLeaderEpoch=").append(leaderEpoch);
             }
             if (partitionEpoch != prev.partitionEpoch) {
-                
builder.append(prefix).append("oldPartitionEpoch=").append(prev.partitionEpoch);
-                prefix = ", ";
-                
builder.append(prefix).append("newPartitionEpoch=").append(partitionEpoch);
+                builder.append(prefix).append("partitionEpoch: ").
+                    append(prev.partitionEpoch).append(" -> 
").append(partitionEpoch);
             }
             return builder.toString();
         }
 
+        void maybeLogPartitionChange(Logger log, String description, 
PartitionControlInfo prev) {
+            if (!electionWasClean(leader, prev.isr)) {
+                log.info("UNCLEAN partition change for {}: {}", description, 
diff(prev));
+            } else if (log.isDebugEnabled()) {
+                log.debug("partition change for {}: {}", description, 
diff(prev));

Review comment:
       We used to log all leader and ISR changes at info level, even for clean 
leader elections. 

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -321,22 +336,18 @@ public void replay(PartitionRecord record) {
         }
         PartitionControlInfo newPartInfo = new PartitionControlInfo(record);
         PartitionControlInfo prevPartInfo = 
topicInfo.parts.get(record.partitionId());
+        String description = topicInfo.name + "-" + record.partitionId() +
+            " with ID " + record.topicId();

Review comment:
       Could we change "with ID" to "with topic ID" to make it clear?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -957,22 +890,27 @@ ApiError electLeader(String topic, int partitionId, 
boolean unclean,
         return ControllerResult.of(records, reply);
     }
 
-    int bestLeader(int[] replicas, int[] isr, boolean unclean) {
+    static boolean isGoodLeader(int[] isr, int leader) {
+        return Replicas.contains(isr, leader);
+    }
+
+    static int bestLeader(int[] replicas, int[] isr, boolean uncleanOk,

Review comment:
       Would it be better to return optional int here?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -957,22 +890,27 @@ ApiError electLeader(String topic, int partitionId, 
boolean unclean,
         return ControllerResult.of(records, reply);
     }
 
-    int bestLeader(int[] replicas, int[] isr, boolean unclean) {
+    static boolean isGoodLeader(int[] isr, int leader) {
+        return Replicas.contains(isr, leader);
+    }
+
+    static int bestLeader(int[] replicas, int[] isr, boolean uncleanOk,
+                          Function<Integer, Boolean> isAcceptableLeader) {
+        int bestUnclean = NO_LEADER;
         for (int i = 0; i < replicas.length; i++) {
             int replica = replicas[i];
-            if (Replicas.contains(isr, replica)) {
-                return replica;
-            }
-        }
-        if (unclean) {
-            for (int i = 0; i < replicas.length; i++) {
-                int replica = replicas[i];
-                if (clusterControl.unfenced(replica)) {
+            if (isAcceptableLeader.apply(replica)) {
+                if (bestUnclean == NO_LEADER) bestUnclean = replica;
+                if (Replicas.contains(isr, replica)) {
                     return replica;
                 }
             }
         }
-        return NO_LEADER;
+        return uncleanOk ? bestUnclean : NO_LEADER;
+    }
+
+    static boolean electionWasClean(int newLeader, int[] prevIsr) {

Review comment:
       Since we don't always pass in previous isr to prevIsr, perhaps we could 
name it more accurately?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -356,7 +369,8 @@ public void replay(PartitionChangeRecord record) {
         brokersToIsrs.update(record.topicId(), record.partitionId(),
             prevPartitionInfo.isr, newPartitionInfo.isr, 
prevPartitionInfo.leader,
             newPartitionInfo.leader);
-        log.debug("Applied ISR change record: {}", record.toString());
+        String topicPart = topicInfo.name + "-" + record.partitionId();

Review comment:
       It seems that we haven't added the topicId yet.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1119,6 +1057,83 @@ void validateManualPartitionAssignment(List<Integer> 
assignment,
         }
     }
 
+    /**
+     * Iterate over a sequence of partitions and generate ISR changes and/or 
leader
+     * changes if necessary.
+     *
+     * @param context           A human-readable context string used in log4j 
logging.
+     * @param brokerToRemove    NO_LEADER if no broker is being removed; the 
ID of the
+     *                          broker to remove from the ISR and leadership, 
otherwise.
+     * @param brokerToAdd       NO_LEADER if no broker is being added; the ID 
of the
+     *                          broker which is now eligible to be a leader, 
otherwise.
+     * @param records           A list of records which we will append to.
+     * @param iterator          The iterator containing the partitions to 
examine.
+     */
+    void generateLeaderAndIsrUpdates(String context,
+                                     int brokerToRemove,
+                                     int brokerToAdd,
+                                     List<ApiMessageAndVersion> records,
+                                     Iterator<TopicIdPartition> iterator) {
+        int oldSize = records.size();
+        Function<Integer, Boolean> isAcceptableLeader =
+            r -> r == brokerToAdd || clusterControl.unfenced(r);
+        while (iterator.hasNext()) {
+            TopicIdPartition topicIdPart = iterator.next();
+            TopicControlInfo topic = topics.get(topicIdPart.topicId());
+            if (topic == null) {
+                throw new RuntimeException("Topic ID " + topicIdPart.topicId() 
+
+                        " existed in isrMembers, but not in the topics map.");
+            }
+            PartitionControlInfo partition = 
topic.parts.get(topicIdPart.partitionId());
+            if (partition == null) {
+                throw new RuntimeException("Partition " + topicIdPart +
+                    " existed in isrMembers, but not in the partitions map.");
+            }
+            int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemove);
+            int newLeader;
+            if (isGoodLeader(newIsr, partition.leader)) {
+                // If the current leader is good, don't change.
+                newLeader = partition.leader;
+            } else {
+                // Choose a new leader.
+                boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+                newLeader = bestLeader(partition.replicas, newIsr, uncleanOk, 
isAcceptableLeader);

Review comment:
       In the case of controlled shutdown, we currently ignore the unclean 
leader election flag and always elect a new leader cleanly.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1119,6 +1057,83 @@ void validateManualPartitionAssignment(List<Integer> 
assignment,
         }
     }
 
+    /**
+     * Iterate over a sequence of partitions and generate ISR changes and/or 
leader
+     * changes if necessary.
+     *
+     * @param context           A human-readable context string used in log4j 
logging.
+     * @param brokerToRemove    NO_LEADER if no broker is being removed; the 
ID of the

Review comment:
       Instead of relying upon NO_LEADER, could we make brokerToRemove and 
brokerToAdd optional int to make them clearer?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1119,6 +1057,83 @@ void validateManualPartitionAssignment(List<Integer> 
assignment,
         }
     }
 
+    /**
+     * Iterate over a sequence of partitions and generate ISR changes and/or 
leader
+     * changes if necessary.
+     *
+     * @param context           A human-readable context string used in log4j 
logging.
+     * @param brokerToRemove    NO_LEADER if no broker is being removed; the 
ID of the
+     *                          broker to remove from the ISR and leadership, 
otherwise.
+     * @param brokerToAdd       NO_LEADER if no broker is being added; the ID 
of the
+     *                          broker which is now eligible to be a leader, 
otherwise.
+     * @param records           A list of records which we will append to.
+     * @param iterator          The iterator containing the partitions to 
examine.
+     */
+    void generateLeaderAndIsrUpdates(String context,
+                                     int brokerToRemove,
+                                     int brokerToAdd,
+                                     List<ApiMessageAndVersion> records,
+                                     Iterator<TopicIdPartition> iterator) {
+        int oldSize = records.size();
+        Function<Integer, Boolean> isAcceptableLeader =
+            r -> r == brokerToAdd || clusterControl.unfenced(r);

Review comment:
       Do we need to consider brokerToRemove here? Also, do we expect at least 
one of brokerToRemove and brokerToAdd to be NO_LEADER? If so, should we assert 
that?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to