artemlivshits commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1349318665


##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -328,13 +349,38 @@ public Optional<ApiMessageAndVersion> build() {
 
         completeReassignmentIfNeeded();
 
+        boolean isElrEnabled = metadataVersion.isElrSupported();
+        if (isElrEnabled) {
+            populateTargetElr();
+        }
+
         tryElection(record);
 
         triggerLeaderEpochBumpIfNeeded(record);
 
-        if (record.isr() == null && !targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) {
-            // Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it.
-            record.setIsr(targetIsr);
+        boolean isCleanLeaderElection = record.isr() == null;
+
+        // Clean the ELR related fields if it is an unclean election or ELR is disabled.
+        if (!isCleanLeaderElection || !isElrEnabled) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+        }
+
+        if (!targetElr.equals(Replicas.toList(partition.elr))) {
+            record.setEligibleLeaderReplicas(targetElr);
+        }
+        if (!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
+            record.setLastKnownELR(targetLastKnownElr);
+        }
+
+        // The record.isr is null if it is a clean election. In this case, it will
+        // 1. Set the new ISR if it is different from the current ISR.
+        // 2. Set the new ELR/LastKnowElr if it is different from the current ones.
+        if (isCleanLeaderElection) {
+            // TODO(KIP-966) If ELR is enabled, the ISR is allowed to be empty.

Review Comment:
   Is this TODO meant to be addressed in the current change? Generally, TODOs 
should be avoided in committed code; a proper JIRA ticket should be filed 
instead.
   
   In this case, can we just incorporate isElrEnabled into the condition and 
remove the TODO?
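
A minimal sketch of what folding isElrEnabled into the condition might look like. It assumes the guarded branch is the one that sets the new ISR, and that "the ISR is allowed to be empty" simply means the non-empty check is skipped when ELR is enabled. The class and method names (`IsrUpdateCondition`, `shouldSetIsr`) are hypothetical and not part of the PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, for illustration only; not code from the PR.
final class IsrUpdateCondition {
    /**
     * Decides whether the record should carry a new ISR.
     * Assumption: with ELR enabled an empty target ISR is acceptable,
     * otherwise the target ISR must be non-empty (the pre-ELR behavior).
     */
    static boolean shouldSetIsr(boolean isCleanLeaderElection,
                                boolean isElrEnabled,
                                List<Integer> targetIsr,
                                List<Integer> currentIsr) {
        return isCleanLeaderElection
            && (isElrEnabled || !targetIsr.isEmpty())
            && !targetIsr.equals(currentIsr);
    }

    public static void main(String[] args) {
        List<Integer> current = Arrays.asList(1);
        List<Integer> empty = Collections.emptyList();
        // Empty target ISR is only written when ELR is enabled.
        System.out.println(shouldSetIsr(true, true, empty, current));
        System.out.println(shouldSetIsr(true, false, empty, current));
    }
}
```

With the flag in the condition, the empty-ISR case is covered by the code itself and the TODO comment is no longer needed.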



##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -328,13 +349,38 @@ public Optional<ApiMessageAndVersion> build() {
 
         completeReassignmentIfNeeded();
 
+        boolean isElrEnabled = metadataVersion.isElrSupported();
+        if (isElrEnabled) {
+            populateTargetElr();
+        }
+
         tryElection(record);
 
         triggerLeaderEpochBumpIfNeeded(record);
 
-        if (record.isr() == null && !targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) {
-            // Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it.
-            record.setIsr(targetIsr);
+        boolean isCleanLeaderElection = record.isr() == null;

Review Comment:
   We should probably add a comment explaining why this is the clean leader 
election condition; it's not immediately obvious from the logic in this function.



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1008,7 +1024,8 @@ ControllerResult<AlterPartitionResponseData> alterPartition(
                     topic.id,
                     partitionId,
                     clusterControl::isActive,
-                    featureControl.metadataVersion()
+                    featureControl.metadataVersion(),
+                    Integer.parseInt(configurationControl.getTopicConfigs(topic.name).getOrDefault(MIN_IN_SYNC_REPLICAS_CONFIG, String.valueOf(defaultMinInSyncIsr)))

Review Comment:
   Looks like we pass this long expression in many places; maybe we should add a 
small utility method such as configurationControl.minInSyncReplicas(topic.name)?
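
A rough sketch of such a helper, under stated assumptions: `TopicConfigLookup` is a hypothetical stand-in for the configuration control class, the per-topic configs are modeled as a plain nested map, and the default is passed in at construction. Only the lookup-with-default logic mirrors the expression in the diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the real configuration control class.
final class TopicConfigLookup {
    static final String MIN_IN_SYNC_REPLICAS_CONFIG = "min.insync.replicas";

    private final Map<String, Map<String, String>> topicConfigs;
    private final int defaultMinIsr;

    TopicConfigLookup(Map<String, Map<String, String>> topicConfigs, int defaultMinIsr) {
        this.topicConfigs = topicConfigs;
        this.defaultMinIsr = defaultMinIsr;
    }

    /** Returns the topic-level min ISR, falling back to the default when unset. */
    int minInSyncReplicas(String topicName) {
        Map<String, String> configs =
            topicConfigs.getOrDefault(topicName, Collections.<String, String>emptyMap());
        String value = configs.get(MIN_IN_SYNC_REPLICAS_CONFIG);
        return value == null ? defaultMinIsr : Integer.parseInt(value);
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> configs = new HashMap<>();
        configs.put("orders", Collections.singletonMap(MIN_IN_SYNC_REPLICAS_CONFIG, "2"));
        TopicConfigLookup lookup = new TopicConfigLookup(configs, 1);
        System.out.println(lookup.minInSyncReplicas("orders"));   // topic-level override
        System.out.println(lookup.minInSyncReplicas("payments")); // falls back to default
    }
}
```

Call sites would then pass `configurationControl.minInSyncReplicas(topic.name)` instead of repeating the parse-with-default expression.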



##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -362,6 +408,38 @@ private void setAssignmentChanges(PartitionChangeRecord record) {
         }
     }
 
+    private void populateTargetElr() {
+        // If the ISR is larger or equal to the min ISR, clear the ELR and lastKnownELR.
+        if (targetIsr.size() >= minISR) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+            return;
+        }
+
+        Set<Integer> currentIsrSet = Arrays.stream(partition.isr).boxed().collect(Collectors.toSet());
+        Set<Integer> targetIsrSet = targetIsr.stream().collect(Collectors.toSet());
+        Set<Integer> currentElrSet = Arrays.stream(partition.elr).boxed().collect(Collectors.toSet());
+        Set<Integer> currentLastKnownElrSet = Arrays.stream(partition.lastKnownElr).boxed().collect(Collectors.toSet());
+
+        // Tracking the ELR. The new elr is expected to
+        // 1. Include the current ISR
+        // 2. Exclude the duplicate replicas between elr and target ISR.
+        // 3. Exclude unclean shutdown replicas.
+        // To do that, we first union the current ISR and current elr, then filter out the target ISR and unclean shutdown
+        // Replicas.
+        Set<Integer> elrCandidates = Utils.union(HashSet::new, currentElrSet, currentIsrSet);
+        Set<Integer> newElr = elrCandidates.stream()
+            .filter(replica -> !targetIsrSet.contains(replica) && (uncleanShutdownReplicas == null || !uncleanShutdownReplicas.contains(replica)))
+            .collect(Collectors.toSet());
+        targetElr = newElr.stream().collect(Collectors.toList());

Review Comment:
   Can we replace these 2 operations and one intermediate allocation with 
the following?
   ```
          targetElr = elrCandidates.stream()
               .filter(replica -> !targetIsrSet.contains(replica) && (uncleanShutdownReplicas == null || !uncleanShutdownReplicas.contains(replica)))
               .collect(Collectors.toList());
   ```



##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -362,6 +408,38 @@ private void setAssignmentChanges(PartitionChangeRecord record) {
         }
     }
 
+    private void populateTargetElr() {
+        // If the ISR is larger or equal to the min ISR, clear the ELR and lastKnownELR.
+        if (targetIsr.size() >= minISR) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+            return;
+        }
+
+        Set<Integer> currentIsrSet = Arrays.stream(partition.isr).boxed().collect(Collectors.toSet());
+        Set<Integer> targetIsrSet = targetIsr.stream().collect(Collectors.toSet());
+        Set<Integer> currentElrSet = Arrays.stream(partition.elr).boxed().collect(Collectors.toSet());
+        Set<Integer> currentLastKnownElrSet = Arrays.stream(partition.lastKnownElr).boxed().collect(Collectors.toSet());

Review Comment:
   Do we need all intermediate containers / allocations?  Looks like we can 
start with 2 sets:
   
   ```
           Set<Integer> candidateElrSet = Arrays.stream(partition.elr).boxed().collect(Collectors.toSet());
           Set<Integer> candidateLastKnownElrSet = Arrays.stream(partition.lastKnownElr).boxed().collect(Collectors.toSet());
   ```
   Then we can add current ISR elements into both candidate sets and remove 
duplicate elements from the sets.
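
To make the suggestion concrete, here is a small standalone sketch of the ELR half only, mutating a single candidate set in place. It follows the three rules stated in the diff's comment (include the current ISR, drop anything in the target ISR, drop unclean shutdown replicas). `ElrSketch` and `computeTargetElr` are hypothetical names, and the lastKnownElr handling is left out because the quoted hunk does not show it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative only; mirrors the filtering rules quoted in the diff.
final class ElrSketch {
    static List<Integer> computeTargetElr(int[] currentElr,
                                          int[] currentIsr,
                                          List<Integer> targetIsr,
                                          List<Integer> uncleanShutdownReplicas) {
        // One candidate set: start from the current ELR, then add the current ISR.
        Set<Integer> candidates = new HashSet<>();
        for (int replica : currentElr) {
            candidates.add(replica);
        }
        for (int replica : currentIsr) {
            candidates.add(replica);
        }
        // Replicas that stay in the target ISR must not also appear in the ELR.
        candidates.removeAll(targetIsr);
        // Unclean shutdown replicas are not eligible; the list may be null.
        if (uncleanShutdownReplicas != null) {
            candidates.removeAll(uncleanShutdownReplicas);
        }
        return new ArrayList<>(candidates);
    }

    public static void main(String[] args) {
        List<Integer> elr = computeTargetElr(
            new int[] {4}, new int[] {1, 2, 3},
            Arrays.asList(1), Arrays.asList(3));
        // Element order is unspecified because the candidates come from a HashSet.
        System.out.println(elr);
    }
}
```

Compared with the version in the diff, this avoids the separate currentIsrSet, targetIsrSet, and newElr containers at the cost of mutating the candidate set.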
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
