artemlivshits commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1353913582


##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -328,13 +346,38 @@ public Optional<ApiMessageAndVersion> build() {
 
         completeReassignmentIfNeeded();
 
+        boolean isElrEnabled = metadataVersion.isElrSupported();
+        if (isElrEnabled) {
+            populateTargetElr();
+        }
+
         tryElection(record);
 
         triggerLeaderEpochBumpIfNeeded(record);
 
-        if (record.isr() == null && !targetIsr.isEmpty() && 
!targetIsr.equals(Replicas.toList(partition.isr))) {
-            // Set the new ISR if it is different from the current ISR and 
unclean leader election didn't already set it.
-            record.setIsr(targetIsr);
+        // During the leader election, it can set the record isr if an unclean 
leader election happens.
+        boolean isCleanLeaderElection = record.isr() == null;
+
+        // Clean the ELR related fields if it is an unclean election or ELR is 
disabled.
+        if (!isCleanLeaderElection || !isElrEnabled) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+        }
+
+        if (!targetElr.equals(Replicas.toList(partition.elr))) {
+            record.setEligibleLeaderReplicas(targetElr);
+        }
+        if 
(!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
+            record.setLastKnownELR(targetLastKnownElr);
+        }
+
+        // The record.isr is null if it is a clean election. In this case, it 
will
+        // 1. Set the new ISR if it is different from the current ISR.
+        // 2. Set the new ELR/LastKnowElr if it is different from the current 
ones.
+        if (isCleanLeaderElection) {

Review Comment:
   Looks like the logic here is "set ISR if it wasn't set", so checking `if 
(record.isr() == null)` seems more intentional here.



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1995,6 +2015,19 @@ private void 
listReassigningTopic(ListPartitionReassignmentsResponseData respons
             setReplicas(Replicas.toList(partition.replicas)));
     }
 
+    // Visible to test.
+    Integer getTopicEffectiveMinIsr(String topicName) {
+        String minIsrConfig = configurationControl.getTopicConfig(topicName, 
MIN_IN_SYNC_REPLICAS_CONFIG);
+        Integer currentMinIsr = minIsrConfig.isEmpty() ? defaultMinIsr : 
Integer.parseInt(minIsrConfig);
+        Uuid topicId = topicsByName.get(topicName);
+        Integer replicationFactor = 
topics.get(topicId).parts.get(0).replicas.length;

Review Comment:
   Is it guaranteed that we have elements in all maps here?  We don't check any 
nulls.



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -186,7 +186,10 @@ public enum MetadataVersion {
     IBP_3_6_IV2(14, "3.6", "IV2", true),
 
     // Implement KIP-919 controller registration.
-    IBP_3_7_IV0(15, "3.7", "IV0", true);
+    IBP_3_7_IV0(15, "3.7", "IV0", true),
+
+    // Add ELR related supports (KIP-966).
+    IBP_3_7_IV1(16, "3.7", "IV1", true);

Review Comment:
   Wouldn't having this prevent version bumps until KIP-966 is fully 
implemented?  Would it be enough to just have `eligible.leader.replicas.enable` 
config that would be used for testing, and then when the feature is ready to be 
release, flip `eligible.leader.replicas.enable` to `true` and add a new 
metadata version.



##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -328,13 +346,38 @@ public Optional<ApiMessageAndVersion> build() {
 
         completeReassignmentIfNeeded();
 
+        boolean isElrEnabled = metadataVersion.isElrSupported();
+        if (isElrEnabled) {
+            populateTargetElr();
+        }
+
         tryElection(record);
 
         triggerLeaderEpochBumpIfNeeded(record);
 
-        if (record.isr() == null && !targetIsr.isEmpty() && 
!targetIsr.equals(Replicas.toList(partition.isr))) {
-            // Set the new ISR if it is different from the current ISR and 
unclean leader election didn't already set it.
-            record.setIsr(targetIsr);
+        // During the leader election, it can set the record isr if an unclean 
leader election happens.
+        boolean isCleanLeaderElection = record.isr() == null;
+
+        // Clean the ELR related fields if it is an unclean election or ELR is 
disabled.
+        if (!isCleanLeaderElection || !isElrEnabled) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+        }
+
+        if (!targetElr.equals(Replicas.toList(partition.elr))) {
+            record.setEligibleLeaderReplicas(targetElr);
+        }
+        if 
(!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
+            record.setLastKnownELR(targetLastKnownElr);
+        }
+
+        // The record.isr is null if it is a clean election. In this case, it 
will
+        // 1. Set the new ISR if it is different from the current ISR.
+        // 2. Set the new ELR/LastKnowElr if it is different from the current 
ones.

Review Comment:
   Is the comment about ELR out of date?  It's not set in this condition.



##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -362,6 +405,35 @@ private void setAssignmentChanges(PartitionChangeRecord 
record) {
         }
     }
 
+    private void populateTargetElr() {
+        // If the ISR is larger or equal to the min ISR, clear the ELR and 
lastKnownELR.
+        if (targetIsr.size() >= minISR) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+            return;
+        }
+

Review Comment:
   Extra line?



##########
metadata/src/main/resources/common/metadata/PartitionChangeRecord.json:
##########
@@ -17,7 +17,9 @@
   "apiKey": 5,
   "type": "metadata",
   "name": "PartitionChangeRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
   Do we need to bump the version if we add just the tagged fields?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to