cmccabe commented on code in PR #17952:
URL: https://github.com/apache/kafka/pull/17952#discussion_r1887272625
##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -497,10 +541,63 @@ public Map<ConfigResource, ResultOrError<Map<String,
String>>> describeConfigs(
return results;
}
+ // The function is used when enabling ELR. It will create a new cluster
level min ISR config if there is not any.
+ // Also, it will remove all the broker level min ISR config records.
+ void maybeResetMinIsrConfig(List<ApiMessageAndVersion> outputRecords) {
+ if (!clusterConfig().containsKey(MIN_IN_SYNC_REPLICAS_CONFIG)) {
+ String minIsrDefaultConfigValue =
configSchema.getStaticOrDefaultConfig(
+ MIN_IN_SYNC_REPLICAS_CONFIG,
+ staticConfig
+ );
+ ApiError error = incrementalAlterConfigResource(
+ DEFAULT_NODE,
+ Map.of(MIN_IN_SYNC_REPLICAS_CONFIG, new
AbstractMap.SimpleEntry<>(SET, minIsrDefaultConfigValue)),
+ true,
+ outputRecords
+ );
+ if (error.isFailure()) {
+ log.error("Fail to update cluster level min ISR config during
the min ISR config reset: " + error);
+ }
+ }
+
+ Set<Integer> nodes = clusterControl.brokerRegistrations().keySet();
Review Comment:
There can be configurations for brokers that aren't currently registered.
Rather than doing this, which won't work, and introduces a dependency between
ConfigurationControlManager and ClustrerControlManager that doesn't need to be
there, we should just store the broker-level versions of this config in a map
or something so that we can easily iterate over it when eneded.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]