Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
mumrah commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1672784230 ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -2225,9 +2250,14 @@ private void listReassigningTopic(ListPartitionReassignmentsResponseData respons } // Visible to test. -int getTopicEffectiveMinIsr(String topicName) { +int getTopicEffectiveMinIsr(String topicName, Function getTopicMinIsrConfig) { int currentMinIsr = defaultMinIsr; -String minIsrConfig = configurationControl.getTopicConfig(topicName, MIN_IN_SYNC_REPLICAS_CONFIG); +String minIsrConfig; +if (getTopicMinIsrConfig == null) { Review Comment: Can we split the method into two signatures instead of having the null? E.g., `int getTopicEffectiveMinIsr(String topicName)` and `int getTopicEffectiveMinIsr(String topicName, Function getTopicMinIsrConfig)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
mumrah commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1672730022 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -500,9 +631,26 @@ boolean uncleanLeaderElectionEnabledForTopic(String name) { return false; // TODO: support configuring unclean leader election. } +Map computeEffectiveTopicConfigsWithPendingChange( +Map> pendingConfigData +) { +Map pendingClusterConfig = +pendingConfigData.containsKey(DEFAULT_NODE) ? pendingConfigData.get(DEFAULT_NODE) : Collections.emptyMap(); +Map pendingControllerConfig = +pendingConfigData.containsKey(currentController) ? pendingConfigData.get(currentController) : Collections.emptyMap(); +return configSchema.resolveEffectiveTopicConfigs( +new OrderedConfigResolver(staticConfig), Review Comment: Hm.. what happens if the controller has different static configs from the broker? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
mumrah commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1671034456 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -309,6 +331,54 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { +List minIsrRecords = new ArrayList<>(); +Map topicToMinIsrValueMap = new HashMap<>(); +Map configRemovedTopicMap = new HashMap<>(); +records.forEach(record -> { +if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) { +ConfigRecord configRecord = (ConfigRecord) record.message(); +if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { +minIsrRecords.add(configRecord); +if (Type.forId(configRecord.resourceType()) == Type.TOPIC) { +if (configRecord.value() != null) topicToMinIsrValueMap.put(configRecord.resourceName(), configRecord.value()); +else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); +} +} +} +}); + +if (minIsrRecords.isEmpty()) return; +if (topicToMinIsrValueMap.size() == minIsrRecords.size()) { Review Comment: The size comparison here and below are a little non-obvious (to me at least). Maybe we can set a boolean as we're looping through the records to determine if we hit this branch. Alternative question, is this optimization helping with performance? We still need the code for the case of overlaying configs from different levels, so having this separate code path just increases complexity. ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -309,6 +331,54 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { Review Comment: The complexity here is a tad bit high. Can we extract a method for getting the minIsrRecords ConfigRecord from `List`? ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -426,6 +497,35 @@ public void replay(ConfigRecord record) { } } +/** + * Apply a configuration record to the given config data. Note that, it will store null for the config to be removed. + * + * @param record The ConfigRecord. + * @param localConfigDataThe config data is going to be updated. + */ +public void replayForPendingConfig( Review Comment: Can we reuse ConfigurationsImage here instead of adding another place where we are applying records? I think it should be reasonably straightforward to construct a ConfigurationsImage with the in-memory state (`localConfigData`) and then replay records to get a ConfigurationsDelta. ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -309,6 +331,54 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { +List minIsrRecords = new ArrayList<>(); +Map topicToMinIsrValueMap = new HashMap<>(); +Map configRemovedTopicMap = new HashMap<>(); +records.forEach(record -> { +if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) { +ConfigRecord configRecord = (ConfigRecord) record.message(); +if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { +minIsrRecords.add(configRecord); +if (Type.forId(configRecord.resourceType()) == Type.TOPIC) { +if (configRecord.value() != null) topicToMinIsrValueMap.put(configRecord.resourceName(), configRecord.value()); +else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); Review Comment: style nit: Can you reformat these to not be inline? ## metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java: ## @@ -244,4 +245,34 @@ private ConfigEntry toConfigEntry(ConfigDef.ConfigKey configKey, translateConfigType(configKey.type()), configKey.documentation); } + +/** + * OrderedConfigResolver helps to find the configs in the order of the list config maps. + * One thing to notice that, when calling containsKey, if a config contains a null value entry, + * it will return false as null value means the config value should be ignored. + **/ +public
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1616598075 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -443,9 +525,33 @@ Map getConfigs(ConfigResource configResource) { * @param configKeyThe key for the config. */ String getTopicConfig(String topicName, String configKey) throws NoSuchElementException { -Map map = configData.get(new ConfigResource(Type.TOPIC, topicName)); +return getTopicConfigWithPendingChange(topicName, configKey, configData); +} + +/** + * Get the config value for the give topic and give config key. Also, it will search the configs in the pending + * config data first. + * If the config value is not found, return null. + * + * @param topicNameThe topic name for the config. + * @param configKeyThe key for the config. + * @param pendingConfigDataThe configs which is going to be applied. It should have the higher priority than + * the current configs. + */ +String getTopicConfigWithPendingChange( +String topicName, +String configKey, +Map> pendingConfigData Review Comment: I used a `OrderedConfigResolver` later to solve the problem that we have to look up both pending config data and current config. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1614174475 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -308,6 +330,48 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { +List minIsrRecords = new ArrayList<>(); +Map topicMap = new HashMap<>(); +Map configRemovedTopicMap = new HashMap<>(); +records.forEach(record -> { +if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) { +ConfigRecord configRecord = (ConfigRecord) record.message(); +if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { +minIsrRecords.add(configRecord); +if (Type.forId(configRecord.resourceType()) == Type.TOPIC) { +if (configRecord.value() == null) topicMap.put(configRecord.resourceName(), configRecord.value()); +else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); +} +} +} +}); + +if (minIsrRecords.isEmpty()) return; +if (topicMap.size() == minIsrRecords.size()) { +// If all the min isr config updates are on the topic level, we can trigger a simpler update just on the +// updated topics. + records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate( +new ArrayList<>(topicMap.keySet()), +topicName -> topicMap.get(topicName)) +); +return; +} + +// Because it may require multiple layer look up for the min ISR config value. Build a config data copy and apply +// the config updates to it. Use this config copy for the min ISR look up. +Map> pendingConfigData = new HashMap<>(); +SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new LogContext("dummy-config-update")); Review Comment: However, you just remind me that this does not handle config removal correctly. Addressing the removal will make the change more complicated. Maybe we should consider the previous approach which makes a copy of the current configs(min ISR part) and apply the updates to the copy. @cmccabe any comments here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1614174475 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -308,6 +330,48 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { +List minIsrRecords = new ArrayList<>(); +Map topicMap = new HashMap<>(); +Map configRemovedTopicMap = new HashMap<>(); +records.forEach(record -> { +if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) { +ConfigRecord configRecord = (ConfigRecord) record.message(); +if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { +minIsrRecords.add(configRecord); +if (Type.forId(configRecord.resourceType()) == Type.TOPIC) { +if (configRecord.value() == null) topicMap.put(configRecord.resourceName(), configRecord.value()); +else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); +} +} +} +}); + +if (minIsrRecords.isEmpty()) return; +if (topicMap.size() == minIsrRecords.size()) { +// If all the min isr config updates are on the topic level, we can trigger a simpler update just on the +// updated topics. + records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate( +new ArrayList<>(topicMap.keySet()), +topicName -> topicMap.get(topicName)) +); +return; +} + +// Because it may require multiple layer look up for the min ISR config value. Build a config data copy and apply +// the config updates to it. Use this config copy for the min ISR look up. +Map> pendingConfigData = new HashMap<>(); +SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new LogContext("dummy-config-update")); Review Comment: However, you just remind me that this does not handle config removal correctly. Addressing the removal will make the change more complicated. Maybe we should consider the previous approach which makes a copy of the current configs(min ISR part) and apply the updates to the copy. @cmccabe any comments here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1614169536 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -308,6 +330,48 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { +List minIsrRecords = new ArrayList<>(); +Map topicMap = new HashMap<>(); +Map configRemovedTopicMap = new HashMap<>(); +records.forEach(record -> { +if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) { +ConfigRecord configRecord = (ConfigRecord) record.message(); +if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { +minIsrRecords.add(configRecord); +if (Type.forId(configRecord.resourceType()) == Type.TOPIC) { +if (configRecord.value() == null) topicMap.put(configRecord.resourceName(), configRecord.value()); +else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); +} +} +} +}); + +if (minIsrRecords.isEmpty()) return; +if (topicMap.size() == minIsrRecords.size()) { +// If all the min isr config updates are on the topic level, we can trigger a simpler update just on the +// updated topics. + records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate( +new ArrayList<>(topicMap.keySet()), +topicName -> topicMap.get(topicName)) +); +return; +} + +// Because it may require multiple layer look up for the min ISR config value. Build a config data copy and apply +// the config updates to it. Use this config copy for the min ISR look up. +Map> pendingConfigData = new HashMap<>(); +SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new LogContext("dummy-config-update")); Review Comment: Yeah, it is not a straightforward change. So the pending config data populated here will be checked together with the existing configs. See how `OrderedConfigResolver` is used below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
cmccabe commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1614159525 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { +List minIsrRecords = new ArrayList<>(); +Map topicMap = new HashMap<>(); +Map configRemovedTopicMap = new HashMap<>(); +records.forEach(record -> { +if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) { +ConfigRecord configRecord = (ConfigRecord) record.message(); +if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { +minIsrRecords.add(configRecord); +if (Type.forId(configRecord.resourceType()) == Type.TOPIC) { +if (configRecord.value() == null) topicMap.put(configRecord.resourceName(), configRecord.value()); +else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); +} +} +} +}); Review Comment: Yeah, unfortunately we have a 5 level system: 1. topic configs (highest priority) 2. node configs (aka "broker" configs, but they also apply to controllers) 3. cluster configs (aka default configs for the broker resource) 4. static configuration 5. static default The first three levels can change at runtime ядо -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
cmccabe commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1614158684 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -308,6 +330,48 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { Review Comment: I guess eventually we will probably want some more general system for responding to configuration changes, and not special-case min.isr. However, we don't have to do that in this PR. I also understand why you had this function receive a list of records here rather than something fancier (easier to integrate into the existing code, and into the two configuration change paths.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
cmccabe commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1614157582 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -443,9 +525,33 @@ Map getConfigs(ConfigResource configResource) { * @param configKeyThe key for the config. */ String getTopicConfig(String topicName, String configKey) throws NoSuchElementException { -Map map = configData.get(new ConfigResource(Type.TOPIC, topicName)); +return getTopicConfigWithPendingChange(topicName, configKey, configData); +} + +/** + * Get the config value for the give topic and give config key. Also, it will search the configs in the pending + * config data first. + * If the config value is not found, return null. + * + * @param topicNameThe topic name for the config. + * @param configKeyThe key for the config. + * @param pendingConfigDataThe configs which is going to be applied. It should have the higher priority than + * the current configs. + */ +String getTopicConfigWithPendingChange( +String topicName, +String configKey, +Map> pendingConfigData Review Comment: It seems like there could be some value in turning the pending config change into a class of its own, if we're going to be querying it like this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
cmccabe commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1614156696 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -308,6 +330,48 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { +List minIsrRecords = new ArrayList<>(); +Map topicMap = new HashMap<>(); +Map configRemovedTopicMap = new HashMap<>(); +records.forEach(record -> { +if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) { +ConfigRecord configRecord = (ConfigRecord) record.message(); +if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { +minIsrRecords.add(configRecord); +if (Type.forId(configRecord.resourceType()) == Type.TOPIC) { +if (configRecord.value() == null) topicMap.put(configRecord.resourceName(), configRecord.value()); +else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); +} +} +} +}); + +if (minIsrRecords.isEmpty()) return; +if (topicMap.size() == minIsrRecords.size()) { +// If all the min isr config updates are on the topic level, we can trigger a simpler update just on the +// updated topics. + records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate( +new ArrayList<>(topicMap.keySet()), +topicName -> topicMap.get(topicName)) +); +return; +} + +// Because it may require multiple layer look up for the min ISR config value. Build a config data copy and apply +// the config updates to it. Use this config copy for the min ISR look up. +Map> pendingConfigData = new HashMap<>(); +SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new LogContext("dummy-config-update")); Review Comment: Maybe I'm missing something, but I don't think this works... `pendingConfigData` will have some of the new changes you made, but not all of the existing changes. So, for example, perhaps we are changing the cluster config for min topic ISR, but the node config for the current controller node is unchanged. It should take priority, but it won't be in here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1583446833 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { +List minIsrRecords = new ArrayList<>(); +Map topicMap = new HashMap<>(); +Map configRemovedTopicMap = new HashMap<>(); +records.forEach(record -> { +if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) { +ConfigRecord configRecord = (ConfigRecord) record.message(); +if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { +minIsrRecords.add(configRecord); +if (Type.forId(configRecord.resourceType()) == Type.TOPIC) { +if (configRecord.value() == null) topicMap.put(configRecord.resourceName(), configRecord.value()); +else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); +} +} +} +}); + +if (minIsrRecords.isEmpty()) return; +if (topicMap.size() == minIsrRecords.size()) { +// If all the min isr config updates are on the topic level, we can trigger a simpler update just on the +// updated topics. + records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate( +new ArrayList<>(topicMap.keySet()), +topicName -> topicMap.get(topicName)) +); +return; +} + +// Because it may require multiple layer look up for the min ISR config value. Build a config data copy and apply +// the config updates to it. Use this config copy for the min ISR look up. +Map> configDataCopy = new HashMap<>(configData); +SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new LogContext("dummy-config-update")); +for (ConfigRecord record : minIsrRecords) { +replayInternal(record, configDataCopy, localSnapshotRegistry); +} Review Comment: This is the implementation challenge part of this PR. To find the effective min ISR value, it requires checking topic config -> dynamic broker config -> default broker config -> ... Let's say the user updates the default broker config: 1. All the topics could be affected. 2. The effective min ISR values should be recalculated. 3. We need to generate the partition change records along with the config change records, which means the ReplicationControlManager can't use the regular methods for the effective min ISR value. The value should be determined by the config records and the current configs. I found it easier to make a copy of the configs and apply the min ISR updates on the copy. Then let the ReplicationControlManager check all the partitions with the config copy. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1583424779 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { +List minIsrRecords = new ArrayList<>(); +Map topicMap = new HashMap<>(); +Map configRemovedTopicMap = new HashMap<>(); +records.forEach(record -> { +if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) { +ConfigRecord configRecord = (ConfigRecord) record.message(); +if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { +minIsrRecords.add(configRecord); +if (Type.forId(configRecord.resourceType()) == Type.TOPIC) { +if (configRecord.value() == null) topicMap.put(configRecord.resourceName(), configRecord.value()); +else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); +} +} +} +}); Review Comment: If `min.insync.replicas` is not set on the topic config level, the effective `min.insync.replicas` of a topic will change if default broker config is updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1583423622 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -260,6 +279,7 @@ private ApiError incrementalAlterConfigResource( if (error.isFailure()) { return error; } +maybeTriggerPartitionUpdateOnMinIsrChange(newRecords); Review Comment: `legacyAlterConfigResource` has supported. Adding UT to cover it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1583422586 ## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ## @@ -159,4 +160,14 @@ BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithBrokerInElr(int brokerId } return new BrokersToIsrs.PartitionsOnReplicaIterator(topicMap, false); } + +BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithElr() { +Map topicMap = new HashMap<>(); +for (Map map : elrMembers.values()) { +if (map != null) { Review Comment: Done, it can't be null -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1579894472 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -66,6 +69,7 @@ public class ConfigurationControlManager { private final TimelineHashMap> configData; private final Map staticConfig; private final ConfigResource currentController; +private final MinIsrConfigUpdatePartitionHandler minIsrConfigUpdatePartitionHandler; Review Comment: maybe more of a question for someone with more code ownership of the quorum controller code, but I wonder if it would be preferable to handle generating the replication control manager records in the `QuorumController.incrementalAlterConfigs`. That would also make it a bit easier to handle `validateOnly` which we are not currently handling. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1579888146 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { +List minIsrRecords = new ArrayList<>(); +Map topicMap = new HashMap<>(); +Map configRemovedTopicMap = new HashMap<>(); +records.forEach(record -> { +if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) { +ConfigRecord configRecord = (ConfigRecord) record.message(); +if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { +minIsrRecords.add(configRecord); +if (Type.forId(configRecord.resourceType()) == Type.TOPIC) { +if (configRecord.value() == null) topicMap.put(configRecord.resourceName(), configRecord.value()); +else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); +} +} +} +}); + +if (minIsrRecords.isEmpty()) return; +if (topicMap.size() == minIsrRecords.size()) { +// If all the min isr config updates are on the topic level, we can trigger a simpler update just on the +// updated topics. + records.addAll(minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate( +new ArrayList<>(topicMap.keySet()), +topicName -> topicMap.get(topicName)) +); +return; +} + +// Because it may require multiple layer look up for the min ISR config value. Build a config data copy and apply +// the config updates to it. Use this config copy for the min ISR look up. +Map> configDataCopy = new HashMap<>(configData); +SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new LogContext("dummy-config-update")); +for (ConfigRecord record : minIsrRecords) { +replayInternal(record, configDataCopy, localSnapshotRegistry); +} Review Comment: why are we calling replay here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1579886194 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -308,6 +328,48 @@ private ApiError validateAlterConfig(ConfigResource configResource, return ApiError.NONE; } +void maybeTriggerPartitionUpdateOnMinIsrChange(List records) { +List minIsrRecords = new ArrayList<>(); +Map topicMap = new HashMap<>(); +Map configRemovedTopicMap = new HashMap<>(); +records.forEach(record -> { +if (MetadataRecordType.fromId(record.message().apiKey()) == MetadataRecordType.CONFIG_RECORD) { +ConfigRecord configRecord = (ConfigRecord) record.message(); +if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { +minIsrRecords.add(configRecord); +if (Type.forId(configRecord.resourceType()) == Type.TOPIC) { +if (configRecord.value() == null) topicMap.put(configRecord.resourceName(), configRecord.value()); +else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); +} +} +} +}); Review Comment: what is the behavior if the default broker config for `min.insync.replicas` is changed? I am not actually sure how that impacts the `min.insync.replicas` for existing topics. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1579880592 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -260,6 +279,7 @@ private ApiError incrementalAlterConfigResource( if (error.isFailure()) { return error; } +maybeTriggerPartitionUpdateOnMinIsrChange(newRecords); Review Comment: we need to support `legacyAlterConfigResource` also. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1579872828 ## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ## @@ -159,4 +160,14 @@ BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithBrokerInElr(int brokerId } return new BrokersToIsrs.PartitionsOnReplicaIterator(topicMap, false); } + +BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithElr() { +Map topicMap = new HashMap<>(); +for (Map map : elrMembers.values()) { +if (map != null) { Review Comment: when would this be null? is there a particular reason we chose to use a null array instead of an empty array? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1569269269 ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -2117,6 +2117,20 @@ ListPartitionReassignmentsResponseData listPartitionReassignments( return response; } +ControllerResult getPartitionElrUpdatesForConfigChanges(Optional topicName) { +if (!isElrEnabled()) return ControllerResult.of(Collections.emptyList(), null); + +List records = new ArrayList<>(); +if (topicName.isPresent()) { +generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, NO_LEADER, NO_LEADER, records, + brokersToElrs.partitionsWithElr(topicsByName.get(topicName.get(; +} else { +generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, NO_LEADER, NO_LEADER, records, +brokersToElrs.partitionsWithElr()); +} Review Comment: @CalvinConfluent makes sense, thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1569214320 ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -2117,6 +2117,20 @@ ListPartitionReassignmentsResponseData listPartitionReassignments( return response; } +ControllerResult getPartitionElrUpdatesForConfigChanges(Optional topicName) { +if (!isElrEnabled()) return ControllerResult.of(Collections.emptyList(), null); + +List records = new ArrayList<>(); +if (topicName.isPresent()) { +generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, NO_LEADER, NO_LEADER, records, + brokersToElrs.partitionsWithElr(topicsByName.get(topicName.get(; +} else { +generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, NO_LEADER, NO_LEADER, records, +brokersToElrs.partitionsWithElr()); +} Review Comment: Yes, we should only update the partition(remove the ELRs) when min isr decreases. Because if the min isr decreases, the partition can advance the HWM with fewer ISR members, this can invalidate the ELR as a potential leader. Sure, I can add them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1569210550 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) { void handleUncleanBrokerShutdown(int brokerId, List records) { replicationControl.handleBrokerUncleanShutdown(brokerId, records); } + +void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws InterruptedException, ExecutionException { +appendWriteEvent("partitionUpdateForMinIsrChange", OptionalLong.empty(), +() -> replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get(); +} Review Comment: Make sense, I have some misunderstanding about the controller events. Will update. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1568044239 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) { void handleUncleanBrokerShutdown(int brokerId, List records) { replicationControl.handleBrokerUncleanShutdown(brokerId, records); } + +void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws InterruptedException, ExecutionException { +appendWriteEvent("partitionUpdateForMinIsrChange", OptionalLong.empty(), +() -> replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get(); +} Review Comment: hmm, I think a better way to think about it is that we want to append the min ISR config update atomically with the partition change records. Appending the partition change records once the config change is replayed is difficult to reason about and possibly incorrect. Thinking a bit more about it, triggering a write event from the `replay()` for the config change record means that every time we reload the metadata log, we would replay the config change record and generate new partition change records. Perhaps one example to look at is `ReplicationControlManager.handleBrokerFenced`. When a broker is fenced, we generate a broker registration change record along with the leaderAndIsr partition change records. I assume we want to follow a similar model with the topic configuration change events. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1568084297 ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -2117,6 +2117,20 @@ ListPartitionReassignmentsResponseData listPartitionReassignments( return response; } +ControllerResult getPartitionElrUpdatesForConfigChanges(Optional topicName) { +if (!isElrEnabled()) return ControllerResult.of(Collections.emptyList(), null); + +List records = new ArrayList<>(); +if (topicName.isPresent()) { +generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, NO_LEADER, NO_LEADER, records, + brokersToElrs.partitionsWithElr(topicsByName.get(topicName.get(; +} else { +generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, NO_LEADER, NO_LEADER, records, +brokersToElrs.partitionsWithElr()); +} Review Comment: I haven't been following along too closely. Is my understanding correct that we would only expect to generate partition change records that clear the ELR when the min ISR config decreases? When the configured topic ISR increases, it would not be safe to include more replicas in the ELR, since they may not have the HWM. If my understanding is correct, should we have tests to verify that: 1. When the configured topic ISR decreases, we generate the expected partition change record events. 2. When the configured topic ISR increases, we do not generate any partition change record events. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1568044239 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) { void handleUncleanBrokerShutdown(int brokerId, List records) { replicationControl.handleBrokerUncleanShutdown(brokerId, records); } + +void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws InterruptedException, ExecutionException { +appendWriteEvent("partitionUpdateForMinIsrChange", OptionalLong.empty(), +() -> replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get(); +} Review Comment: hmm, I think a better way to think about it is that we want to append the min ISR config update atomically with the partition change records. Only appending the partition change records once the config change is replayed is difficult to reason about and possibly incorrect. Perhaps one example to look at is `ReplicationControlManager.handleBrokerFenced`. When a broker is fenced, we generate a broker registration change record along with the leaderAndIsr partition change records. I assume we want to follow a similar model with the topic configuration change events. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1566471100 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -423,6 +441,17 @@ public void replay(ConfigRecord record) { log.info("Replayed ConfigRecord for {} which set configuration {} to {}", configResource, record.name(), record.value()); } +if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) { +try { +if (type == Type.TOPIC) { + minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.of(record.resourceName())); +} else { + minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.empty()); +} +} catch (InterruptedException | ExecutionException e) { +throw new Throwable("Fail to append partition updates for the min isr update: " + e.getMessage()); +} Review Comment: Removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1566455070 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) { void handleUncleanBrokerShutdown(int brokerId, List records) { replicationControl.handleBrokerUncleanShutdown(brokerId, records); } + +void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws InterruptedException, ExecutionException { +appendWriteEvent("partitionUpdateForMinIsrChange", OptionalLong.empty(), +() -> replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get(); +} Review Comment: Got it, we basically only need to call the appendWriteEvents and do not wait for the replay(). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1566242343 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) { void handleUncleanBrokerShutdown(int brokerId, List records) { replicationControl.handleBrokerUncleanShutdown(brokerId, records); } + +void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws InterruptedException, ExecutionException { +appendWriteEvent("partitionUpdateForMinIsrChange", OptionalLong.empty(), +() -> replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get(); +} Review Comment: Then we only need to call appendWriteEvent here? We don't have to wait for the replay(). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1566242343 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) { void handleUncleanBrokerShutdown(int brokerId, List records) { replicationControl.handleBrokerUncleanShutdown(brokerId, records); } + +void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws InterruptedException, ExecutionException { +appendWriteEvent("partitionUpdateForMinIsrChange", OptionalLong.empty(), +() -> replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get(); +} Review Comment: Then we only need to call appendWriteEvent here? We don't have to wait for the replay(). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1564110479 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -423,6 +441,17 @@ public void replay(ConfigRecord record) { log.info("Replayed ConfigRecord for {} which set configuration {} to {}", configResource, record.name(), record.value()); } +if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) { Review Comment: Sorry. This is bad, really bad :( -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1563199330 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) { void handleUncleanBrokerShutdown(int brokerId, List records) { replicationControl.handleBrokerUncleanShutdown(brokerId, records); } + +void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws InterruptedException, ExecutionException { +appendWriteEvent("partitionUpdateForMinIsrChange", OptionalLong.empty(), +() -> replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get(); +} Review Comment: calling `.get()` on an appendWriteEvent doesn't look right to me. If I understand correctly, the `appendWriteEvents` are handled in the quorum controller event loop thread. We would expect `replay()` to also be called in the event loop thread. so if we trigger an `appendWriteEvent` and block waiting for the result, it would always time out, since we are blocking the processing thread. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1563195256 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -423,6 +441,17 @@ public void replay(ConfigRecord record) { log.info("Replayed ConfigRecord for {} which set configuration {} to {}", configResource, record.name(), record.value()); } +if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) { +try { +if (type == Type.TOPIC) { + minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.of(record.resourceName())); +} else { + minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.empty()); +} +} catch (InterruptedException | ExecutionException e) { +throw new Throwable("Fail to append partition updates for the min isr update: " + e.getMessage()); +} Review Comment: throwing as a throwable seems odd. ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -423,6 +441,17 @@ public void replay(ConfigRecord record) { log.info("Replayed ConfigRecord for {} which set configuration {} to {}", configResource, record.name(), record.value()); } +if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) { +try { +if (type == Type.TOPIC) { + minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.of(record.resourceName())); +} else { + minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.empty()); +} +} catch (InterruptedException | ExecutionException e) { +throw new Throwable("Fail to append partition updates for the min isr update: " + e.getMessage()); +} Review Comment: throwing an exception as a throwable seems odd. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1563193794 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -423,6 +441,17 @@ public void replay(ConfigRecord record) { log.info("Replayed ConfigRecord for {} which set configuration {} to {}", configResource, record.name(), record.value()); } +if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) { Review Comment: we shouldn't use `==` to compare strings. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on PR #15702: URL: https://github.com/apache/kafka/pull/15702#issuecomment-2052041676 @mumrah Can you help take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org