Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-07-10 Thread via GitHub


mumrah commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1672784230


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2225,9 +2250,14 @@ private void 
listReassigningTopic(ListPartitionReassignmentsResponseData respons
 }
 
 // Visible to test.
-int getTopicEffectiveMinIsr(String topicName) {
+int getTopicEffectiveMinIsr(String topicName, Function 
getTopicMinIsrConfig) {
 int currentMinIsr = defaultMinIsr;
-String minIsrConfig = configurationControl.getTopicConfig(topicName, 
MIN_IN_SYNC_REPLICAS_CONFIG);
+String minIsrConfig;
+if (getTopicMinIsrConfig == null) {

Review Comment:
   Can we split the method into two signatures instead of having the null?
   
   E.g., `int getTopicEffectiveMinIsr(String topicName)` and `int 
getTopicEffectiveMinIsr(String topicName, Function 
getTopicMinIsrConfig)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-07-10 Thread via GitHub


mumrah commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1672730022


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -500,9 +631,26 @@ boolean uncleanLeaderElectionEnabledForTopic(String name) {
 return false; // TODO: support configuring unclean leader election.
 }
 
+Map computeEffectiveTopicConfigsWithPendingChange(
+Map> pendingConfigData
+) {
+Map pendingClusterConfig =
+pendingConfigData.containsKey(DEFAULT_NODE) ? 
pendingConfigData.get(DEFAULT_NODE) : Collections.emptyMap();
+Map pendingControllerConfig =
+pendingConfigData.containsKey(currentController) ? 
pendingConfigData.get(currentController) : Collections.emptyMap();
+return configSchema.resolveEffectiveTopicConfigs(
+new OrderedConfigResolver(staticConfig),

Review Comment:
   Hm.. what happens if the controller has different static configs from the 
broker?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-07-10 Thread via GitHub


mumrah commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1671034456


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -309,6 +331,54 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List 
records) {
+List minIsrRecords = new ArrayList<>();
+Map topicToMinIsrValueMap = new HashMap<>();
+Map configRemovedTopicMap = new HashMap<>();
+records.forEach(record -> {
+if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+ConfigRecord configRecord = (ConfigRecord) record.message();
+if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+minIsrRecords.add(configRecord);
+if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+if (configRecord.value() != null) 
topicToMinIsrValueMap.put(configRecord.resourceName(), configRecord.value());
+else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+}
+}
+}
+});
+
+if (minIsrRecords.isEmpty()) return;
+if (topicToMinIsrValueMap.size() == minIsrRecords.size()) {

Review Comment:
   The size comparison here and below are a little non-obvious (to me at 
least). Maybe we can set a boolean as we're looping through the records to 
determine if we hit this branch.
   
   Alternative question, is this optimization helping with performance? We 
still need the code for the case of overlaying configs from different levels, 
so having this separate code path just increases complexity. 



##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -309,6 +331,54 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List 
records) {

Review Comment:
   The complexity here is a tad bit high. Can we extract a method for getting 
the minIsrRecords ConfigRecord from `List`?



##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -426,6 +497,35 @@ public void replay(ConfigRecord record) {
 }
 }
 
+/**
+ * Apply a configuration record to the given config data. Note that, it 
will store null for the config to be removed.
+ *
+ * @param record The ConfigRecord.
+ * @param localConfigDataThe config data is going to be updated.
+ */
+public void replayForPendingConfig(

Review Comment:
   Can we reuse ConfigurationsImage here instead of adding another place where 
we are applying records? I think it should be reasonably straightforward to 
construct a ConfigurationsImage with the in-memory state (`localConfigData`) 
and then replay records to get a ConfigurationsDelta.



##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -309,6 +331,54 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List 
records) {
+List minIsrRecords = new ArrayList<>();
+Map topicToMinIsrValueMap = new HashMap<>();
+Map configRemovedTopicMap = new HashMap<>();
+records.forEach(record -> {
+if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+ConfigRecord configRecord = (ConfigRecord) record.message();
+if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+minIsrRecords.add(configRecord);
+if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+if (configRecord.value() != null) 
topicToMinIsrValueMap.put(configRecord.resourceName(), configRecord.value());
+else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());

Review Comment:
   style nit: Can you reformat these to not be inline?



##
metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java:
##
@@ -244,4 +245,34 @@ private ConfigEntry toConfigEntry(ConfigDef.ConfigKey 
configKey,
 translateConfigType(configKey.type()),
 configKey.documentation);
 }
+
+/**
+ * OrderedConfigResolver helps to find the configs in the order of the 
list config maps.
+ * One thing to notice that, when calling containsKey, if a config 
contains a null value entry,
+ * it will return false as null value means the config value should be 
ignored.
+ **/
+public 

Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-05-27 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1616598075


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -443,9 +525,33 @@ Map getConfigs(ConfigResource 
configResource) {
  * @param configKeyThe key for the config.
  */
 String getTopicConfig(String topicName, String configKey) throws 
NoSuchElementException {
-Map map = configData.get(new 
ConfigResource(Type.TOPIC, topicName));
+return getTopicConfigWithPendingChange(topicName, configKey, 
configData);
+}
+
+/**
+ * Get the config value for the give topic and give config key. Also, it 
will search the configs in the pending
+ * config data first.
+ * If the config value is not found, return null.
+ *
+ * @param topicNameThe topic name for the config.
+ * @param configKeyThe key for the config.
+ * @param pendingConfigDataThe configs which is going to be applied. 
It should have the higher priority than
+ * the current configs.
+ */
+String getTopicConfigWithPendingChange(
+String topicName,
+String configKey,
+Map> pendingConfigData

Review Comment:
   I used a `OrderedConfigResolver` later to solve the problem that we have to 
look up both pending config data and current config.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-05-27 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1614174475


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -308,6 +330,48 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List 
records) {
+List minIsrRecords = new ArrayList<>();
+Map topicMap = new HashMap<>();
+Map configRemovedTopicMap = new HashMap<>();
+records.forEach(record -> {
+if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+ConfigRecord configRecord = (ConfigRecord) record.message();
+if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+minIsrRecords.add(configRecord);
+if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+if (configRecord.value() == null) 
topicMap.put(configRecord.resourceName(), configRecord.value());
+else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+}
+}
+}
+});
+
+if (minIsrRecords.isEmpty()) return;
+if (topicMap.size() == minIsrRecords.size()) {
+// If all the min isr config updates are on the topic level, we 
can trigger a simpler update just on the
+// updated topics.
+
records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(
+new ArrayList<>(topicMap.keySet()),
+topicName -> topicMap.get(topicName))
+);
+return;
+}
+
+// Because it may require multiple layer look up for the min ISR 
config value. Build a config data copy and apply
+// the config updates to it. Use this config copy for the min ISR look 
up.
+Map> pendingConfigData 
= new HashMap<>();
+SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new 
LogContext("dummy-config-update"));

Review Comment:
   However, you just remind me that this does not handle config removal 
correctly. Addressing the removal will make the change more complicated. Maybe 
we should consider the previous approach which makes a copy of the current 
configs(min ISR part) and apply the updates to the copy. 
   @cmccabe any comments here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-05-24 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1614174475


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -308,6 +330,48 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List 
records) {
+List minIsrRecords = new ArrayList<>();
+Map topicMap = new HashMap<>();
+Map configRemovedTopicMap = new HashMap<>();
+records.forEach(record -> {
+if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+ConfigRecord configRecord = (ConfigRecord) record.message();
+if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+minIsrRecords.add(configRecord);
+if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+if (configRecord.value() == null) 
topicMap.put(configRecord.resourceName(), configRecord.value());
+else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+}
+}
+}
+});
+
+if (minIsrRecords.isEmpty()) return;
+if (topicMap.size() == minIsrRecords.size()) {
+// If all the min isr config updates are on the topic level, we 
can trigger a simpler update just on the
+// updated topics.
+
records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(
+new ArrayList<>(topicMap.keySet()),
+topicName -> topicMap.get(topicName))
+);
+return;
+}
+
+// Because it may require multiple layer look up for the min ISR 
config value. Build a config data copy and apply
+// the config updates to it. Use this config copy for the min ISR look 
up.
+Map> pendingConfigData 
= new HashMap<>();
+SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new 
LogContext("dummy-config-update"));

Review Comment:
   However, you just remind me that this does not handle config removal 
correctly. Addressing the removal will make the change more complicated. Maybe 
we should consider the previous approach which makes a copy of the current 
configs(min ISR part) and apply the updates to the copy. 
   @cmccabe any comments here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-05-24 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1614169536


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -308,6 +330,48 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List 
records) {
+List minIsrRecords = new ArrayList<>();
+Map topicMap = new HashMap<>();
+Map configRemovedTopicMap = new HashMap<>();
+records.forEach(record -> {
+if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+ConfigRecord configRecord = (ConfigRecord) record.message();
+if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+minIsrRecords.add(configRecord);
+if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+if (configRecord.value() == null) 
topicMap.put(configRecord.resourceName(), configRecord.value());
+else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+}
+}
+}
+});
+
+if (minIsrRecords.isEmpty()) return;
+if (topicMap.size() == minIsrRecords.size()) {
+// If all the min isr config updates are on the topic level, we 
can trigger a simpler update just on the
+// updated topics.
+
records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(
+new ArrayList<>(topicMap.keySet()),
+topicName -> topicMap.get(topicName))
+);
+return;
+}
+
+// Because it may require multiple layer look up for the min ISR 
config value. Build a config data copy and apply
+// the config updates to it. Use this config copy for the min ISR look 
up.
+Map> pendingConfigData 
= new HashMap<>();
+SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new 
LogContext("dummy-config-update"));

Review Comment:
   Yeah, it is not a straightforward change. So the pending config data 
populated here will be checked together with the existing configs. See how 
`OrderedConfigResolver` is used below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-05-24 Thread via GitHub


cmccabe commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1614159525


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List 
records) {
+List minIsrRecords = new ArrayList<>();
+Map topicMap = new HashMap<>();
+Map configRemovedTopicMap = new HashMap<>();
+records.forEach(record -> {
+if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+ConfigRecord configRecord = (ConfigRecord) record.message();
+if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+minIsrRecords.add(configRecord);
+if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+if (configRecord.value() == null) 
topicMap.put(configRecord.resourceName(), configRecord.value());
+else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+}
+}
+}
+});

Review Comment:
   Yeah, unfortunately we have a 5 level system:
   
   1. topic configs (highest priority)
   2. node configs (aka "broker" configs, but they also apply to controllers)
   3. cluster configs (aka default configs for the broker resource)
   4. static configuration
   5. static default
   
   The first three levels can change at runtime ядо 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-05-24 Thread via GitHub


cmccabe commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1614158684


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -308,6 +330,48 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List 
records) {

Review Comment:
   I guess eventually we will probably want some more general system for 
responding to configuration changes, and not special-case min.isr. However, we 
don't have to do that in this PR.
   
   I also understand why you had this function receive a list of records here 
rather than something fancier (easier to integrate into the existing code, and 
into the two configuration change paths.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-05-24 Thread via GitHub


cmccabe commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1614157582


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -443,9 +525,33 @@ Map getConfigs(ConfigResource 
configResource) {
  * @param configKeyThe key for the config.
  */
 String getTopicConfig(String topicName, String configKey) throws 
NoSuchElementException {
-Map map = configData.get(new 
ConfigResource(Type.TOPIC, topicName));
+return getTopicConfigWithPendingChange(topicName, configKey, 
configData);
+}
+
+/**
+ * Get the config value for the give topic and give config key. Also, it 
will search the configs in the pending
+ * config data first.
+ * If the config value is not found, return null.
+ *
+ * @param topicNameThe topic name for the config.
+ * @param configKeyThe key for the config.
+ * @param pendingConfigDataThe configs which is going to be applied. 
It should have the higher priority than
+ * the current configs.
+ */
+String getTopicConfigWithPendingChange(
+String topicName,
+String configKey,
+Map> pendingConfigData

Review Comment:
   It seems like there could be some value in turning the pending config change 
into a class of its own, if we're going to be querying it like this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-05-24 Thread via GitHub


cmccabe commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1614156696


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -308,6 +330,48 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List 
records) {
+List minIsrRecords = new ArrayList<>();
+Map topicMap = new HashMap<>();
+Map configRemovedTopicMap = new HashMap<>();
+records.forEach(record -> {
+if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+ConfigRecord configRecord = (ConfigRecord) record.message();
+if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+minIsrRecords.add(configRecord);
+if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+if (configRecord.value() == null) 
topicMap.put(configRecord.resourceName(), configRecord.value());
+else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+}
+}
+}
+});
+
+if (minIsrRecords.isEmpty()) return;
+if (topicMap.size() == minIsrRecords.size()) {
+// If all the min isr config updates are on the topic level, we 
can trigger a simpler update just on the
+// updated topics.
+
records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(
+new ArrayList<>(topicMap.keySet()),
+topicName -> topicMap.get(topicName))
+);
+return;
+}
+
+// Because it may require multiple layer look up for the min ISR 
config value. Build a config data copy and apply
+// the config updates to it. Use this config copy for the min ISR look 
up.
+Map> pendingConfigData 
= new HashMap<>();
+SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new 
LogContext("dummy-config-update"));

Review Comment:
   Maybe I'm missing something, but I don't think this works... 
`pendingConfigData` will have some of the new changes you made, but not all of 
the existing changes. So, for example, perhaps we are changing the cluster 
config for min topic ISR, but the node config for the current controller node 
is unchanged. It should take priority, but it won't be in here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-29 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1583446833


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List 
records) {
+List minIsrRecords = new ArrayList<>();
+Map topicMap = new HashMap<>();
+Map configRemovedTopicMap = new HashMap<>();
+records.forEach(record -> {
+if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+ConfigRecord configRecord = (ConfigRecord) record.message();
+if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+minIsrRecords.add(configRecord);
+if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+if (configRecord.value() == null) 
topicMap.put(configRecord.resourceName(), configRecord.value());
+else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+}
+}
+}
+});
+
+if (minIsrRecords.isEmpty()) return;
+if (topicMap.size() == minIsrRecords.size()) {
+// If all the min isr config updates are on the topic level, we 
can trigger a simpler update just on the
+// updated topics.
+
records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(
+new ArrayList<>(topicMap.keySet()),
+topicName -> topicMap.get(topicName))
+);
+return;
+}
+
+// Because it may require multiple layer look up for the min ISR 
config value. Build a config data copy and apply
+// the config updates to it. Use this config copy for the min ISR look 
up.
+Map> configDataCopy = 
new HashMap<>(configData);
+SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new 
LogContext("dummy-config-update"));
+for (ConfigRecord record : minIsrRecords) {
+replayInternal(record, configDataCopy, localSnapshotRegistry);
+}

Review Comment:
   This is the implementation challenge part of this PR. To find the effective 
min ISR value, it requires checking topic config -> dynamic broker config -> 
default broker config -> ... 
   Let's say the user updates the default broker config:
   1. All the topics could be affected.
   2. The effective min ISR values should be recalculated.
   3. We need to generate the partition change records along with the config 
change records, which means the ReplicationControlManager can't use the regular 
methods for the effective min ISR value. The value should be determined by the 
config records and the current configs.
   
   I found it easier to make a copy of the configs and apply the min ISR 
updates on the copy. Then let the ReplicationControlManager check all the 
partitions with the config copy.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-29 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1583424779


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List 
records) {
+List minIsrRecords = new ArrayList<>();
+Map topicMap = new HashMap<>();
+Map configRemovedTopicMap = new HashMap<>();
+records.forEach(record -> {
+if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+ConfigRecord configRecord = (ConfigRecord) record.message();
+if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+minIsrRecords.add(configRecord);
+if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+if (configRecord.value() == null) 
topicMap.put(configRecord.resourceName(), configRecord.value());
+else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+}
+}
+}
+});

Review Comment:
   If `min.insync.replicas` is not set on the topic config level, the effective 
`min.insync.replicas` of a topic will change if default broker config is 
updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-29 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1583423622


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -260,6 +279,7 @@ private ApiError incrementalAlterConfigResource(
 if (error.isFailure()) {
 return error;
 }
+maybeTriggerPartitionUpdateOnMinIsrChange(newRecords);

Review Comment:
   `legacyAlterConfigResource` has supported. Adding UT to cover it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-29 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1583422586


##
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##
@@ -159,4 +160,14 @@ BrokersToIsrs.PartitionsOnReplicaIterator 
partitionsWithBrokerInElr(int brokerId
 }
 return new BrokersToIsrs.PartitionsOnReplicaIterator(topicMap, false);
 }
+
+BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithElr() {
+Map topicMap = new HashMap<>();
+for (Map map : elrMembers.values()) {
+if (map != null) {

Review Comment:
   Done, it can't be null



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-25 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1579894472


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -66,6 +69,7 @@ public class ConfigurationControlManager {
 private final TimelineHashMap> configData;
 private final Map staticConfig;
 private final ConfigResource currentController;
+private final MinIsrConfigUpdatePartitionHandler 
minIsrConfigUpdatePartitionHandler;

Review Comment:
   maybe more of a question for someone with more code ownership of the quorum 
controller code, but I wonder if it would be preferable to handle generating 
the replication control manager records in the 
`QuorumController.incrementalAlterConfigs`. That would also make it a bit 
easier to handle `validateOnly` which we are not currently handling.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-25 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1579888146


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List 
records) {
+List minIsrRecords = new ArrayList<>();
+Map topicMap = new HashMap<>();
+Map configRemovedTopicMap = new HashMap<>();
+records.forEach(record -> {
+if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+ConfigRecord configRecord = (ConfigRecord) record.message();
+if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+minIsrRecords.add(configRecord);
+if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+if (configRecord.value() == null) 
topicMap.put(configRecord.resourceName(), configRecord.value());
+else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+}
+}
+}
+});
+
+if (minIsrRecords.isEmpty()) return;
+if (topicMap.size() == minIsrRecords.size()) {
+// If all the min isr config updates are on the topic level, we 
can trigger a simpler update just on the
+// updated topics.
+
records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(
+new ArrayList<>(topicMap.keySet()),
+topicName -> topicMap.get(topicName))
+);
+return;
+}
+
+// Because it may require multiple layer look up for the min ISR 
config value. Build a config data copy and apply
+// the config updates to it. Use this config copy for the min ISR look 
up.
+Map> configDataCopy = 
new HashMap<>(configData);
+SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new 
LogContext("dummy-config-update"));
+for (ConfigRecord record : minIsrRecords) {
+replayInternal(record, configDataCopy, localSnapshotRegistry);
+}

Review Comment:
   why are we calling replay here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-25 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1579886194


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource 
configResource,
 return ApiError.NONE;
 }
 
+void maybeTriggerPartitionUpdateOnMinIsrChange(List 
records) {
+List minIsrRecords = new ArrayList<>();
+Map topicMap = new HashMap<>();
+Map configRemovedTopicMap = new HashMap<>();
+records.forEach(record -> {
+if (MetadataRecordType.fromId(record.message().apiKey()) == 
MetadataRecordType.CONFIG_RECORD) {
+ConfigRecord configRecord = (ConfigRecord) record.message();
+if 
(configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) {
+minIsrRecords.add(configRecord);
+if (Type.forId(configRecord.resourceType()) == Type.TOPIC) 
{
+if (configRecord.value() == null) 
topicMap.put(configRecord.resourceName(), configRecord.value());
+else 
configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value());
+}
+}
+}
+});

Review Comment:
   what is the behavior if the default broker config for `min.insync.replicas` 
is changed? 
   I am not actually sure how that impacts the `min.insync.replicas` for 
existing topics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-25 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1579880592


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -260,6 +279,7 @@ private ApiError incrementalAlterConfigResource(
 if (error.isFailure()) {
 return error;
 }
+maybeTriggerPartitionUpdateOnMinIsrChange(newRecords);

Review Comment:
   we need to support `legacyAlterConfigResource` also.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-25 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1579872828


##
metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java:
##
@@ -159,4 +160,14 @@ BrokersToIsrs.PartitionsOnReplicaIterator 
partitionsWithBrokerInElr(int brokerId
 }
 return new BrokersToIsrs.PartitionsOnReplicaIterator(topicMap, false);
 }
+
+BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithElr() {
+Map topicMap = new HashMap<>();
+for (Map map : elrMembers.values()) {
+if (map != null) {

Review Comment:
   when would this be null? is there a particular reason we chose to use a null 
array instead of an empty array?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-17 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1569269269


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2117,6 +2117,20 @@ ListPartitionReassignmentsResponseData 
listPartitionReassignments(
 return response;
 }
 
+ControllerResult 
getPartitionElrUpdatesForConfigChanges(Optional topicName) {
+if (!isElrEnabled()) return 
ControllerResult.of(Collections.emptyList(), null);
+
+List records = new ArrayList<>();
+if (topicName.isPresent()) {
+generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, 
NO_LEADER, NO_LEADER, records,
+
brokersToElrs.partitionsWithElr(topicsByName.get(topicName.get(;
+} else {
+generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, 
NO_LEADER, NO_LEADER, records,
+brokersToElrs.partitionsWithElr());
+}

Review Comment:
   @CalvinConfluent makes sense, thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-17 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1569214320


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2117,6 +2117,20 @@ ListPartitionReassignmentsResponseData 
listPartitionReassignments(
 return response;
 }
 
+ControllerResult 
getPartitionElrUpdatesForConfigChanges(Optional topicName) {
+if (!isElrEnabled()) return 
ControllerResult.of(Collections.emptyList(), null);
+
+List records = new ArrayList<>();
+if (topicName.isPresent()) {
+generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, 
NO_LEADER, NO_LEADER, records,
+
brokersToElrs.partitionsWithElr(topicsByName.get(topicName.get(;
+} else {
+generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, 
NO_LEADER, NO_LEADER, records,
+brokersToElrs.partitionsWithElr());
+}

Review Comment:
   Yes, we should only update the partition(remove the ELRs) when min isr 
decreases. Because if the min isr decreases, the partition can advance the HWM 
with fewer ISR members, this can invalidate the ELR as a potential leader.
   Sure, I can add them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-17 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1569210550


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
 void handleUncleanBrokerShutdown(int brokerId, List 
records) {
 replicationControl.handleBrokerUncleanShutdown(brokerId, records);
 }
+
+void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws 
InterruptedException, ExecutionException {
+appendWriteEvent("partitionUpdateForMinIsrChange", 
OptionalLong.empty(),
+() -> 
replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get();
+}

Review Comment:
   Make sense, I have some misunderstanding about the controller events. Will 
update. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-16 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1568044239


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
 void handleUncleanBrokerShutdown(int brokerId, List 
records) {
 replicationControl.handleBrokerUncleanShutdown(brokerId, records);
 }
+
+void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws 
InterruptedException, ExecutionException {
+appendWriteEvent("partitionUpdateForMinIsrChange", 
OptionalLong.empty(),
+() -> 
replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get();
+}

Review Comment:
   hmm, I think a better way to think about it is that we want to append the 
min ISR config update atomically with the partition change records. Appending 
the partition change records once the config change is replayed is difficult to 
reason about and possibly incorrect. Thinking a bit more about it, triggering a 
write event from the `replay()` for the config change record means that every 
time we reload the metadata log, we would replay the config change record and 
generate new partition change records.
   
   Perhaps one example to look at is 
`ReplicationControlManager.handleBrokerFenced`. When a broker is fenced, we 
generate a broker registration change record along with the leaderAndIsr 
partition change records. I assume we want to follow a similar model with the 
topic configuration change events.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-16 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1568084297


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2117,6 +2117,20 @@ ListPartitionReassignmentsResponseData 
listPartitionReassignments(
 return response;
 }
 
+ControllerResult 
getPartitionElrUpdatesForConfigChanges(Optional topicName) {
+if (!isElrEnabled()) return 
ControllerResult.of(Collections.emptyList(), null);
+
+List records = new ArrayList<>();
+if (topicName.isPresent()) {
+generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, 
NO_LEADER, NO_LEADER, records,
+
brokersToElrs.partitionsWithElr(topicsByName.get(topicName.get(;
+} else {
+generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, 
NO_LEADER, NO_LEADER, records,
+brokersToElrs.partitionsWithElr());
+}

Review Comment:
   I haven't been following along too closely. Is my understanding correct that 
we would only expect to generate partition change records that clear the ELR 
when the min ISR config decreases?
   
   When the configured topic ISR increases, it would not be safe to include 
more replicas in the ELR, since they may not have the HWM.
   
   If my understanding is correct, should we have tests to verify that:
   1. When the configured topic ISR decreases, we generate the expected 
partition change record events.
   2. When the configured topic ISR increases, we do not generate any partition 
change record events.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-16 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1568044239


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
 void handleUncleanBrokerShutdown(int brokerId, List 
records) {
 replicationControl.handleBrokerUncleanShutdown(brokerId, records);
 }
+
+void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws 
InterruptedException, ExecutionException {
+appendWriteEvent("partitionUpdateForMinIsrChange", 
OptionalLong.empty(),
+() -> 
replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get();
+}

Review Comment:
   hmm, I think a better way to think about it is that we want to append the 
min ISR config update atomically with the partition change records. Only 
appending the partition change records once the config change is replayed is 
difficult to reason about and possibly incorrect.
   
   Perhaps one example to look at is 
`ReplicationControlManager.handleBrokerFenced`. When a broker is fenced, we 
generate a broker registration change record along with the leaderAndIsr 
partition change records. I assume we want to follow a similar model with the 
topic configuration change events.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-15 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1566471100


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -423,6 +441,17 @@ public void replay(ConfigRecord record) {
 log.info("Replayed ConfigRecord for {} which set configuration {} 
to {}",
 configResource, record.name(), record.value());
 }
+if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) {
+try {
+if (type == Type.TOPIC) {
+
minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.of(record.resourceName()));
+} else {
+
minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.empty());
+}
+} catch (InterruptedException | ExecutionException e) {
+throw new Throwable("Fail to append partition updates for the 
min isr update: " + e.getMessage());
+}

Review Comment:
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-15 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1566455070


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
 void handleUncleanBrokerShutdown(int brokerId, List 
records) {
 replicationControl.handleBrokerUncleanShutdown(brokerId, records);
 }
+
+void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws 
InterruptedException, ExecutionException {
+appendWriteEvent("partitionUpdateForMinIsrChange", 
OptionalLong.empty(),
+() -> 
replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get();
+}

Review Comment:
   Got it, we basically only need to call the appendWriteEvents and do not wait 
for the replay().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-15 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1566242343


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
 void handleUncleanBrokerShutdown(int brokerId, List 
records) {
 replicationControl.handleBrokerUncleanShutdown(brokerId, records);
 }
+
+void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws 
InterruptedException, ExecutionException {
+appendWriteEvent("partitionUpdateForMinIsrChange", 
OptionalLong.empty(),
+() -> 
replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get();
+}

Review Comment:
   Then we only need to call appendWriteEvent here? We don't have to wait for 
the replay().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-15 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1566242343


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
 void handleUncleanBrokerShutdown(int brokerId, List 
records) {
 replicationControl.handleBrokerUncleanShutdown(brokerId, records);
 }
+
+void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws 
InterruptedException, ExecutionException {
+appendWriteEvent("partitionUpdateForMinIsrChange", 
OptionalLong.empty(),
+() -> 
replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get();
+}

Review Comment:
   Then we only need to call appendWriteEvent here? We don't have to wait for 
the replay().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-13 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1564110479


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -423,6 +441,17 @@ public void replay(ConfigRecord record) {
 log.info("Replayed ConfigRecord for {} which set configuration {} 
to {}",
 configResource, record.name(), record.value());
 }
+if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) {

Review Comment:
   Sorry. This is bad, really bad :(



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-12 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1563199330


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
 void handleUncleanBrokerShutdown(int brokerId, List 
records) {
 replicationControl.handleBrokerUncleanShutdown(brokerId, records);
 }
+
+void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws 
InterruptedException, ExecutionException {
+appendWriteEvent("partitionUpdateForMinIsrChange", 
OptionalLong.empty(),
+() -> 
replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get();
+}

Review Comment:
   calling `.get()` on an appendWriteEvent doesn't look right to me. If I 
understand correctly, the `appendWriteEvents` are handled in the quorum 
controller event loop thread.
   
   We would expect `replay()` to also be called in the event loop thread. so if 
we trigger an `appendWriteEvent` and block waiting for the result, it would 
always time out, since we are blocking the processing thread.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-12 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1563195256


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -423,6 +441,17 @@ public void replay(ConfigRecord record) {
 log.info("Replayed ConfigRecord for {} which set configuration {} 
to {}",
 configResource, record.name(), record.value());
 }
+if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) {
+try {
+if (type == Type.TOPIC) {
+
minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.of(record.resourceName()));
+} else {
+
minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.empty());
+}
+} catch (InterruptedException | ExecutionException e) {
+throw new Throwable("Fail to append partition updates for the 
min isr update: " + e.getMessage());
+}

Review Comment:
   throwing as a throwable seems odd.
   



##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -423,6 +441,17 @@ public void replay(ConfigRecord record) {
 log.info("Replayed ConfigRecord for {} which set configuration {} 
to {}",
 configResource, record.name(), record.value());
 }
+if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) {
+try {
+if (type == Type.TOPIC) {
+
minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.of(record.resourceName()));
+} else {
+
minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.empty());
+}
+} catch (InterruptedException | ExecutionException e) {
+throw new Throwable("Fail to append partition updates for the 
min isr update: " + e.getMessage());
+}

Review Comment:
   throwing an exception as a throwable seems odd.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-12 Thread via GitHub


splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1563193794


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -423,6 +441,17 @@ public void replay(ConfigRecord record) {
 log.info("Replayed ConfigRecord for {} which set configuration {} 
to {}",
 configResource, record.name(), record.value());
 }
+if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) {

Review Comment:
   we shouldn't use `==` to compare strings.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-12 Thread via GitHub


CalvinConfluent commented on PR #15702:
URL: https://github.com/apache/kafka/pull/15702#issuecomment-2052041676

   @mumrah Can you help take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org