mumrah commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1519830343
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -2240,6 +2283,25 @@ private void updatePartitionDirectories(
         }
     }
+    private void updatePartitionInfo(
+        Uuid topicId,
+        Integer partitionId,
+        PartitionRegistration prevPartInfo,
+        PartitionRegistration newPartInfo
+    ) {
+        HashSet<Integer> validationSet = new HashSet<>();
+        Arrays.stream(newPartInfo.isr).forEach(ii -> validationSet.add(ii));
+        Arrays.stream(newPartInfo.elr).forEach(ii -> validationSet.add(ii));
+        if (validationSet.size() != newPartInfo.isr.length + newPartInfo.elr.length) {
+            log.warn("{}-{} has overlapping ISR={} and ELR={}", topics.get(topicId).name, partitionId,
+                Arrays.toString(newPartInfo.isr), partitionId, Arrays.toString(newPartInfo.elr));

Review Comment:
   This can only happen if we have a bug that allows the ELR and ISR to overlap, right? Since this is part of `replay`, we shouldn't throw here (the record has already been committed), but perhaps ERROR is a better log level than WARN.

##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -336,10 +350,10 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
                 ", but got cluster ID " + request.clusterId());
         }
         int brokerId = request.brokerId();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
         BrokerRegistration existing = brokerRegistrations.get(brokerId);
         if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) {
-            // TODO(KIP-966): Update the ELR if the broker has an unclean shutdown.
-            log.debug("Received an unclean shutdown request");

Review Comment:
   I think we inadvertently lost this log message.
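A minimal standalone sketch of the ISR/ELR disjointness check from `updatePartitionInfo`, using the `forEach(validationSet::add)` form suggested in review and reporting at an ERROR-like level instead of throwing. The class and method names (`IsrElrOverlapCheck`, `isDisjoint`, `overlapMessage`) are hypothetical, and `System.err` stands in for the controller's logger. It also passes exactly one argument per placeholder: the quoted `log.warn` call lists `partitionId` twice, so with an SLF4J-style logger the `ELR={}` slot would receive the partition ID and the ELR array would be silently dropped.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical standalone class; not part of Kafka.
final class IsrElrOverlapCheck {

    // Returns true when the ISR and ELR share no replica ID. Like the quoted
    // check, it compares the set size with the combined array lengths, so a
    // duplicate inside either array also makes it return false.
    static boolean isDisjoint(int[] isr, int[] elr) {
        Set<Integer> validationSet = new HashSet<>();
        // Method reference instead of ii -> validationSet.add(ii); each int is boxed.
        Arrays.stream(isr).forEach(validationSet::add);
        Arrays.stream(elr).forEach(validationSet::add);
        return validationSet.size() == isr.length + elr.length;
    }

    // One argument per slot: topic name, partition ID, ISR, ELR.
    static String overlapMessage(String topicName, int partitionId, int[] isr, int[] elr) {
        return String.format("%s-%d has overlapping ISR=%s and ELR=%s",
            topicName, partitionId, Arrays.toString(isr), Arrays.toString(elr));
    }

    public static void main(String[] args) {
        int[] isr = {1, 2};
        int[] elr = {2, 3};
        if (!isDisjoint(isr, elr)) {
            // During replay the record is already committed, so report rather than throw.
            // System.err stands in for log.error(...) in the controller.
            System.err.println("ERROR " + overlapMessage("foo", 0, isr, elr));
        }
    }
}
```

With the sample arrays above, replica 2 is in both lists, so the sketch prints `ERROR foo-0 has overlapping ISR=[1, 2] and ELR=[2, 3]` to standard error.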
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -2240,6 +2283,25 @@ private void updatePartitionDirectories(
         }
     }
+    private void updatePartitionInfo(
+        Uuid topicId,
+        Integer partitionId,
+        PartitionRegistration prevPartInfo,
+        PartitionRegistration newPartInfo
+    ) {
+        HashSet<Integer> validationSet = new HashSet<>();
+        Arrays.stream(newPartInfo.isr).forEach(ii -> validationSet.add(ii));
+        Arrays.stream(newPartInfo.elr).forEach(ii -> validationSet.add(ii));

Review Comment:
   nit: I think you can use `forEach(validationSet::add)` here.

##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1815,12 +1845,17 @@ void validateManualPartitionAssignment(
      * broker to remove from the ISR and leadership, otherwise.
      * @param brokerToAdd NO_LEADER if no broker is being added; the ID of the
      * broker which is now eligible to be a leader, otherwise.
+     * @param brokerWithUncleanShutdown

Review Comment:
   nit: update the main description of this method to mention both the ISR and the ELR.

##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -780,4 +793,9 @@ public Entry<Integer, Map<String, VersionRange>> next() {
             }
         };
     }
+
+    @FunctionalInterface
+    interface BrokerUncleanShutdownHandler {
+        void apply(int brokerId, List<ApiMessageAndVersion> records);

Review Comment:
   nit: since we're defining an interface, we can give the method a more descriptive name than "apply", such as "addRecordsForShutdown".

##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1837,8 +1872,13 @@ void generateLeaderAndIsrUpdates(String context,
         // from the target ISR, but we need to exclude it here too, to handle the case
         // where there is an unclean leader election which chooses a leader from outside
         // the ISR.
+        //
+        // If the caller passed a valid broker ID for brokerWithUncleanShutdown, rather than
+        // passing NO_LEADER, this node should not be an acceptable leader. We also exclude
+        // brokerWithUncleanShutdown from ELR and ISR.
         IntPredicate isAcceptableLeader =
-            r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.isActive(r));
+            r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)
+                && (r == brokerToAdd || clusterControl.isActive(r));

Review Comment:
   Since our guards around ELR (whether or not it is enabled) live in PartitionChangeBuilder, we need to make sure this logic is also correct when ELR is disabled because of the metadata version (MV).

--
This is an automated message from the Apache Git Service.
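The `isAcceptableLeader` predicate from the `generateLeaderAndIsrUpdates` hunk can be exercised on its own. This standalone sketch assumes `NO_LEADER` is `-1` and replaces `clusterControl.isActive(r)` with a hypothetical `activeBrokers` set; `AcceptableLeaderSketch` is likewise a made-up name. Passing `NO_LEADER` for `brokerWithUncleanShutdown` leaves the predicate unchanged for real (non-negative) broker IDs, as the new code comment describes. Note that the predicate itself never checks whether ELR is enabled, which is the case the last review comment asks to verify.

```java
import java.util.Set;
import java.util.function.IntPredicate;

// Hypothetical standalone class; not part of Kafka.
final class AcceptableLeaderSketch {
    // Assumed sentinel value standing in for NO_LEADER in the controller code.
    static final int NO_LEADER = -1;

    // Mirrors the quoted isAcceptableLeader lambda. activeBrokers is a
    // hypothetical stand-in for clusterControl.isActive(r).
    static IntPredicate isAcceptableLeader(int brokerToRemove,
                                           int brokerToAdd,
                                           int brokerWithUncleanShutdown,
                                           Set<Integer> activeBrokers) {
        return r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)
            && (r == brokerToAdd || activeBrokers.contains(r));
    }

    public static void main(String[] args) {
        Set<Integer> active = Set.of(1, 2, 3);
        // Broker 2 re-registered after an unclean shutdown; nothing is removed or added.
        IntPredicate p = isAcceptableLeader(NO_LEADER, NO_LEADER, 2, active);
        System.out.println(p.test(1)); // true: active and not excluded
        System.out.println(p.test(2)); // false: unclean shutdown
        System.out.println(p.test(4)); // false: not an active broker
    }
}
```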