artemlivshits commented on code in PR #14312: URL: https://github.com/apache/kafka/pull/14312#discussion_r1353913582
########## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ########## @@ -328,13 +346,38 @@ public Optional<ApiMessageAndVersion> build() { completeReassignmentIfNeeded(); + boolean isElrEnabled = metadataVersion.isElrSupported(); + if (isElrEnabled) { + populateTargetElr(); + } + tryElection(record); triggerLeaderEpochBumpIfNeeded(record); - if (record.isr() == null && !targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) { - // Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it. - record.setIsr(targetIsr); + // During the leader election, it can set the record isr if an unclean leader election happens. + boolean isCleanLeaderElection = record.isr() == null; + + // Clean the ELR related fields if it is an unclean election or ELR is disabled. + if (!isCleanLeaderElection || !isElrEnabled) { + targetElr = Collections.emptyList(); + targetLastKnownElr = Collections.emptyList(); + } + + if (!targetElr.equals(Replicas.toList(partition.elr))) { + record.setEligibleLeaderReplicas(targetElr); + } + if (!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) { + record.setLastKnownELR(targetLastKnownElr); + } + + // The record.isr is null if it is a clean election. In this case, it will + // 1. Set the new ISR if it is different from the current ISR. + // 2. Set the new ELR/LastKnowElr if it is different from the current ones. + if (isCleanLeaderElection) { Review Comment: Looks like the logic here is "set ISR if it wasn't set", so checking `if (record.isr() == null)` seems more intentional here. ########## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ########## @@ -1995,6 +2015,19 @@ private void listReassigningTopic(ListPartitionReassignmentsResponseData respons setReplicas(Replicas.toList(partition.replicas))); } + // Visible to test. 
+ Integer getTopicEffectiveMinIsr(String topicName) { + String minIsrConfig = configurationControl.getTopicConfig(topicName, MIN_IN_SYNC_REPLICAS_CONFIG); + Integer currentMinIsr = minIsrConfig.isEmpty() ? defaultMinIsr : Integer.parseInt(minIsrConfig); + Uuid topicId = topicsByName.get(topicName); + Integer replicationFactor = topics.get(topicId).parts.get(0).replicas.length; Review Comment: Is it guaranteed that we have elements in all maps here? We don't check any nulls. ########## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ########## @@ -186,7 +186,10 @@ public enum MetadataVersion { IBP_3_6_IV2(14, "3.6", "IV2", true), // Implement KIP-919 controller registration. - IBP_3_7_IV0(15, "3.7", "IV0", true); + IBP_3_7_IV0(15, "3.7", "IV0", true), + + // Add ELR related supports (KIP-966). + IBP_3_7_IV1(16, "3.7", "IV1", true); Review Comment: Wouldn't having this prevent version bumps until KIP-966 is fully implemented? Would it be enough to just have `eligible.leader.replicas.enable` config that would be used for testing, and then when the feature is ready to be released, flip `eligible.leader.replicas.enable` to `true` and add a new metadata version? ########## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ########## @@ -328,13 +346,38 @@ public Optional<ApiMessageAndVersion> build() { completeReassignmentIfNeeded(); + boolean isElrEnabled = metadataVersion.isElrSupported(); + if (isElrEnabled) { + populateTargetElr(); + } + tryElection(record); triggerLeaderEpochBumpIfNeeded(record); - if (record.isr() == null && !targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) { - // Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it. - record.setIsr(targetIsr); + // During the leader election, it can set the record isr if an unclean leader election happens. 
+ boolean isCleanLeaderElection = record.isr() == null; + + // Clean the ELR related fields if it is an unclean election or ELR is disabled. + if (!isCleanLeaderElection || !isElrEnabled) { + targetElr = Collections.emptyList(); + targetLastKnownElr = Collections.emptyList(); + } + + if (!targetElr.equals(Replicas.toList(partition.elr))) { + record.setEligibleLeaderReplicas(targetElr); + } + if (!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) { + record.setLastKnownELR(targetLastKnownElr); + } + + // The record.isr is null if it is a clean election. In this case, it will + // 1. Set the new ISR if it is different from the current ISR. + // 2. Set the new ELR/LastKnowElr if it is different from the current ones. Review Comment: Is the comment about ELR out of date? It's not set in this condition. ########## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ########## @@ -362,6 +405,35 @@ private void setAssignmentChanges(PartitionChangeRecord record) { } } + private void populateTargetElr() { + // If the ISR is larger or equal to the min ISR, clear the ELR and lastKnownELR. + if (targetIsr.size() >= minISR) { + targetElr = Collections.emptyList(); + targetLastKnownElr = Collections.emptyList(); + return; + } + Review Comment: Extra line? ########## metadata/src/main/resources/common/metadata/PartitionChangeRecord.json: ########## @@ -17,7 +17,9 @@ "apiKey": 5, "type": "metadata", "name": "PartitionChangeRecord", - "validVersions": "0", + "validVersions": "0-1", Review Comment: Do we need to bump the version if we add just the tagged fields? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org