mumrah commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1519830343


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -2240,6 +2283,25 @@ private void updatePartitionDirectories(
         }
     }
 
+    private void updatePartitionInfo(
+        Uuid topicId,
+        Integer partitionId,
+        PartitionRegistration prevPartInfo,
+        PartitionRegistration newPartInfo
+    ) {
+        HashSet<Integer> validationSet = new HashSet<>();
+        Arrays.stream(newPartInfo.isr).forEach(ii -> validationSet.add(ii));
+        Arrays.stream(newPartInfo.elr).forEach(ii -> validationSet.add(ii));
+        if (validationSet.size() != newPartInfo.isr.length + newPartInfo.elr.length) {
+            log.warn("{}-{} has overlapping ISR={} and ELR={}", topics.get(topicId).name, partitionId,
+                Arrays.toString(newPartInfo.isr), partitionId, Arrays.toString(newPartInfo.elr));

Review Comment:
   This can only happen if we have a bug where the ELR and ISR are allowed to
overlap, right?
   
   Since this is part of the `replay`, we shouldn't throw here (since the 
record has already been committed), but perhaps an ERROR is better than a WARN.
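
A minimal sketch of the suggestion above, assuming a stand-alone helper rather than the real `ReplicationControlManager` code: it detects an ISR/ELR overlap and reports it at the highest log level without throwing, since the record is already committed by the time it is replayed. The class name, method names, and the use of `java.util.logging` (instead of the project's logger) are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical helper for illustration; not part of the Kafka code base.
class IsrElrOverlapCheck {
    private static final Logger LOG = Logger.getLogger("IsrElrOverlapCheck");

    /** Returns true if any replica ID appears in both isr and elr. */
    static boolean overlaps(int[] isr, int[] elr) {
        Set<Integer> isrSet = new HashSet<>();
        Arrays.stream(isr).forEach(isrSet::add);
        return Arrays.stream(elr).anyMatch(id -> isrSet.contains(id));
    }

    /**
     * Replay-time check: an overlap indicates a bug elsewhere, so log it
     * at error (SEVERE) level, but never throw from the replay path.
     */
    static void checkOnReplay(String topic, int partition, int[] isr, int[] elr) {
        if (overlaps(isr, elr)) {
            LOG.severe(String.format("%s-%d has overlapping ISR=%s and ELR=%s",
                topic, partition, Arrays.toString(isr), Arrays.toString(elr)));
        }
    }

    public static void main(String[] args) {
        // Broker 2 is in both sets, so this logs an error and returns normally.
        checkOnReplay("foo", 0, new int[] {1, 2}, new int[] {2, 3});
    }
}
```

Unlike the size comparison in the diff, this sketch only flags IDs shared between the two arrays; duplicates inside a single array are not reported.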



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -336,10 +350,10 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
                 ", but got cluster ID " + request.clusterId());
         }
         int brokerId = request.brokerId();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
         BrokerRegistration existing = brokerRegistrations.get(brokerId);
         if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) {
-            // TODO(KIP-966): Update the ELR if the broker has an unclean shutdown.
-            log.debug("Received an unclean shutdown request");

Review Comment:
   I think we inadvertently lost this log message.



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -2240,6 +2283,25 @@ private void updatePartitionDirectories(
         }
     }
 
+    private void updatePartitionInfo(
+        Uuid topicId,
+        Integer partitionId,
+        PartitionRegistration prevPartInfo,
+        PartitionRegistration newPartInfo
+    ) {
+        HashSet<Integer> validationSet = new HashSet<>();
+        Arrays.stream(newPartInfo.isr).forEach(ii -> validationSet.add(ii));
+        Arrays.stream(newPartInfo.elr).forEach(ii -> validationSet.add(ii));

Review Comment:
   nit: I think you can do `forEach(validationSet::add)` here
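
For reference, a small self-contained sketch of the method-reference form. The class and method names are made up for the example; only the `forEach(validationSet::add)` call comes from the comment above. The `int` values from the `IntStream` are boxed automatically and the `boolean` returned by `Set.add` is discarded.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical example class; mirrors the shape of the code under review.
class MethodRefExample {
    /** Collects every replica ID from both arrays into one set. */
    static Set<Integer> union(int[] isr, int[] elr) {
        Set<Integer> validationSet = new HashSet<>();
        // Same effect as forEach(ii -> validationSet.add(ii)).
        Arrays.stream(isr).forEach(validationSet::add);
        Arrays.stream(elr).forEach(validationSet::add);
        return validationSet;
    }

    public static void main(String[] args) {
        System.out.println(union(new int[] {1, 2}, new int[] {2, 3}).size());
    }
}
```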



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1815,12 +1845,17 @@ void validateManualPartitionAssignment(
      *                          broker to remove from the ISR and leadership, otherwise.
      * @param brokerToAdd       NO_LEADER if no broker is being added; the ID of the
      *                          broker which is now eligible to be a leader, otherwise.
+     * @param brokerWithUncleanShutdown

Review Comment:
   nit: update the main description of this method to mention ISR and ELR



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -780,4 +793,9 @@ public Entry<Integer, Map<String, VersionRange>> next() {
             }
         };
     }
+
+    @FunctionalInterface
+    interface BrokerUncleanShutdownHandler {
+        void apply(int brokerId, List<ApiMessageAndVersion> records);

Review Comment:
   nit: since we're defining an interface, we can use a more descriptive name 
than "apply" for the method. Maybe "addRecordsForShutdown" or something.
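
One way the rename might look, as a rough sketch. `addRecordsForShutdown` is the name floated above; the `String` record type stands in for `ApiMessageAndVersion`, and the `handle` helper and the record text are invented purely so the example runs on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical example class; String replaces ApiMessageAndVersion here.
class UncleanShutdownHandlerExample {
    @FunctionalInterface
    interface BrokerUncleanShutdownHandler {
        // A descriptive name documents intent at every call site.
        void addRecordsForShutdown(int brokerId, List<String> records);
    }

    /** Invokes the handler and returns whatever records it appended. */
    static List<String> handle(int brokerId, BrokerUncleanShutdownHandler handler) {
        List<String> records = new ArrayList<>();
        handler.addRecordsForShutdown(brokerId, records);
        return records;
    }

    public static void main(String[] args) {
        // Lambdas still work, because the interface has one abstract method.
        System.out.println(handle(3, (id, recs) -> recs.add("unclean-shutdown-" + id)));
    }
}
```

The interface stays lambda-compatible after the rename, so only explicit calls to `apply` would need to change.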



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1837,8 +1872,13 @@ void generateLeaderAndIsrUpdates(String context,
         // from the target ISR, but we need to exclude it here too, to handle the case
         // where there is an unclean leader election which chooses a leader from outside
         // the ISR.
+        //
+        // If the caller passed a valid broker ID for brokerWithUncleanShutdown, rather than
+        // passing NO_LEADER, this node should not be an acceptable leader. We also exclude
+        // brokerWithUncleanShutdown from ELR and ISR.
         IntPredicate isAcceptableLeader =
-            r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.isActive(r));
+            r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)
+                && (r == brokerToAdd || clusterControl.isActive(r));

Review Comment:
   Since our guards around ELR (whether it's enabled or not) are in
PartitionChangeBuilder, we need to make sure this logic is still correct when
ELR is not enabled because of the metadata version (MV).
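
To make the concern concrete, here is a rough sketch of the predicate in isolation. It assumes that `NO_LEADER` is a negative sentinel (`-1` here) and that the set of active brokers can stand in for `clusterControl.isActive`. Whether callers actually pass `NO_LEADER` when ELR is disabled is not shown in this diff and would need to be confirmed.

```java
import java.util.Set;
import java.util.function.IntPredicate;

// Hypothetical example class; models only the predicate from the diff.
class AcceptableLeaderExample {
    static final int NO_LEADER = -1; // assumed sentinel value

    /** Predicate before the change. */
    static IntPredicate oldPredicate(int brokerToRemove, int brokerToAdd, Set<Integer> active) {
        return r -> (r != brokerToRemove) && (r == brokerToAdd || active.contains(r));
    }

    /** Predicate after the change, with the unclean-shutdown exclusion. */
    static IntPredicate newPredicate(int brokerToRemove, int brokerToAdd,
                                     int brokerWithUncleanShutdown, Set<Integer> active) {
        return r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)
            && (r == brokerToAdd || active.contains(r));
    }

    public static void main(String[] args) {
        Set<Integer> active = Set.of(1, 2, 3);
        // With NO_LEADER the extra clause never excludes a real broker ID.
        System.out.println(newPredicate(1, NO_LEADER, NO_LEADER, active).test(2));
        // With a real ID, that broker is rejected even though it is active.
        System.out.println(newPredicate(1, NO_LEADER, 2, active).test(2));
    }
}
```

Under these assumptions the new predicate matches the old one whenever `brokerWithUncleanShutdown` is `NO_LEADER`, so the pre-ELR behavior is preserved only if callers pass the sentinel when ELR is off.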



