showuon commented on code in PR #14176:
URL: https://github.com/apache/kafka/pull/14176#discussion_r1293269490


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -265,7 +266,10 @@ public Optional<String> serverConfigName(String 
configName) {
             .define(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, LONG, 
DEFAULT_LOCAL_RETENTION_MS, atLeast(-2), MEDIUM,
                 TopicConfig.LOCAL_LOG_RETENTION_MS_DOC)
             .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, 
DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
-                TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC);
+                TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
+            
.define(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, BOOLEAN,
+                    
RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE, null, MEDIUM,
+                    
RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC);

Review Comment:
   Will this appear in the official doc under `Topic Config` section? If so, 
maybe `defineInternal`?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -454,54 +458,101 @@ public static void validateNames(Properties props) {
                 throw new InvalidConfigurationException("Unknown topic config 
name: " + name);
     }
 
+    /**
+     * Validates the values of the given properties. Can be called by both 
client and server.
+     * The `props` supplied should contain all the LogConfig properties and 
the default values are extracted from the
+     * LogConfig class.
+     * @param props The properties to be validated
+     */
     public static void validateValues(Map<?, ?> props) {
         long minCompactionLag = (Long) 
props.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG);
         long maxCompactionLag = (Long) 
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
         if (minCompactionLag > maxCompactionLag) {
             throw new InvalidConfigurationException("conflict topic config 
setting "
-                + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
-                + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
+                    + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
+                    + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
         }
+    }
 
-        if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
-            boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
-            String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
-            if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
-                throw new ConfigException("Remote log storage is unsupported 
for the compacted topics");
-            }
+    /**
+     * Validates the default values of the LogConfig. Should be called only by 
the broker.
+     * The `props` supplied should contain all the LogConfig properties except
+     * TopicConfig#REMOTE_LOG_STORAGE_ENABLE_CONFIG and the default values 
should be extracted from the KafkaConfig.
+     * @param props The properties to be validated
+     */
+    public static void validateDefaultValuesInBroker(Map<?, ?> props) {

Review Comment:
   I don't think this is to validate `default` value in broker, it should be 
validating broker configs, including user overriding configs. Is that right? 
Maybe `validateConfiguredValuesInBroker`?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -265,7 +266,10 @@ public Optional<String> serverConfigName(String 
configName) {
             .define(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, LONG, 
DEFAULT_LOCAL_RETENTION_MS, atLeast(-2), MEDIUM,
                 TopicConfig.LOCAL_LOG_RETENTION_MS_DOC)
             .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, 
DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM,
-                TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC);
+                TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
+            
.define(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, BOOLEAN,
+                    
RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE, null, MEDIUM,
+                    
RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC);

Review Comment:
   Also, we should add a comment to say why we add this `broker level` config 
in log Config. 



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -454,54 +458,101 @@ public static void validateNames(Properties props) {
                 throw new InvalidConfigurationException("Unknown topic config 
name: " + name);
     }
 
+    /**
+     * Validates the values of the given properties. Can be called by both 
client and server.
+     * The `props` supplied should contain all the LogConfig properties and 
the default values are extracted from the
+     * LogConfig class.
+     * @param props The properties to be validated
+     */
     public static void validateValues(Map<?, ?> props) {
         long minCompactionLag = (Long) 
props.get(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG);
         long maxCompactionLag = (Long) 
props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG);
         if (minCompactionLag > maxCompactionLag) {
             throw new InvalidConfigurationException("conflict topic config 
setting "
-                + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
-                + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
+                    + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + 
minCompactionLag + ") > "
+                    + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + 
maxCompactionLag + ")");
         }
+    }
 
-        if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) {
-            boolean isRemoteStorageEnabled = (Boolean) 
props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
-            String cleanupPolicy = 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault());
-            if (isRemoteStorageEnabled && 
cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) {
-                throw new ConfigException("Remote log storage is unsupported 
for the compacted topics");
-            }
+    /**
+     * Validates the default values of the LogConfig. Should be called only by 
the broker.
+     * The `props` supplied should contain all the LogConfig properties except
+     * TopicConfig#REMOTE_LOG_STORAGE_ENABLE_CONFIG and the default values 
should be extracted from the KafkaConfig.

Review Comment:
   From what I saw, we will include 
`TopicConfig#REMOTE_LOG_STORAGE_ENABLE_CONFIG` in the `props` parameter. What 
does this comment mean?
   
   Maybe what you want to say is in the `validateDefaultValuesInBroker` method, 
the `props` doesn't contain any `topic-level` configs, only broker-level 
configs, is that right?



##########
core/src/main/scala/kafka/server/ControllerServer.scala:
##########
@@ -231,7 +231,7 @@ class ControllerServer(
           setMetrics(quorumControllerMetrics).
           setCreateTopicPolicy(createTopicPolicy.asJava).
           setAlterConfigPolicy(alterConfigPolicy.asJava).
-          setConfigurationValidator(new ControllerConfigurationValidator()).
+          setConfigurationValidator(new 
ControllerConfigurationValidator(sharedServer.brokerConfig)).

Review Comment:
   I think the scenario from @divijvaidya is too complicated. I don't think we 
have any other similar config validations like this (from broker 1 has 
different config with broker 2). IMO, this PR already adds validation for it, 
and for the edge case, we can still fail the request with clear logs, it should 
be good enough. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to