showuon commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1293269490
########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ########## @@ -265,7 +266,10 @@ public Optional<String> serverConfigName(String configName) { .define(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_MS, atLeast(-2), MEDIUM, TopicConfig.LOCAL_LOG_RETENTION_MS_DOC) .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM, - TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC); + TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC) + .define(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, BOOLEAN, + RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE, null, MEDIUM, + RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC); Review Comment: Will this appear in the official doc under `Topic Config` section? If so, maybe `defineInternal`? ########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ########## @@ -454,54 +458,101 @@ public static void validateNames(Properties props) { throw new InvalidConfigurationException("Unknown topic config name: " + name); } + /** + * Validates the values of the given properties. Can be called by both client and server. + * The `props` supplied should contain all the LogConfig properties and the default values are extracted from the + * LogConfig class. + * @param props The properties to be validated + */ public static void validateValues(Map<?, ?> props) { long minCompactionLag = (Long) props.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG); long maxCompactionLag = (Long) props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG); if (minCompactionLag > maxCompactionLag) { throw new InvalidConfigurationException("conflict topic config setting " - + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " - + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); + + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " + + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); } + } - if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) { - boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); - String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); - if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { - throw new ConfigException("Remote log storage is unsupported for the compacted topics"); - } + /** + * Validates the default values of the LogConfig. Should be called only by the broker. + * The `props` supplied should contain all the LogConfig properties except + * TopicConfig#REMOTE_LOG_STORAGE_ENABLE_CONFIG and the default values should be extracted from the KafkaConfig. + * @param props The properties to be validated + */ + public static void validateDefaultValuesInBroker(Map<?, ?> props) { Review Comment: I don't think this is to validate `default` value in broker, it should be validating broker configs, including user overriding configs. Is that right? Maybe `validateConfiguredValuesInBroker`? ########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ########## @@ -265,7 +266,10 @@ public Optional<String> serverConfigName(String configName) { .define(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_MS, atLeast(-2), MEDIUM, TopicConfig.LOCAL_LOG_RETENTION_MS_DOC) .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM, - TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC); + TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC) + .define(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, BOOLEAN, + RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE, null, MEDIUM, + RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC); Review Comment: Also, we should add a comment to say why we add this `broker level` config in log Config. ########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ########## @@ -454,54 +458,101 @@ public static void validateNames(Properties props) { throw new InvalidConfigurationException("Unknown topic config name: " + name); } + /** + * Validates the values of the given properties. Can be called by both client and server. + * The `props` supplied should contain all the LogConfig properties and the default values are extracted from the + * LogConfig class. + * @param props The properties to be validated + */ public static void validateValues(Map<?, ?> props) { long minCompactionLag = (Long) props.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG); long maxCompactionLag = (Long) props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG); if (minCompactionLag > maxCompactionLag) { throw new InvalidConfigurationException("conflict topic config setting " - + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " - + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); + + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " + + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); } + } - if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) { - boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); - String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); - if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { - throw new ConfigException("Remote log storage is unsupported for the compacted topics"); - } + /** + * Validates the default values of the LogConfig. Should be called only by the broker. + * The `props` supplied should contain all the LogConfig properties except + * TopicConfig#REMOTE_LOG_STORAGE_ENABLE_CONFIG and the default values should be extracted from the KafkaConfig. Review Comment: From what I saw, we will include `TopicConfig#REMOTE_LOG_STORAGE_ENABLE_CONFIG` in the `props` parameter. What does this comment mean? Maybe what you want to say is in the `validateDefaultValuesInBroker` method, the `props` doesn't contain any `topic-level` configs, only broker-level configs, is that right? ########## core/src/main/scala/kafka/server/ControllerServer.scala: ########## @@ -231,7 +231,7 @@ class ControllerServer( setMetrics(quorumControllerMetrics). setCreateTopicPolicy(createTopicPolicy.asJava). setAlterConfigPolicy(alterConfigPolicy.asJava). - setConfigurationValidator(new ControllerConfigurationValidator()). + setConfigurationValidator(new ControllerConfigurationValidator(sharedServer.brokerConfig)). Review Comment: I think the scenario from @divijvaidya is too complicated. I don't think we have any other similar config validations like this (from broker 1 has different config with broker 2). IMO, this PR already adds validation for it, and for the edge case, we can still fail the request with clear logs, it should be good enough. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org