showuon commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1294139696


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -454,54 +460,101 @@ public static void validateNames(Properties props) {
                 throw new InvalidConfigurationException("Unknown topic config name: " + name);
     }
 
+    /**
+     * Validates the values of the given properties. Can be called by both client and server.
+     * The `props` supplied should contain all the LogConfig properties and the default values are extracted from the
+     * LogConfig class.
+     * @param props The properties to be validated
+     */
     public static void validateValues(Map<?, ?> props) {
         long minCompactionLag = (Long) props.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG);
         long maxCompactionLag = (Long) props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
         if (minCompactionLag > maxCompactionLag) {
             throw new InvalidConfigurationException("conflict topic config setting "
-                + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > "
-                + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")");
+                    + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > "
+                    + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")");
         }
+    }
 
-        if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
-            boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
-            String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
-            if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
-                throw new ConfigException("Remote log storage is unsupported for the compacted topics");
-            }
+    /**
+     * Validates the configured values of the LogConfig. Should be called only by the broker.
+     * The `props` supplied doesn't contain any topic-level configs, only broker-level configs.
+     * The default values should be extracted from the KafkaConfig.
+     * @param props The properties to be validated
+     */
+    public static void validateConfiguredValuesInBroker(Map<?, ?> props) {

Review Comment:
   Could you explain the difference between this method and the
`validateValuesInBroker` method? I think this method is only invoked when
initializing `LogManager`, without any topic-related configs, is that right?
From the method names alone, it's really difficult to tell what each one does.
Thanks.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -265,7 +266,12 @@ public Optional<String> serverConfigName(String configName) {
             .define(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_MS, atLeast(-2), MEDIUM,
                 TopicConfig.LOCAL_LOG_RETENTION_MS_DOC)
             .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
-                TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC);
+                TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
+            // RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP is defined here to ensure that when system
+            // level remote storage functionality is disabled, topics cannot be configured to use remote storage.
+            .defineInternal(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, BOOLEAN,
+                    RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE, null, MEDIUM,
+                    RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC);

Review Comment:
   This still doesn't feel like the right place for it, since all the other
configs defined here are topic-related. Could we pass the `KafkaConfig` object
into `LogConfig.validate` instead, i.e.
   ```
   // currently
   LogConfig.validate(config, kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()))
   // my suggestion
   LogConfig.validate(config, kafkaConfig)
   ```
   so that we don't need to complicate `LogConfig`? Thoughts,
@kamalcph @divijvaidya?
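
For illustration only, here is a rough Java sketch of the shape that suggestion might take: the broker-level flag is read from a broker config object passed to the validator, rather than being defined as an internal key inside the topic-level config definition. `BrokerConfig`, `LogConfigValidationSketch`, and the error message are hypothetical stand-ins and not the real `KafkaConfig` or `LogConfig` API; the key string is assumed to mirror `TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; none of these types are the real Kafka classes.
class LogConfigValidationSketch {

    // Assumed to mirror TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG.
    static final String REMOTE_STORAGE_ENABLE = "remote.storage.enable";

    // Stand-in for KafkaConfig: carries only the broker-level remote storage flag.
    static final class BrokerConfig {
        final boolean remoteLogStorageSystemEnabled;

        BrokerConfig(boolean remoteLogStorageSystemEnabled) {
            this.remoteLogStorageSystemEnabled = remoteLogStorageSystemEnabled;
        }
    }

    // Topic-level props stay free of broker-level keys; the broker flag
    // arrives through the second argument instead.
    static void validate(Map<String, Object> topicProps, BrokerConfig brokerConfig) {
        boolean topicWantsRemote = Boolean.TRUE.equals(topicProps.get(REMOTE_STORAGE_ENABLE));
        if (topicWantsRemote && !brokerConfig.remoteLogStorageSystemEnabled) {
            throw new IllegalArgumentException(
                "Topic enables remote storage, but it is disabled at the broker level");
        }
    }

    public static void main(String[] args) {
        Map<String, Object> topicProps = new HashMap<>();
        topicProps.put(REMOTE_STORAGE_ENABLE, true);

        // Passes: the broker has remote storage enabled.
        validate(topicProps, new BrokerConfig(true));

        // Rejected: the topic asks for remote storage on a broker without it.
        try {
            validate(topicProps, new BrokerConfig(false));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The trade-off in this sketch is that the validator gains a dependency on a broker config type, in exchange for keeping the topic-level definition limited to topic-related keys.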



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

