Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-19 Thread via GitHub


mumrah merged PR #14312:
URL: https://github.com/apache/kafka/pull/14312


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-18 Thread via GitHub


CalvinConfluent commented on PR #14312:
URL: https://github.com/apache/kafka/pull/14312#issuecomment-1769582360

   No relevant unit test failures.





Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-17 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362959182


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -186,7 +186,10 @@ public enum MetadataVersion {
 IBP_3_6_IV2(14, "3.6", "IV2", true),
 
 // Implement KIP-919 controller registration.
-IBP_3_7_IV0(15, "3.7", "IV0", true);
+IBP_3_7_IV0(15, "3.7", "IV0", true),
+
+// Add ELR related supports (KIP-966).
+IBP_3_7_IV1(16, "3.7", "IV1", true);

Review Comment:
   We do not need a second MV bump so far.






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-17 Thread via GitHub


cmccabe commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362935412


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -186,7 +186,10 @@ public enum MetadataVersion {
 IBP_3_6_IV2(14, "3.6", "IV2", true),
 
 // Implement KIP-919 controller registration.
-IBP_3_7_IV0(15, "3.7", "IV0", true);
+IBP_3_7_IV0(15, "3.7", "IV0", true),
+
+// Add ELR related supports (KIP-966).
+IBP_3_7_IV1(16, "3.7", "IV1", true);

Review Comment:
   > Wouldn't having this prevent version bumps until KIP-966 is fully implemented
   
   Good question, but no, it shouldn't.
   
   One thing to note, though: if we need a second MV bump before this feature fully works, we can point that out too (and/or enforce it somehow). Do you think this will be needed, @CalvinConfluent?






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-17 Thread via GitHub


cmccabe commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362934680


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -434,6 +434,14 @@ Map getConfigs(ConfigResource configResource) {
 }
 }
 
+String getTopicConfig(String topicName, String configKey) {

Review Comment:
   ok






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-17 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362922279


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -362,6 +393,58 @@ private void setAssignmentChanges(PartitionChangeRecord record) {
 }
 }
 
+private void maybeUpdateRecordElr(PartitionChangeRecord record) {
+// During the leader election, it can set the record isr if an unclean leader election happens.
+boolean isCleanLeaderElection = record.isr() == null;
+
+// Clean the ELR related fields if it is an unclean election or ELR is disabled.
+if (!isCleanLeaderElection || !eligibleLeaderReplicasEnabled) {
+targetElr = Collections.emptyList();
+targetLastKnownElr = Collections.emptyList();
+}
+
+if (!targetElr.equals(Replicas.toList(partition.elr))) {
+record.setEligibleLeaderReplicas(targetElr);
+}
+
+if (!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
+record.setLastKnownELR(targetLastKnownElr);
+}
+}
+
+private void maybePopulateTargetElr() {
+if (!eligibleLeaderReplicasEnabled) return;
+
+// If the ISR is larger or equal to the min ISR, clear the ELR and lastKnownELR.
+if (targetIsr.size() >= minISR) {
+targetElr = Collections.emptyList();
+targetLastKnownElr = Collections.emptyList();
+return;
+}
+
+Set targetIsrSet = new HashSet<>(targetIsr);
+// Tracking the ELR. The new elr is expected to
+// 1. Include the current ISR
+// 2. Exclude the duplicate replicas between elr and target ISR.
+// 3. Exclude unclean shutdown replicas.
+// To do that, we first union the current ISR and current elr, then filter out the target ISR and unclean shutdown
+// Replicas.
+Set candidateSet = new HashSet<>(targetElr);
+Arrays.stream(partition.isr).forEach(ii -> candidateSet.add(ii));
+targetElr = candidateSet.stream()
+.filter(replica -> !targetIsrSet.contains(replica))
+.filter(replica -> uncleanShutdownReplicas == null || !uncleanShutdownReplicas.contains(replica))
+.collect(Collectors.toList());
+
+// Calculate the new last known ELR. Includes any ISR members since the ISR size drops below min ISR.
+// In order to reduce the metadata usage, the last known ELR excludes the members in ELR and current ISR.
+targetLastKnownElr.forEach(ii -> candidateSet.add(ii));

Review Comment:
   We had an offline discussion.
   Until unclean recovery is in place, we can skip populating lastKnownElr with the real last known ELR and store the last known leader in it instead.
   I removed the lastKnownLeader field. To save the progress, the follow-up leader election change will add a test-only flag that switches the lastKnownElr field between storing the last known leader and storing the last known ELR.
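
   For readers following the quoted hunk, here is a minimal sketch of the target ELR computation that its comments describe, pulled out into a standalone function. The class name `ElrSketch`, the method signature, and the use of plain lists are assumptions made for illustration; the real logic lives in fields of `PartitionChangeBuilder`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class ElrSketch {
    /**
     * Hypothetical standalone version of the ELR computation in the quoted hunk.
     * uncleanShutdownReplicas may be null, as in the original code.
     */
    static List<Integer> computeTargetElr(
            List<Integer> currentIsr,
            List<Integer> currentElr,
            List<Integer> targetIsr,
            Set<Integer> uncleanShutdownReplicas,
            int minIsr) {
        // If the ISR is at or above min ISR, the ELR is cleared.
        if (targetIsr.size() >= minIsr) {
            return Collections.emptyList();
        }
        // Union of the current ELR and the current ISR.
        // LinkedHashSet is used here only to keep the output order deterministic.
        Set<Integer> candidates = new LinkedHashSet<>(currentElr);
        candidates.addAll(currentIsr);

        List<Integer> targetElr = new ArrayList<>();
        for (int replica : candidates) {
            boolean inTargetIsr = targetIsr.contains(replica);
            boolean uncleanShutdown = uncleanShutdownReplicas != null
                    && uncleanShutdownReplicas.contains(replica);
            // Keep only replicas that left the ISR and did not shut down uncleanly.
            if (!inTargetIsr && !uncleanShutdown) {
                targetElr.add(replica);
            }
        }
        return targetElr;
    }

    public static void main(String[] args) {
        // ISR shrinks from [1, 2, 3] to [1] with min ISR 2: replicas 2 and 3 move to the ELR.
        System.out.println(computeTargetElr(List.of(1, 2, 3), List.of(), List.of(1), null, 2)); // prints [2, 3]
    }
}
```

   The sketch leaves out the lastKnownElr bookkeeping, which this comment says is deferred until unclean recovery is in place.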






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-17 Thread via GitHub


artemlivshits commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362802596


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -362,6 +393,58 @@ private void setAssignmentChanges(PartitionChangeRecord record) {
 }
 }
 
+private void maybeUpdateRecordElr(PartitionChangeRecord record) {
+// During the leader election, it can set the record isr if an unclean leader election happens.
+boolean isCleanLeaderElection = record.isr() == null;
+
+// Clean the ELR related fields if it is an unclean election or ELR is disabled.
+if (!isCleanLeaderElection || !eligibleLeaderReplicasEnabled) {
+targetElr = Collections.emptyList();
+targetLastKnownElr = Collections.emptyList();
+}
+
+if (!targetElr.equals(Replicas.toList(partition.elr))) {
+record.setEligibleLeaderReplicas(targetElr);
+}
+
+if (!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
+record.setLastKnownELR(targetLastKnownElr);
+}
+}
+
+private void maybePopulateTargetElr() {
+if (!eligibleLeaderReplicasEnabled) return;
+
+// If the ISR is larger or equal to the min ISR, clear the ELR and lastKnownELR.
+if (targetIsr.size() >= minISR) {
+targetElr = Collections.emptyList();
+targetLastKnownElr = Collections.emptyList();
+return;
+}
+
+Set targetIsrSet = new HashSet<>(targetIsr);
+// Tracking the ELR. The new elr is expected to
+// 1. Include the current ISR
+// 2. Exclude the duplicate replicas between elr and target ISR.
+// 3. Exclude unclean shutdown replicas.
+// To do that, we first union the current ISR and current elr, then filter out the target ISR and unclean shutdown
+// Replicas.
+Set candidateSet = new HashSet<>(targetElr);
+Arrays.stream(partition.isr).forEach(ii -> candidateSet.add(ii));
+targetElr = candidateSet.stream()
+.filter(replica -> !targetIsrSet.contains(replica))
+.filter(replica -> uncleanShutdownReplicas == null || !uncleanShutdownReplicas.contains(replica))
+.collect(Collectors.toList());
+
+// Calculate the new last known ELR. Includes any ISR members since the ISR size drops below min ISR.
+// In order to reduce the metadata usage, the last known ELR excludes the members in ELR and current ISR.
+targetLastKnownElr.forEach(ii -> candidateSet.add(ii));

Review Comment:
   Can we populate the last known leader as the first member of last known ELR? 
 Basically, when the last replica is removed from ISR, set it as the first 
member of the last known ELR.  This could result in cases when a replica is 
both in ELR and the last known ELR, but this wouldn't be worse than with the 
current change because then we wouldn't need to have the last known leader.
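
   One way to read this suggestion, as a rough sketch only: treat index 0 of the last known ELR as the last known leader, and move the replica that was removed last from the ISR to the front. The class `LastKnownLeaderSketch` and both method names are hypothetical, and nothing here is taken from the merged code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

final class LastKnownLeaderSketch {
    /**
     * Assumed convention: the first element of the last known ELR is the last known leader.
     * Puts lastIsrMember first and keeps the remaining members in their existing order.
     */
    static List<Integer> recordLastIsrMember(List<Integer> lastKnownElr, int lastIsrMember) {
        List<Integer> updated = new ArrayList<>();
        updated.add(lastIsrMember);
        for (int replica : lastKnownElr) {
            if (replica != lastIsrMember) {
                updated.add(replica);
            }
        }
        return updated;
    }

    /** Reads the last known leader back, under the same convention. */
    static OptionalInt lastKnownLeader(List<Integer> lastKnownElr) {
        return lastKnownElr.isEmpty()
                ? OptionalInt.empty()
                : OptionalInt.of(lastKnownElr.get(0));
    }

    public static void main(String[] args) {
        List<Integer> updated = recordLastIsrMember(List.of(2, 3), 3);
        System.out.println(updated);                  // prints [3, 2]
        System.out.println(lastKnownLeader(updated)); // prints OptionalInt[3]
    }
}
```

   With this convention no separate lastKnownLeader field is needed, at the cost of giving list order a meaning that readers of the field have to know about.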






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-17 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362774389


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -434,6 +434,14 @@ Map getConfigs(ConfigResource configResource) {
 }
 }
 
+String getTopicConfig(String topicName, String configKey) {

Review Comment:
   - Now using computeEffectiveTopicConfigs; getEffectiveTopicConfigValue is not in AK yet.
   - It now returns null if the config is not found.
   - Handling the ELR change when minISR is dynamically updated will be a separate change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-17 Thread via GitHub


artemlivshits commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362773250


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -362,6 +405,35 @@ private void setAssignmentChanges(PartitionChangeRecord record) {
 }
 }
 
+private void populateTargetElr() {
+// If the ISR is larger or equal to the min ISR, clear the ELR and lastKnownELR.
+if (targetIsr.size() >= minISR) {
+targetElr = Collections.emptyList();
+targetLastKnownElr = Collections.emptyList();
+return;
+}
+

Review Comment:
   It's probably an outdated comment -- there was an extra line that you've probably fixed in a later commit.






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-17 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362759943


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -244,6 +251,11 @@ boolean check() {
  */
 private final boolean zkMigrationEnabled;
 
+/**
+ * True if eligible leader replicas is enabled.
+ */
+private final boolean eligibleLeaderReplicasEnabled;

Review Comment:
   Sure, done.






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-17 Thread via GitHub


cmccabe commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362559786


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -434,6 +434,14 @@ Map getConfigs(ConfigResource configResource) {
 }
 }
 
+String getTopicConfig(String topicName, String configKey) {

Review Comment:
   This is not correct. If you want to know what the topic configuration is, you have to use `getEffectiveTopicConfigValue`. This will handle things like the N levels of config overrides.
   
   I also don't agree with mapping "doesn't exist" to "empty string" (we don't do that anywhere else in the code).
   
   Finally, you'll need a strategy for handling changes to these dynamic configurations.
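
   To illustrate the first two points, here is a minimal sketch of a layered lookup in which the highest-priority layer that defines a key wins and a missing key is reported as absent rather than as an empty string. This is not the Kafka implementation: `LayeredConfigSketch`, `effectiveValue`, and the two example layers are assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class LayeredConfigSketch {
    /**
     * Returns the value from the first layer that defines the key.
     * Layers are ordered from highest priority (e.g. topic override) to lowest (e.g. static default).
     */
    static Optional<String> effectiveValue(List<Map<String, String>> layersHighToLow, String key) {
        for (Map<String, String> layer : layersHighToLow) {
            String value = layer.get(key);
            if (value != null) {
                return Optional.of(value);
            }
        }
        // "Doesn't exist" stays distinguishable from any real value, including "".
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> topicOverrides = Map.of();
        Map<String, String> clusterDefaults = Map.of("min.insync.replicas", "2");
        List<Map<String, String>> layers = List.of(topicOverrides, clusterDefaults);

        System.out.println(effectiveValue(layers, "min.insync.replicas")); // prints Optional[2]
        System.out.println(effectiveValue(layers, "retention.ms"));        // prints Optional.empty
    }
}
```

   The third point, reacting when one of these layers changes at runtime, is not covered by the sketch.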






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-17 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362557831


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1995,6 +2015,19 @@ private void listReassigningTopic(ListPartitionReassignmentsResponseData respons
 setReplicas(Replicas.toList(partition.replicas)));
 }
 
+// Visible to test.
+Integer getTopicEffectiveMinIsr(String topicName) {
+String minIsrConfig = configurationControl.getTopicConfig(topicName, MIN_IN_SYNC_REPLICAS_CONFIG);
+Integer currentMinIsr = minIsrConfig.isEmpty() ? defaultMinIsr : Integer.parseInt(minIsrConfig);
+Uuid topicId = topicsByName.get(topicName);
+Integer replicationFactor = topics.get(topicId).parts.get(0).replicas.length;

Review Comment:
   Updated.






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-17 Thread via GitHub


cmccabe commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362559009


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -244,6 +251,11 @@ boolean check() {
  */
 private final boolean zkMigrationEnabled;
 
+/**
+ * True if eligible leader replicas is enabled.
+ */
+private final boolean eligibleLeaderReplicasEnabled;

Review Comment:
   I don't see why this is in ClusterControlManager. It's not related to 
cluster membership -- seems related to replication, so it should go in 
ReplicationControlManager ?






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-17 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362558817


##
metadata/src/main/resources/common/metadata/PartitionChangeRecord.json:
##
@@ -17,7 +17,9 @@
   "apiKey": 5,
   "type": "metadata",
   "name": "PartitionChangeRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
   Skipped the version bump.






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-17 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362556895


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -328,13 +353,37 @@ public Optional build() {
 
 completeReassignmentIfNeeded();
 
+if (PartitionChangeBuilder.this.eligibleLeaderReplicasEnabled) {
+populateTargetElr();
+}
+
 tryElection(record);
 
 triggerLeaderEpochBumpIfNeeded(record);
 
-if (record.isr() == null && !targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) {
-// Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it.
-record.setIsr(targetIsr);
+// During the leader election, it can set the record isr if an unclean leader election happens.

Review Comment:
   As discussed, I split the logic into two methods: one is called before the election and the other after it.
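
   A minimal sketch of the post-election step, assuming it behaves like the `maybeUpdateRecordElr` hunk quoted elsewhere in this thread: the ELR is cleared on an unclean election or when the feature is disabled, and the record is only touched when the value actually changes. The class `ElrRecordSketch`, the method `elrToRecord`, and the `Optional` return type are assumptions; the real method mutates a `PartitionChangeRecord`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

final class ElrRecordSketch {
    /**
     * Returns the ELR to write into the change record, or Optional.empty() if nothing changed.
     * targetElr is the value computed before the election.
     */
    static Optional<List<Integer>> elrToRecord(
            boolean elrEnabled,
            boolean cleanElection,
            List<Integer> targetElr,
            List<Integer> currentElr) {
        // An unclean election or a disabled feature discards whatever was computed earlier.
        List<Integer> effective = (elrEnabled && cleanElection)
                ? targetElr
                : Collections.<Integer>emptyList();
        // Only emit a change when the effective ELR differs from the current one.
        return effective.equals(currentElr) ? Optional.empty() : Optional.of(effective);
    }

    public static void main(String[] args) {
        System.out.println(elrToRecord(true, true, List.of(2, 3), List.of()));   // prints Optional[[2, 3]]
        System.out.println(elrToRecord(true, false, List.of(2, 3), List.of(2))); // prints Optional[[]]
        System.out.println(elrToRecord(true, true, List.of(2), List.of(2)));     // prints Optional.empty
    }
}
```

   Splitting the work this way lets the pre-election step compute candidates without knowing the election outcome, while the post-election step decides whether they survive.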






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-17 Thread via GitHub


mumrah commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362167841


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -328,13 +353,37 @@ public Optional build() {
 
 completeReassignmentIfNeeded();
 
+if (PartitionChangeBuilder.this.eligibleLeaderReplicasEnabled) {
+populateTargetElr();
+}
+
 tryElection(record);
 
 triggerLeaderEpochBumpIfNeeded(record);
 
-if (record.isr() == null && !targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) {
-// Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it.
-record.setIsr(targetIsr);
+// During the leader election, it can set the record isr if an unclean leader election happens.

Review Comment:
   I guess what I don't like is the logic split between `populateTargetElr` and 
the lines below. For example, in populateTargetElr we always set targetElr and 
targetLastKnownElr, but in some cases we immediately clear those collections. 
It would be nice if all the ELR logic was inside populateTargetElr.
   E.g., 
   
   ```
   if (eligibleLeaderReplicasEnabled) {
   populateTargetElr(); // compute the ELR fields and set them on the record
   } else {
   //  set the ELR fields to empty (or whatever defaults)
   }
   ```
   
   It might actually be more fitting with the code in this class to have a method like `maybePopulateTargetElr` that is unconditionally called and put all the ELR logic there.
   
   E.g., 
   
   ```
 completeReassignmentIfNeeded();
   
 tryElection(record);
   
 triggerLeaderEpochBumpIfNeeded(record);
   
 maybePopulateTargetElr(record);
   ```



##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -434,6 +434,14 @@ Map getConfigs(ConfigResource configResource) {
 }
 }
 
+String getTopicConfig(String topicName, String configKey) {

Review Comment:
   Let's add some javadoc to this explaining the behavior (like the default 
empty string)



##
metadata/src/main/resources/common/metadata/PartitionChangeRecord.json:
##
@@ -17,7 +17,9 @@
   "apiKey": 5,
   "type": "metadata",
   "name": "PartitionChangeRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
   If we're just adding tagged fields, then no, we don't need the version bump. If we consider the downgrade case where someone decreases the MV below 3.7-IV1, what we (will eventually) do is take a snapshot of the current metadata log at the older MV, which can be a lossy operation. Since tagged fields are generic and optional, there's no problem re-writing a record to _not_ include the tagged fields.
   
   Actually, as a side note, PartitionChangeRecord isn't even included in the 
snapshot (only PartitionRecord is), so it doesn't affect downgrades at all.



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1995,6 +2015,19 @@ private void listReassigningTopic(ListPartitionReassignmentsResponseData respons
 setReplicas(Replicas.toList(partition.replicas)));
 }
 
+// Visible to test.
+Integer getTopicEffectiveMinIsr(String topicName) {
+String minIsrConfig = configurationControl.getTopicConfig(topicName, MIN_IN_SYNC_REPLICAS_CONFIG);
+Integer currentMinIsr = minIsrConfig.isEmpty() ? defaultMinIsr : Integer.parseInt(minIsrConfig);
+Uuid topicId = topicsByName.get(topicName);
+Integer replicationFactor = topics.get(topicId).parts.get(0).replicas.length;

Review Comment:
   It doesn't hurt to be safe. Even a RuntimeException is nicer than an NPE.
   
   Related to this change, can we have this method return `int` instead of 
`Integer`? Doesn't look like this can return `null` anyways.
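
   A rough sketch of what both suggestions could look like together: an explicit exception for an unknown topic instead of an NPE, and a primitive `int` return type. The quoted hunk is cut off before its return statement, so capping the result at the replication factor is an assumption, as are the class `MinIsrSketch`, the method signature, and the plain map standing in for the controller's topic state.

```java
import java.util.Map;

final class MinIsrSketch {
    /**
     * Hypothetical helper: resolves the min ISR for a topic.
     * configuredMinIsr is null when the topic has no explicit setting.
     */
    static int effectiveMinIsr(
            String topicName,
            Map<String, Integer> replicationFactorByTopic,
            String configuredMinIsr,
            int defaultMinIsr) {
        Integer replicationFactor = replicationFactorByTopic.get(topicName);
        if (replicationFactor == null) {
            // Fail with a descriptive message rather than a NullPointerException.
            throw new IllegalArgumentException("Unknown topic: " + topicName);
        }
        int minIsr = configuredMinIsr == null ? defaultMinIsr : Integer.parseInt(configuredMinIsr);
        // Assumption: the effective value cannot exceed the replication factor.
        return Math.min(minIsr, replicationFactor);
    }

    public static void main(String[] args) {
        Map<String, Integer> replicationFactors = Map.of("orders", 3);
        System.out.println(effectiveMinIsr("orders", replicationFactors, "2", 1));  // prints 2
        System.out.println(effectiveMinIsr("orders", replicationFactors, "5", 1));  // prints 3
        System.out.println(effectiveMinIsr("orders", replicationFactors, null, 1)); // prints 1
    }
}
```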






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-16 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1361389729


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1995,6 +2015,19 @@ private void listReassigningTopic(ListPartitionReassignmentsResponseData respons
 setReplicas(Replicas.toList(partition.replicas)));
 }
 
+// Visible to test.
+Integer getTopicEffectiveMinIsr(String topicName) {
+String minIsrConfig = configurationControl.getTopicConfig(topicName, MIN_IN_SYNC_REPLICAS_CONFIG);
+Integer currentMinIsr = minIsrConfig.isEmpty() ? defaultMinIsr : Integer.parseInt(minIsrConfig);
+Uuid topicId = topicsByName.get(topicName);
+Integer replicationFactor = topics.get(topicId).parts.get(0).replicas.length;

Review Comment:
   Yeah, in the current use case, the caller has already validated the topic. However, I think we can be paranoid here and add a try-catch to guard against mistakes, since there are a lot of places that can hit null.
   @mumrah You commented on this before, what do you think?






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-16 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1361390777


##
metadata/src/main/resources/common/metadata/PartitionChangeRecord.json:
##
@@ -17,7 +17,9 @@
   "apiKey": 5,
   "type": "metadata",
   "name": "PartitionChangeRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
   Good question. Not super sure about this. @mumrah 






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-16 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1361390411


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -186,7 +186,10 @@ public enum MetadataVersion {
 IBP_3_6_IV2(14, "3.6", "IV2", true),
 
 // Implement KIP-919 controller registration.
-IBP_3_7_IV0(15, "3.7", "IV0", true);
+IBP_3_7_IV0(15, "3.7", "IV0", true),
+
+// Add ELR related supports (KIP-966).
+IBP_3_7_IV1(16, "3.7", "IV1", true);

Review Comment:
   We also need to bump the PartitionRecord version, which requires an MV bump, so we have to add a new MV here. However, eligible.leader.replicas.enable is false by default, so the new MV will not block other MVs.






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-16 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1361389729


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1995,6 +2015,19 @@ private void listReassigningTopic(ListPartitionReassignmentsResponseData respons
 setReplicas(Replicas.toList(partition.replicas)));
 }
 
+// Visible to test.
+Integer getTopicEffectiveMinIsr(String topicName) {
+String minIsrConfig = configurationControl.getTopicConfig(topicName, MIN_IN_SYNC_REPLICAS_CONFIG);
+Integer currentMinIsr = minIsrConfig.isEmpty() ? defaultMinIsr : Integer.parseInt(minIsrConfig);
+Uuid topicId = topicsByName.get(topicName);
+Integer replicationFactor = topics.get(topicId).parts.get(0).replicas.length;

Review Comment:
   Yeah, in the current use case, all the get calls will return valid values. However, I think we can be paranoid here and add a try-catch to guard against mistakes, since there are a lot of places that can hit null.
   @mumrah You commented on this before, what do you think?






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-16 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1361388641


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -362,6 +405,35 @@ private void setAssignmentChanges(PartitionChangeRecord record) {
 }
 }
 
+private void populateTargetElr() {
+// If the ISR is larger or equal to the min ISR, clear the ELR and lastKnownELR.
+if (targetIsr.size() >= minISR) {
+targetElr = Collections.emptyList();
+targetLastKnownElr = Collections.emptyList();
+return;
+}
+

Review Comment:
   Sorry, where should the line be added? Before the return?






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-16 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1361388430


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -328,13 +346,38 @@ public Optional build() {
 
 completeReassignmentIfNeeded();
 
+boolean isElrEnabled = metadataVersion.isElrSupported();
+if (isElrEnabled) {
+populateTargetElr();
+}
+
 tryElection(record);
 
 triggerLeaderEpochBumpIfNeeded(record);
 
-if (record.isr() == null && !targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) {
-// Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it.
-record.setIsr(targetIsr);
+// During the leader election, it can set the record isr if an unclean leader election happens.
+boolean isCleanLeaderElection = record.isr() == null;
+
+// Clean the ELR related fields if it is an unclean election or ELR is disabled.
+if (!isCleanLeaderElection || !isElrEnabled) {
+targetElr = Collections.emptyList();
+targetLastKnownElr = Collections.emptyList();
+}
+
+if (!targetElr.equals(Replicas.toList(partition.elr))) {
+record.setEligibleLeaderReplicas(targetElr);
+}
+if (!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
+record.setLastKnownELR(targetLastKnownElr);
+}
+
+// The record.isr is null if it is a clean election. In this case, it will
+// 1. Set the new ISR if it is different from the current ISR.
+// 2. Set the new ELR/LastKnowElr if it is different from the current ones.
+if (isCleanLeaderElection) {

Review Comment:
   The file has been updated, and this part has been removed.
   Yeah, calling it "isIsrSet" would be more literal. But it is easier to understand as isCleanLeaderElection, because we also need to reset the ELR when an unclean leader election happens (see the latest update). Otherwise it may not be clear why the ELR has to be cleared when record.isr() is set.



##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -328,13 +346,38 @@ public Optional build() {
 
 completeReassignmentIfNeeded();
 
+boolean isElrEnabled = metadataVersion.isElrSupported();
+if (isElrEnabled) {
+populateTargetElr();
+}
+
 tryElection(record);
 
 triggerLeaderEpochBumpIfNeeded(record);
 
-if (record.isr() == null && !targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) {
-// Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it.
-record.setIsr(targetIsr);
+// During the leader election, it can set the record isr if an unclean leader election happens.
+boolean isCleanLeaderElection = record.isr() == null;
+
+// Clean the ELR related fields if it is an unclean election or ELR is disabled.
+if (!isCleanLeaderElection || !isElrEnabled) {
+targetElr = Collections.emptyList();
+targetLastKnownElr = Collections.emptyList();
+}
+
+if (!targetElr.equals(Replicas.toList(partition.elr))) {
+record.setEligibleLeaderReplicas(targetElr);
+}
+if (!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
+record.setLastKnownELR(targetLastKnownElr);
+}
+
+// The record.isr is null if it is a clean election. In this case, it will
+// 1. Set the new ISR if it is different from the current ISR.
+// 2. Set the new ELR/LastKnowElr if it is different from the current ones.

Review Comment:
   Removed.






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-16 Thread via GitHub


artemlivshits commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1353913582


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -328,13 +346,38 @@ public Optional build() {
 
 completeReassignmentIfNeeded();
 
+boolean isElrEnabled = metadataVersion.isElrSupported();
+if (isElrEnabled) {
+populateTargetElr();
+}
+
 tryElection(record);
 
 triggerLeaderEpochBumpIfNeeded(record);
 
-if (record.isr() == null && !targetIsr.isEmpty() && 
!targetIsr.equals(Replicas.toList(partition.isr))) {
-// Set the new ISR if it is different from the current ISR and 
unclean leader election didn't already set it.
-record.setIsr(targetIsr);
+// During the leader election, it can set the record isr if an unclean 
leader election happens.
+boolean isCleanLeaderElection = record.isr() == null;
+
+// Clean the ELR related fields if it is an unclean election or ELR is 
disabled.
+if (!isCleanLeaderElection || !isElrEnabled) {
+targetElr = Collections.emptyList();
+targetLastKnownElr = Collections.emptyList();
+}
+
+if (!targetElr.equals(Replicas.toList(partition.elr))) {
+record.setEligibleLeaderReplicas(targetElr);
+}
+if 
(!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
+record.setLastKnownELR(targetLastKnownElr);
+}
+
+// The record.isr is null if it is a clean election. In this case, it 
will
+// 1. Set the new ISR if it is different from the current ISR.
+// 2. Set the new ELR/LastKnowElr if it is different from the current 
ones.
+if (isCleanLeaderElection) {

Review Comment:
   Looks like the logic here is "set ISR if it wasn't set", so checking `if 
(record.isr() == null)` seems more intentional here.



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1995,6 +2015,19 @@ private void 
listReassigningTopic(ListPartitionReassignmentsResponseData respons
 setReplicas(Replicas.toList(partition.replicas)));
 }
 
+// Visible to test.
+Integer getTopicEffectiveMinIsr(String topicName) {
+String minIsrConfig = configurationControl.getTopicConfig(topicName, 
MIN_IN_SYNC_REPLICAS_CONFIG);
+Integer currentMinIsr = minIsrConfig.isEmpty() ? defaultMinIsr : 
Integer.parseInt(minIsrConfig);
+Uuid topicId = topicsByName.get(topicName);
+Integer replicationFactor = 
topics.get(topicId).parts.get(0).replicas.length;

Review Comment:
   Is it guaranteed that every map here contains the element we look up?  We
don't check for null anywhere.
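
   One way to address the null concern, as a minimal sketch rather than the actual controller code: look up each level defensively and fall back to defaults. `EffectiveMinIsrSketch`, its maps, and `addTopic` are hypothetical stand-ins for the controller state, and falling back to the default replication factor is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the controller state; not the real ReplicationControlManager.
final class EffectiveMinIsrSketch {
    private final Map<String, String> minIsrConfigByTopic = new HashMap<>();
    private final Map<String, String> topicIdByName = new HashMap<>();
    private final Map<String, int[]> partitionZeroReplicasByTopicId = new HashMap<>();
    private final int defaultMinIsr;
    private final int defaultReplicationFactor;

    EffectiveMinIsrSketch(int defaultMinIsr, int defaultReplicationFactor) {
        this.defaultMinIsr = defaultMinIsr;
        this.defaultReplicationFactor = defaultReplicationFactor;
    }

    // Test helper (hypothetical): registers a topic and, optionally, its min ISR config.
    void addTopic(String name, String topicId, int[] partitionZeroReplicas, String minIsrConfig) {
        topicIdByName.put(name, topicId);
        partitionZeroReplicasByTopicId.put(topicId, partitionZeroReplicas);
        if (minIsrConfig != null) {
            minIsrConfigByTopic.put(name, minIsrConfig);
        }
    }

    int effectiveMinIsr(String topicName) {
        // A missing or empty config falls back to the default min ISR.
        String minIsrConfig = minIsrConfigByTopic.get(topicName);
        int currentMinIsr = (minIsrConfig == null || minIsrConfig.isEmpty())
            ? defaultMinIsr
            : Integer.parseInt(minIsrConfig);

        // Assumption: use the default replication factor when the topic or its
        // first partition is unknown, instead of throwing.
        int replicationFactor = defaultReplicationFactor;
        String topicId = topicIdByName.get(topicName);
        int[] replicas = topicId == null ? null : partitionZeroReplicasByTopicId.get(topicId);
        if (replicas != null) {
            replicationFactor = replicas.length;
        }
        return Math.min(currentMinIsr, replicationFactor);
    }
}
```

   With this shape, an unknown topic yields `min(defaultMinIsr, defaultReplicationFactor)` rather than a NullPointerException.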



##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -186,7 +186,10 @@ public enum MetadataVersion {
 IBP_3_6_IV2(14, "3.6", "IV2", true),
 
 // Implement KIP-919 controller registration.
-IBP_3_7_IV0(15, "3.7", "IV0", true);
+IBP_3_7_IV0(15, "3.7", "IV0", true),
+
+// Add ELR related supports (KIP-966).
+IBP_3_7_IV1(16, "3.7", "IV1", true);

Review Comment:
   Wouldn't having this prevent version bumps until KIP-966 is fully
implemented?  Would it be enough to just have an
`eligible.leader.replicas.enable` config for testing, and then, when the
feature is ready to be released, flip `eligible.leader.replicas.enable` to
`true` and add a new metadata version?



##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -328,13 +346,38 @@ public Optional build() {
 
 completeReassignmentIfNeeded();
 
+boolean isElrEnabled = metadataVersion.isElrSupported();
+if (isElrEnabled) {
+populateTargetElr();
+}
+
 tryElection(record);
 
 triggerLeaderEpochBumpIfNeeded(record);
 
-if (record.isr() == null && !targetIsr.isEmpty() && 
!targetIsr.equals(Replicas.toList(partition.isr))) {
-// Set the new ISR if it is different from the current ISR and 
unclean leader election didn't already set it.
-record.setIsr(targetIsr);
+// During the leader election, it can set the record isr if an unclean 
leader election happens.
+boolean isCleanLeaderElection = record.isr() == null;
+
+// Clean the ELR related fields if it is an unclean election or ELR is 
disabled.
+if (!isCleanLeaderElection || !isElrEnabled) {
+targetElr = Collections.emptyList();
+targetLastKnownElr = Collections.emptyList();
+}
+
+if (!targetElr.equals(Replicas.toList(partition.elr))) {
+record.setEligibleLeaderReplicas(targetElr);
+}
+if 
(!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
+record.setLastKnownELR(targetLastKnownElr);
+

Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-12 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1357255199


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -328,13 +353,37 @@ public Optional build() {
 
 completeReassignmentIfNeeded();
 
+if (PartitionChangeBuilder.this.eligibleLeaderReplicasEnabled) {
+populateTargetElr();
+}
+
 tryElection(record);
 
 triggerLeaderEpochBumpIfNeeded(record);
 
-if (record.isr() == null && !targetIsr.isEmpty() && 
!targetIsr.equals(Replicas.toList(partition.isr))) {
-// Set the new ISR if it is different from the current ISR and 
unclean leader election didn't already set it.
-record.setIsr(targetIsr);
+// During the leader election, it can set the record isr if an unclean 
leader election happens.

Review Comment:
   This PR mostly just manages the ELR fields; it does not change election
behavior. I will try to split the leader election into separate paths, with
and without ELR, in the follow-up leader election PR.






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-12 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1357250930


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1994,6 +2014,24 @@ private void 
listReassigningTopic(ListPartitionReassignmentsResponseData respons
 setReplicas(Replicas.toList(partition.replicas)));
 }
 
+// Make it public to be visible in test.
+public Integer getTopicEffectiveMinIsr(String topicName) {
+Integer currentMinIsr = 
Integer.parseInt(configurationControl.getTopicConfigs(topicName)
+.getOrDefault(MIN_IN_SYNC_REPLICAS_CONFIG, 
String.valueOf(defaultMinInSyncIsr)));
+Integer replicationFactor = (int) defaultReplicationFactor;
+try {
+Uuid topicId = topicsByName.get(topicName);
+replicationFactor = 
topics.get(topicId).parts.get(0).replicas.length;
+} catch (Exception e) {
+log.debug("Can't find the replication factor for topic: " + 
topicName);
+}
+return Math.min(currentMinIsr, replicationFactor);
+}
+
+private short partitionRecordVersion(boolean isElrAllowed) {

Review Comment:
   Removed; the MV now determines the record version.
   Even if the ELR static config is disabled, the partition records should
still be able to use the latest version so that future features are not blocked.
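
   As a rough illustration of that decision (a sketch, not the code in the PR): the record version depends only on the metadata version, and the ELR config flag plays no part. `MetadataVersionSketch` is a hypothetical, trimmed-down model. The two levels are taken from the quoted MetadataVersion diff, and mapping ELR support to record version 1 is an assumption based on the `"versions": "1+"` fields in PartitionChangeRecord.json.

```java
// Hypothetical, trimmed-down model of MetadataVersion; only the two levels from the diff.
enum MetadataVersionSketch {
    IBP_3_7_IV0(15),
    IBP_3_7_IV1(16); // ELR-related support (KIP-966), per the quoted diff

    private final int featureLevel;

    MetadataVersionSketch(int featureLevel) {
        this.featureLevel = featureLevel;
    }

    boolean isElrSupported() {
        return featureLevel >= IBP_3_7_IV1.featureLevel;
    }

    // Assumption: version 1 carries the ELR fields, version 0 does not.
    // Note that no ELR config flag is consulted here.
    short partitionRecordVersion() {
        return (short) (isElrSupported() ? 1 : 0);
    }
}
```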






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-12 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1357251569


##
metadata/src/main/resources/common/metadata/PartitionChangeRecord.json:
##
@@ -40,6 +42,15 @@
   "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", 
"tag": 4,
   "about": "null if the adding replicas didn't change; the new adding 
replicas otherwise." },
 { "name": "LeaderRecoveryState", "type": "int8", "default": "-1", 
"versions": "0+", "taggedVersions": "0+", "tag": 5,
-  "about": "-1 if it didn't change; 0 if the leader was elected from the 
ISR or recovered from an unclean election; 1 if the leader that was elected 
using unclean leader election and it is still recovering." }
+  "about": "-1 if it didn't change; 0 if the leader was elected from the 
ISR or recovered from an unclean election; 1 if the leader that was elected 
using unclean leader election and it is still recovering." },
+{ "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", 
"entityType": "brokerId",
+  "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", 
"tag": 6,
+  "about": "null if the ELR didn't change; the new eligible leader 
replicas otherwise." },
+{ "name": "LastKnownELR", "type": "[]int32", "default": "null", 
"entityType": "brokerId",
+  "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", 
"tag": 7,
+  "about": "null if the LastKnownELR didn't change; the last known 
eligible leader replicas otherwise." },
+{ "name": "LastKnownLeader", "type": "int32", "default": "-2", 
"entityType": "brokerId",

Review Comment:
   Synced offline, make the lastKnownLeader a tagged field.






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-12 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1357248280


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1994,6 +2014,24 @@ private void 
listReassigningTopic(ListPartitionReassignmentsResponseData respons
 setReplicas(Replicas.toList(partition.replicas)));
 }
 
+// Make it public to be visible in test.

Review Comment:
   Cool, done.






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-12 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1357247099


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -79,29 +84,39 @@ public enum Election {
 private List targetReplicas;
 private List targetRemoving;
 private List targetAdding;
+private List targetElr;
+private List targetLastKnownElr;
+private List uncleanShutdownReplicas;
 private Election election = Election.ONLINE;
 private LeaderRecoveryState targetLeaderRecoveryState;
 private boolean zkMigrationEnabled;
+private boolean eligibleLeaderReplicasEnabled;
+private int minISR;
 
 
 public PartitionChangeBuilder(
 PartitionRegistration partition,
 Uuid topicId,
 int partitionId,
 IntPredicate isAcceptableLeader,
-MetadataVersion metadataVersion
+MetadataVersion metadataVersion,
+int minISR
 ) {
 this.partition = partition;
 this.topicId = topicId;
 this.partitionId = partitionId;
 this.isAcceptableLeader = isAcceptableLeader;
 this.metadataVersion = metadataVersion;
 this.zkMigrationEnabled = false;
+this.eligibleLeaderReplicasEnabled = false;
+this.minISR = minISR;
 
 this.targetIsr = Replicas.toList(partition.isr);
 this.targetReplicas = Replicas.toList(partition.replicas);
 this.targetRemoving = Replicas.toList(partition.removingReplicas);
 this.targetAdding = Replicas.toList(partition.addingReplicas);
+this.targetElr = Collections.emptyList();
+this.targetLastKnownElr = Collections.emptyList();

Review Comment:
   Sure, we can do that.






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-12 Thread via GitHub


CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1357244420


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -434,6 +434,15 @@ Map getConfigs(ConfigResource 
configResource) {
 }
 }
 
+Map getTopicConfigs(String topicName) {

Review Comment:
   Sounds good.






Re: [PR] KAFKA-15581: Introduce ELR [kafka]

2023-10-12 Thread via GitHub


mumrah commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1349018067


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -203,6 +203,7 @@ static public class Builder {
 private QuorumFeatures quorumFeatures = null;
 private short defaultReplicationFactor = 3;
 private int defaultNumPartitions = 1;
+private int defaultMinInSyncIsr = 1;

Review Comment:
   nit: this name is redundant. ISR already means "in-sync replicas", so I
think this should probably be `defaultMinIsr`.



##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -328,13 +353,37 @@ public Optional build() {
 
 completeReassignmentIfNeeded();
 
+if (PartitionChangeBuilder.this.eligibleLeaderReplicasEnabled) {
+populateTargetElr();
+}
+
 tryElection(record);
 
 triggerLeaderEpochBumpIfNeeded(record);
 
-if (record.isr() == null && !targetIsr.isEmpty() && 
!targetIsr.equals(Replicas.toList(partition.isr))) {
-// Set the new ISR if it is different from the current ISR and 
unclean leader election didn't already set it.
-record.setIsr(targetIsr);
+// During the leader election, it can set the record isr if an unclean 
leader election happens.

Review Comment:
   Could we separate the ELR election code into a separate method? Then we can
have one branch point where we decide whether to perform ELR election or run
the old code. This would also let us keep the old code unmodified, which has
some merit.
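
   Structurally, something like the sketch below. All names are hypothetical, and the election rules are deliberately simplistic placeholders (first ISR member, then first ELR member); they are not the PartitionChangeBuilder logic or the KIP-966 election rules. The point is only the single branch point.

```java
import java.util.List;

// Hypothetical illustration of "one branch point"; not the PartitionChangeBuilder code.
final class ElectionBranchSketch {
    static final int NO_LEADER = -1;

    // The only place that decides which election path runs.
    static int electLeader(boolean elrEnabled, List<Integer> isr, List<Integer> elr) {
        return elrEnabled ? electWithElr(isr, elr) : electLegacy(isr);
    }

    // Placeholder for the existing path, left untouched.
    static int electLegacy(List<Integer> isr) {
        return isr.isEmpty() ? NO_LEADER : isr.get(0);
    }

    // Placeholder for the new path: fall back to the ELR when the ISR is empty.
    static int electWithElr(List<Integer> isr, List<Integer> elr) {
        if (!isr.isEmpty()) {
            return isr.get(0);
        }
        return elr.isEmpty() ? NO_LEADER : elr.get(0);
    }
}
```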



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -149,6 +150,8 @@ static class Builder {
 private LogContext logContext = null;
 private short defaultReplicationFactor = (short) 3;
 private int defaultNumPartitions = 1;
+
+private int defaultMinInSyncIsr = 1;

Review Comment:
   same naming nit as above



##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -434,6 +434,15 @@ Map getConfigs(ConfigResource 
configResource) {
 }
 }
 
+Map getTopicConfigs(String topicName) {

Review Comment:
   We should avoid copying data structures when we can. It looks like the 
callers of this method are reading an individual key anyways, so maybe `String 
getTopicConfig(String topicName, String configKey)` would work?
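
   Roughly what I have in mind, as a self-contained sketch. `TopicConfigLookupSketch` and its nested map are hypothetical stand-ins for the ConfigurationControlManager state, and returning an empty string for an unset key is an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the config store; not the real ConfigurationControlManager.
final class TopicConfigLookupSketch {
    private final Map<String, Map<String, String>> configsByTopic = new HashMap<>();

    void set(String topicName, String configKey, String value) {
        configsByTopic.computeIfAbsent(topicName, t -> new HashMap<>()).put(configKey, value);
    }

    // Reads a single key without copying the per-topic map.
    // Assumption: an unknown topic or unset key yields an empty string.
    String getTopicConfig(String topicName, String configKey) {
        Map<String, String> configs =
            configsByTopic.getOrDefault(topicName, Collections.emptyMap());
        return configs.getOrDefault(configKey, "");
    }
}
```

   The caller can then treat an empty result as "use the default" without ever holding a copy of the map.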



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1008,9 +1024,11 @@ ControllerResult 
alterPartition(
 topic.id,
 partitionId,
 clusterControl::isActive,
-featureControl.metadataVersion()
+featureControl.metadataVersion(),
+getTopicEffectiveMinIsr(topic.name)
 );
-
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
+
builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed())
+
.setEligibleLeaderReplicasEnabled(clusterControl.eligibleLeaderReplicasAllowed());

Review Comment:
   nit: just do another `builder.` line here



##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -362,6 +411,35 @@ private void setAssignmentChanges(PartitionChangeRecord 
record) {
 }
 }
 
+private void populateTargetElr() {
+// If the ISR is larger or equal to the min ISR, clear the ELR and 
lastKnownELR.
+if (targetIsr.size() >= minISR) {
+targetElr = Collections.emptyList();
+targetLastKnownElr = Collections.emptyList();
+return;
+}
+
+
+Set targetIsrSet = 
targetIsr.stream().collect(Collectors.toSet());
+// Tracking the ELR. The new elr is expected to
+// 1. Include the current ISR
+// 2. Exclude the duplicate replicas between elr and target ISR.
+// 3. Exclude unclean shutdown replicas.
+// To do that, we first union the current ISR and current elr, then 
filter out the target ISR and unclean shutdown
+// Replicas.
+Set candidateSet = 
Arrays.stream(partition.elr).boxed().collect(Collectors.toSet());
+Arrays.stream(partition.isr).boxed().forEach(ii -> 
candidateSet.add(ii));
+targetElr = candidateSet.stream()
+.filter(replica -> !targetIsrSet.contains(replica) && 
(uncleanShutdownReplicas == null || !uncleanShutdownReplicas.contains(replica)))

Review Comment:
   Can we split this into two filters for better readability?
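
   For example, something along these lines. This is a standalone sketch: `ElrFilterSketch` and the method signature are hypothetical, and the `TreeSet` is only there to make the output order deterministic.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical standalone version of the ELR candidate filtering.
final class ElrFilterSketch {
    static List<Integer> targetElr(
        int[] currentElr,
        int[] currentIsr,
        List<Integer> targetIsr,
        List<Integer> uncleanShutdownReplicas
    ) {
        Set<Integer> targetIsrSet = new HashSet<>(targetIsr);
        // Normalize null to an empty set so the filter needs no null check.
        Set<Integer> uncleanSet = uncleanShutdownReplicas == null
            ? Collections.emptySet()
            : new HashSet<>(uncleanShutdownReplicas);

        // Union of the current ELR and the current ISR.
        Set<Integer> candidates = new TreeSet<>();
        Arrays.stream(currentElr).boxed().forEach(candidates::add);
        Arrays.stream(currentIsr).boxed().forEach(candidates::add);

        return candidates.stream()
            .filter(replica -> !targetIsrSet.contains(replica)) // drop target ISR members
            .filter(replica -> !uncleanSet.contains(replica))   // drop unclean shutdown replicas
            .collect(Collectors.toList());
    }
}
```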



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1396,9 +1414,12 @@ ApiError electLeader(String topic, int partitionId, 
ElectionType electionType,
 topicId,