artemlivshits commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1349318665
########## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ##########
@@ -328,13 +349,38 @@ public Optional<ApiMessageAndVersion> build() {
         completeReassignmentIfNeeded();
+        boolean isElrEnabled = metadataVersion.isElrSupported();
+        if (isElrEnabled) {
+            populateTargetElr();
+        }
+
         tryElection(record);
         triggerLeaderEpochBumpIfNeeded(record);
-        if (record.isr() == null && !targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) {
-            // Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it.
-            record.setIsr(targetIsr);
+        boolean isCleanLeaderElection = record.isr() == null;
+
+        // Clean the ELR related fields if it is an unclean election or ELR is disabled.
+        if (!isCleanLeaderElection || !isElrEnabled) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+        }
+
+        if (!targetElr.equals(Replicas.toList(partition.elr))) {
+            record.setEligibleLeaderReplicas(targetElr);
+        }
+        if (!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
+            record.setLastKnownELR(targetLastKnownElr);
+        }
+
+        // The record.isr is null if it is a clean election. In this case, it will
+        // 1. Set the new ISR if it is different from the current ISR.
+        // 2. Set the new ELR/LastKnowElr if it is different from the current ones.
+        if (isCleanLeaderElection) {
+            // TODO(KIP-966) If ELR is enabled, the ISR is allowed to be empty.
Review Comment:
   Is this a TODO to be addressed in the current change? Generally, TODOs should be avoided in committed code, and proper JIRA tickets should be filed instead. In this case, can we just incorporate isElrEnabled into the condition and remove the TODO?
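One possible reading of the suggestion, assuming the code guarded by the TODO is the old `setIsr` logic (the quoted hunk cuts off before it), is to fold `isElrEnabled` into a single condition. The sketch below pulls that condition into a hypothetical static helper, `shouldSetIsr`, which is not part of Kafka; `currentIsr` stands for `Replicas.toList(partition.isr)` in the diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class IsrUpdateCondition {
    // Hypothetical helper (not Kafka code): should build() write targetIsr into the record?
    // isCleanLeaderElection mirrors `record.isr() == null` from the diff.
    static boolean shouldSetIsr(boolean isCleanLeaderElection,
                                boolean isElrEnabled,
                                List<Integer> targetIsr,
                                List<Integer> currentIsr) {
        return isCleanLeaderElection                      // an unclean election already set the ISR
                && (isElrEnabled || !targetIsr.isEmpty()) // an empty ISR is only allowed with ELR
                && !targetIsr.equals(currentIsr);         // only write the field when it changes
    }

    public static void main(String[] args) {
        List<Integer> current = Arrays.asList(1, 2);
        System.out.println(shouldSetIsr(true, false, Collections.emptyList(), current)); // false
        System.out.println(shouldSetIsr(true, true, Collections.emptyList(), current));  // true
        System.out.println(shouldSetIsr(false, true, Arrays.asList(1), current));        // false
    }
}
```

Written inline, this is just `if (isCleanLeaderElection && (isElrEnabled || !targetIsr.isEmpty()) && !targetIsr.equals(...))`, which removes the need for the TODO.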
########## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ##########
@@ -328,13 +349,38 @@ public Optional<ApiMessageAndVersion> build() {
         completeReassignmentIfNeeded();
+        boolean isElrEnabled = metadataVersion.isElrSupported();
+        if (isElrEnabled) {
+            populateTargetElr();
+        }
+
         tryElection(record);
         triggerLeaderEpochBumpIfNeeded(record);
-        if (record.isr() == null && !targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) {
-            // Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it.
-            record.setIsr(targetIsr);
+        boolean isCleanLeaderElection = record.isr() == null;
Review Comment:
   We should probably add a comment explaining why this is the clean leader election condition; it's not immediately obvious from the logic in this function.

########## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ##########
@@ -1008,7 +1024,8 @@ ControllerResult<AlterPartitionResponseData> alterPartition(
                 topic.id,
                 partitionId,
                 clusterControl::isActive,
-                featureControl.metadataVersion()
+                featureControl.metadataVersion(),
+                Integer.parseInt(configurationControl.getTopicConfigs(topic.name).getOrDefault(MIN_IN_SYNC_REPLICAS_CONFIG, String.valueOf(defaultMinInSyncIsr)))
Review Comment:
   It looks like we pass this long expression in many places. Maybe we should add a small utility method, configurationControl.minInSyncReplicas(topic.name)?

########## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ##########
@@ -362,6 +408,38 @@ private void setAssignmentChanges(PartitionChangeRecord record) {
             }
         }
+    private void populateTargetElr() {
+        // If the ISR is larger or equal to the min ISR, clear the ELR and lastKnownELR.
+        if (targetIsr.size() >= minISR) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+            return;
+        }
+
+        Set<Integer> currentIsrSet = Arrays.stream(partition.isr).boxed().collect(Collectors.toSet());
+        Set<Integer> targetIsrSet = targetIsr.stream().collect(Collectors.toSet());
+        Set<Integer> currentElrSet = Arrays.stream(partition.elr).boxed().collect(Collectors.toSet());
+        Set<Integer> currentLastKnownElrSet = Arrays.stream(partition.lastKnownElr).boxed().collect(Collectors.toSet());
+
+        // Tracking the ELR. The new elr is expected to
+        // 1. Include the current ISR
+        // 2. Exclude the duplicate replicas between elr and target ISR.
+        // 3. Exclude unclean shutdown replicas.
+        // To do that, we first union the current ISR and current elr, then filter out the target ISR and unclean shutdown
+        // Replicas.
+        Set<Integer> elrCandidates = Utils.union(HashSet::new, currentElrSet, currentIsrSet);
+        Set<Integer> newElr = elrCandidates.stream()
+            .filter(replica -> !targetIsrSet.contains(replica) && (uncleanShutdownReplicas == null || !uncleanShutdownReplicas.contains(replica)))
+            .collect(Collectors.toSet());
+        targetElr = newElr.stream().collect(Collectors.toList());
Review Comment:
   Can we replace these two operations and the intermediate allocation with:
   ```
   targetElr = elrCandidates.stream()
       .filter(replica -> !targetIsrSet.contains(replica) && (uncleanShutdownReplicas == null || !uncleanShutdownReplicas.contains(replica)))
       .collect(Collectors.toList());
   ```

########## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ##########
@@ -362,6 +408,38 @@ private void setAssignmentChanges(PartitionChangeRecord record) {
             }
         }
+    private void populateTargetElr() {
+        // If the ISR is larger or equal to the min ISR, clear the ELR and lastKnownELR.
+        if (targetIsr.size() >= minISR) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+            return;
+        }
+
+        Set<Integer> currentIsrSet = Arrays.stream(partition.isr).boxed().collect(Collectors.toSet());
+        Set<Integer> targetIsrSet = targetIsr.stream().collect(Collectors.toSet());
+        Set<Integer> currentElrSet = Arrays.stream(partition.elr).boxed().collect(Collectors.toSet());
+        Set<Integer> currentLastKnownElrSet = Arrays.stream(partition.lastKnownElr).boxed().collect(Collectors.toSet());
Review Comment:
   Do we need all these intermediate containers/allocations? It looks like we can start with two sets:
   ```
   Set<Integer> candidateElrSet = Arrays.stream(partition.elr).boxed().collect(Collectors.toSet());
   Set<Integer> candidateLastKnownElrSet = Arrays.stream(partition.lastKnownElr).boxed().collect(Collectors.toSet());
   ```
   Then we can add the current ISR elements to both candidate sets and remove the duplicate elements from them.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
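The remaining suggestions above (a `minInSyncReplicas` helper, and computing the ELR from one candidate set instead of several intermediate sets) can be sketched together as follows. This is a minimal sketch under stated assumptions: `ElrSketch`, `minInSyncReplicas`, and `computeElr` are hypothetical names; the helper takes the topic's config map directly rather than a topic name; and only the ELR half is covered, because the last-known-ELR logic is not visible in the quoted hunks. The result has the same elements as the single-pass `filter(...).collect(Collectors.toList())` suggestion.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class ElrSketch {
    // Same key as org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG.
    static final String MIN_IN_SYNC_REPLICAS_CONFIG = "min.insync.replicas";

    // Hypothetical stand-in for the proposed configurationControl.minInSyncReplicas(topic.name):
    // it takes the topic's config map directly instead of looking the topic up by name.
    static int minInSyncReplicas(Map<String, String> topicConfigs, int defaultMinInSyncIsr) {
        return Integer.parseInt(topicConfigs.getOrDefault(
                MIN_IN_SYNC_REPLICAS_CONFIG, String.valueOf(defaultMinInSyncIsr)));
    }

    // ELR only; uncleanShutdownReplicas may be null, as in the diff.
    static List<Integer> computeElr(int[] currentIsr,
                                    int[] currentElr,
                                    List<Integer> targetIsr,
                                    Set<Integer> uncleanShutdownReplicas,
                                    int minIsr) {
        // The ISR is large enough: the ELR is cleared.
        if (targetIsr.size() >= minIsr) {
            return Collections.emptyList();
        }
        // One mutable candidate set: current ELR plus current ISR.
        Set<Integer> candidates = Arrays.stream(currentElr).boxed()
                .collect(Collectors.toCollection(HashSet::new));
        for (int replica : currentIsr) {
            candidates.add(replica);
        }
        // Remove replicas that stay in the ISR or had an unclean shutdown.
        candidates.removeAll(targetIsr);
        if (uncleanShutdownReplicas != null) {
            candidates.removeAll(uncleanShutdownReplicas);
        }
        // Sorting is not part of the reviewed code; it only makes the output deterministic here.
        return candidates.stream().sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> topicConfigs = new HashMap<>();
        topicConfigs.put(MIN_IN_SYNC_REPLICAS_CONFIG, "2");
        int minIsr = minInSyncReplicas(topicConfigs, 1);
        // The ISR shrinks from [1, 2, 3] to [1]; replica 3 had an unclean shutdown.
        List<Integer> elr = computeElr(new int[] {1, 2, 3}, new int[] {}, Arrays.asList(1),
                new HashSet<>(Arrays.asList(3)), minIsr);
        System.out.println(elr); // [2]
    }
}
```

The candidate set is built with `Collectors.toCollection(HashSet::new)` rather than `Collectors.toSet()` because the latter makes no guarantee that the returned set is mutable, and this approach modifies the set in place.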