mumrah commented on code in PR #14312: URL: https://github.com/apache/kafka/pull/14312#discussion_r1362167841
##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -328,13 +353,37 @@ public Optional<ApiMessageAndVersion> build() {
         completeReassignmentIfNeeded();
+        if (PartitionChangeBuilder.this.eligibleLeaderReplicasEnabled) {
+            populateTargetElr();
+        }
+
         tryElection(record);
         triggerLeaderEpochBumpIfNeeded(record);
-        if (record.isr() == null && !targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) {
-            // Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it.
-            record.setIsr(targetIsr);
+        // During the leader election, it can set the record isr if an unclean leader election happens.

Review Comment:
I guess what I don't like is the logic split between `populateTargetElr` and the lines below. For example, in `populateTargetElr` we always set `targetElr` and `targetLastKnownElr`, but in some cases we immediately clear those collections.

It would be nice if all the ELR logic was inside `populateTargetElr`. E.g.,

```
if (eligibleLeaderReplicasEnabled) {
    populateTargetElr(); // compute the ELR fields and set them on the record
} else {
    // set the ELR fields to empty (or whatever defaults)
}
```

It might actually be more fitting with the code in this class to have a method like `maybePopulateTargetElr` that is unconditionally called and put all the ELR logic there. E.g.,

```
completeReassignmentIfNeeded();
tryElection(record);
triggerLeaderEpochBumpIfNeeded(record);
maybePopulateTargetElr(record);
```

##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -434,6 +434,14 @@ Map<String, String> getConfigs(ConfigResource configResource) {
         }
     }
+    String getTopicConfig(String topicName, String configKey) {

Review Comment:
Let's add some javadoc to this explaining the behavior (like the default empty string).

##########
metadata/src/main/resources/common/metadata/PartitionChangeRecord.json:
##########
@@ -17,7 +17,9 @@
   "apiKey": 5,
   "type": "metadata",
   "name": "PartitionChangeRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
If we're just adding tagged fields, no, we don't need the version bump.

If we consider the downgrade case where someone decreases the MV below 3.7-IV1, what we (will eventually) do is take a snapshot of the current metadata log at the older MV, which can be a lossy operation. Since tagged fields are generic and optional, there's no problem re-writing a record to _not_ include the tagged fields.

Actually, as a side note, PartitionChangeRecord isn't even included in the snapshot (only PartitionRecord is), so it doesn't affect downgrades at all.

##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1995,6 +2015,19 @@ private void listReassigningTopic(ListPartitionReassignmentsResponseData respons
             setReplicas(Replicas.toList(partition.replicas)));
     }
+    // Visible to test.
+    Integer getTopicEffectiveMinIsr(String topicName) {
+        String minIsrConfig = configurationControl.getTopicConfig(topicName, MIN_IN_SYNC_REPLICAS_CONFIG);
+        Integer currentMinIsr = minIsrConfig.isEmpty() ? defaultMinIsr : Integer.parseInt(minIsrConfig);
+        Uuid topicId = topicsByName.get(topicName);
+        Integer replicationFactor = topics.get(topicId).parts.get(0).replicas.length;

Review Comment:
It doesn't hurt to be safe.
Even a RuntimeException is nicer than an NPE. Related to this change, can we have this method return `int` instead of `Integer`? Doesn't look like this can return `null` anyways.
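
To make the last suggestion concrete, here is a rough sketch of the shape it could take: an explicit check that throws a descriptive exception instead of letting a missing topic surface as an NPE, and a primitive `int` return type. The class `EffectiveMinIsrSketch`, its two maps, and `addTopic` are hypothetical stand-ins for the controller state, not the real `ReplicationControlManager` fields. Capping the result at the replication factor is also an assumption about what "effective" means here, since the diff above cuts off before the return statement.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the controller state; not the real Kafka classes.
final class EffectiveMinIsrSketch {
    // Topic name -> configured min.insync.replicas ("" when the topic has no override).
    private final Map<String, String> minIsrConfigByTopic = new HashMap<>();
    // Topic name -> replica ids of partition 0.
    private final Map<String, int[]> replicasByTopic = new HashMap<>();
    private final int defaultMinIsr;

    EffectiveMinIsrSketch(int defaultMinIsr) {
        this.defaultMinIsr = defaultMinIsr;
    }

    // Pass "" as minIsrConfig when the topic has no explicit setting.
    void addTopic(String topicName, int[] replicas, String minIsrConfig) {
        replicasByTopic.put(topicName, replicas);
        minIsrConfigByTopic.put(topicName, minIsrConfig);
    }

    // Returns a primitive int: there is no code path that yields null.
    int getTopicEffectiveMinIsr(String topicName) {
        int[] replicas = replicasByTopic.get(topicName);
        if (replicas == null) {
            // Fail with a descriptive message rather than an NPE on dereference.
            throw new IllegalStateException("Unknown topic: " + topicName);
        }
        String minIsrConfig = minIsrConfigByTopic.getOrDefault(topicName, "");
        int currentMinIsr = minIsrConfig.isEmpty() ? defaultMinIsr : Integer.parseInt(minIsrConfig);
        // Assumption: the effective value is capped at the replication factor.
        return Math.min(currentMinIsr, replicas.length);
    }
}
```

The same idea applies to the intermediate lookups in the real method (`topicsByName.get(...)`, `topics.get(...)`, `parts.get(0)`): each can return `null`, so each is a candidate for an explicit check.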