showuon commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1294139696
########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ########## @@ -454,54 +460,101 @@ public static void validateNames(Properties props) { throw new InvalidConfigurationException("Unknown topic config name: " + name); } + /** + * Validates the values of the given properties. Can be called by both client and server. + * The `props` supplied should contain all the LogConfig properties and the default values are extracted from the + * LogConfig class. + * @param props The properties to be validated + */ public static void validateValues(Map<?, ?> props) { long minCompactionLag = (Long) props.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG); long maxCompactionLag = (Long) props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG); if (minCompactionLag > maxCompactionLag) { throw new InvalidConfigurationException("conflict topic config setting " - + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " - + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); + + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " + + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); } + } - if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) { - boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); - String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); - if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { - throw new ConfigException("Remote log storage is unsupported for the compacted topics"); - } + /** + * Validates the configured values of the LogConfig. Should be called only by the broker. + * The `props` supplied doesn't contain any topic-level configs, only broker-level configs. + * The default values should be extracted from the KafkaConfig. + * @param props The properties to be validated + */ + public static void validateConfiguredValuesInBroker(Map<?, ?> props) { Review Comment: Could you explain more about what's the difference between this method and `validateValuesInBroker` method? I think this method is only invoked when initializing `LogManager` without topic related configs, is that right? From the method name, it's really difficult to identify what they are doing. Thanks. ########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ########## @@ -265,7 +266,12 @@ public Optional<String> serverConfigName(String configName) { .define(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_MS, atLeast(-2), MEDIUM, TopicConfig.LOCAL_LOG_RETENTION_MS_DOC) .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM, - TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC); + TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC) + // RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP is defined here to ensure that when system + // level remote storage functionality is disabled, topics cannot be configured to use remote storage. + .defineInternal(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, BOOLEAN, + RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE, null, MEDIUM, + RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC); Review Comment: This still makes me feel it's not the correct place to put, while other configs are topic related configs. Could we pass in the `KafkaConfig` object into `LogConfig.validate`, i.e. ``` # currently LogConfig.validate(config, kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap())) # my suggestion LogConfig.validate(config, kafkaConfig) ``` so that we don't need to make this `LogConfig` complicated? Thoughts @kamalcph @divijvaidya ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org