mumrah commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362167841


##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -328,13 +353,37 @@ public Optional<ApiMessageAndVersion> build() {
 
         completeReassignmentIfNeeded();
 
+        if (PartitionChangeBuilder.this.eligibleLeaderReplicasEnabled) {
+            populateTargetElr();
+        }
+
         tryElection(record);
 
         triggerLeaderEpochBumpIfNeeded(record);
 
-        if (record.isr() == null && !targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) {
-            // Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it.
-            record.setIsr(targetIsr);
+        // During the leader election, it can set the record isr if an unclean leader election happens.

Review Comment:
   I guess what I don't like is the logic split between `populateTargetElr` and 
the lines below. For example, in `populateTargetElr` we always set `targetElr` and 
`targetLastKnownElr`, but in some cases we immediately clear those collections. 
It would be nice if all the ELR logic was inside `populateTargetElr`.
   E.g., 
   
   ```
   if (eligibleLeaderReplicasEnabled) {
       populateTargetElr(); // compute the ELR fields and set them on the record
   } else {
       //  set the ELR fields to empty (or whatever defaults)
   }
   ```
   
   It might actually fit the code in this class better to have a 
method like `maybePopulateTargetElr` that is called unconditionally and 
contains all the ELR logic.
   
   E.g., 
   
   ```
     completeReassignmentIfNeeded();
   
     tryElection(record);
   
     triggerLeaderEpochBumpIfNeeded(record);
   
     maybePopulateTargetElr(record);
   ```
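   
   A minimal, self-contained sketch of that shape, not the actual builder: 
`ElrSketch` and the two candidate-list parameters are hypothetical stand-ins, and 
only `maybePopulateTargetElr`, `eligibleLeaderReplicasEnabled`, `targetElr`, and 
`targetLastKnownElr` are names taken from this PR. The point is only that the 
feature check and the "clear to empty" case live in one method.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   // Hypothetical stand-in for PartitionChangeBuilder; everything except the
   // names quoted in the review comment is an assumption for illustration.
   class ElrSketch {
       final boolean eligibleLeaderReplicasEnabled;
       List<Integer> targetElr = Collections.emptyList();
       List<Integer> targetLastKnownElr = Collections.emptyList();

       ElrSketch(boolean eligibleLeaderReplicasEnabled) {
           this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
       }

       // Called unconditionally by build(); every ELR decision is made here.
       void maybePopulateTargetElr(List<Integer> candidateElr,
                                   List<Integer> candidateLastKnownElr) {
           if (!eligibleLeaderReplicasEnabled) {
               // Feature disabled: fall back to empty defaults and stop.
               targetElr = Collections.emptyList();
               targetLastKnownElr = Collections.emptyList();
               return;
           }
           // Feature enabled: take defensive copies of the computed candidates.
           targetElr = new ArrayList<>(candidateElr);
           targetLastKnownElr = new ArrayList<>(candidateLastKnownElr);
       }
   }
   ```
   
   With this shape, `build()` never has to know whether ELR is enabled.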



##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -434,6 +434,14 @@ Map<String, String> getConfigs(ConfigResource configResource) {
         }
     }
 
+    String getTopicConfig(String topicName, String configKey) {

Review Comment:
   Let's add some javadoc to this explaining the behavior (like the default 
empty string)
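   
   Something along these lines, as a sketch only: `TopicConfigSketch`, its 
backing map, and `setTopicConfig` are hypothetical, and the real storage in 
`ConfigurationControlManager` differs. The javadoc wording is the part that 
matters.
   
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical stand-in; the nested-map storage is assumed for illustration.
   class TopicConfigSketch {
       private final Map<String, Map<String, String>> topicConfigs = new HashMap<>();

       // Helper for the example only; not part of the PR.
       void setTopicConfig(String topicName, String configKey, String value) {
           topicConfigs.computeIfAbsent(topicName, k -> new HashMap<>()).put(configKey, value);
       }

       /**
        * Look up a single topic-level config value.
        *
        * @param topicName the name of the topic
        * @param configKey the config key to look up
        * @return the configured value, or the empty string if the topic has no
        *         value set for this key; never null
        */
       String getTopicConfig(String topicName, String configKey) {
           return topicConfigs
               .getOrDefault(topicName, Collections.emptyMap())
               .getOrDefault(configKey, "");
       }
   }
   ```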



##########
metadata/src/main/resources/common/metadata/PartitionChangeRecord.json:
##########
@@ -17,7 +17,9 @@
   "apiKey": 5,
   "type": "metadata",
   "name": "PartitionChangeRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
   If we're just adding tagged fields, then no, we don't need the version bump. 
Consider the downgrade case where someone decreases the MV below 3.7-IV1: what 
we (will eventually) do is take a snapshot of the current metadata log at the 
older MV, which can be a lossy operation. Since tagged fields are generic and 
optional, there's no problem re-writing a record to _not_ include the tagged 
fields. 
   
   Actually, as a side note, PartitionChangeRecord isn't even included in the 
snapshot (only PartitionRecord is), so it doesn't affect downgrades at all.



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1995,6 +2015,19 @@ private void listReassigningTopic(ListPartitionReassignmentsResponseData respons
             setReplicas(Replicas.toList(partition.replicas)));
     }
 
+    // Visible to test.
+    Integer getTopicEffectiveMinIsr(String topicName) {
+        String minIsrConfig = configurationControl.getTopicConfig(topicName, MIN_IN_SYNC_REPLICAS_CONFIG);
+        Integer currentMinIsr = minIsrConfig.isEmpty() ? defaultMinIsr : Integer.parseInt(minIsrConfig);
+        Uuid topicId = topicsByName.get(topicName);
+        Integer replicationFactor = topics.get(topicId).parts.get(0).replicas.length;

Review Comment:
   It doesn't hurt to be safe. Even a RuntimeException is nicer than an NPE.
   
   Related to this change, can we have this method return `int` instead of 
`Integer`? Doesn't look like this can return `null` anyways.
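   
   Roughly what I have in mind, as a standalone sketch: `MinIsrSketch` and its 
two maps are hypothetical stand-ins for the controller state, and the final 
`Math.min` is my assumption about how the rest of the method combines the two 
values, since that part isn't in the hunk above.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical stand-in for ReplicationControlManager; both maps are
   // assumptions replacing the real config and topic lookups.
   class MinIsrSketch {
       final int defaultMinIsr;
       final Map<String, String> minIsrConfigByTopic = new HashMap<>();
       final Map<String, Integer> replicationFactorByTopic = new HashMap<>();

       MinIsrSketch(int defaultMinIsr) {
           this.defaultMinIsr = defaultMinIsr;
       }

       // Returns a primitive int; an unknown topic fails with a clear message
       // instead of a NullPointerException.
       int getTopicEffectiveMinIsr(String topicName) {
           Integer replicationFactor = replicationFactorByTopic.get(topicName);
           if (replicationFactor == null) {
               throw new RuntimeException("Unknown topic " + topicName);
           }
           String minIsrConfig = minIsrConfigByTopic.getOrDefault(topicName, "");
           int currentMinIsr = minIsrConfig.isEmpty() ? defaultMinIsr : Integer.parseInt(minIsrConfig);
           // Assumed combination: min ISR cannot exceed the replication factor.
           return Math.min(currentMinIsr, replicationFactor);
       }
   }
   ```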


