kamalcph commented on code in PR #14176: URL: https://github.com/apache/kafka/pull/14176#discussion_r1289553687
########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ########## @@ -459,49 +463,53 @@ public static void validateValues(Map<?, ?> props) { long maxCompactionLag = (Long) props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG); if (minCompactionLag > maxCompactionLag) { throw new InvalidConfigurationException("conflict topic config setting " - + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " - + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); + + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " + + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); } + } - if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) { - boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); - String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); - if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { - throw new ConfigException("Remote log storage is unsupported for the compacted topics"); - } + public static void validateValuesInBroker(Map<?, ?> props) { + validateValues(props); + Boolean isRemoteLogStorageSystemEnabled = + (Boolean) props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); + Boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); Review Comment: It takes the default value of `false`. ########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ########## @@ -459,49 +463,53 @@ public static void validateValues(Map<?, ?> props) { long maxCompactionLag = (Long) props.get(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG); if (minCompactionLag > maxCompactionLag) { throw new InvalidConfigurationException("conflict topic config setting " - + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " - + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); + + TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + " (" + minCompactionLag + ") > " + + TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG + " (" + maxCompactionLag + ")"); } + } - if (props.containsKey(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG)) { - boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); - String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); - if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { - throw new ConfigException("Remote log storage is unsupported for the compacted topics"); - } + public static void validateValuesInBroker(Map<?, ?> props) { + validateValues(props); + Boolean isRemoteLogStorageSystemEnabled = + (Boolean) props.get(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); + Boolean isRemoteStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); + if (!isRemoteLogStorageSystemEnabled && isRemoteStorageEnabled) { + throw new ConfigException("Tiered Storage functionality is disabled in the broker. " + + "Topic cannot be configured with remote log storage."); } - if (props.containsKey(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) { - Long retentionBytes = (Long) props.get(TopicConfig.RETENTION_BYTES_CONFIG); - Long localLogRetentionBytes = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); - if (retentionBytes > -1 && localLogRetentionBytes != -2) { - if (localLogRetentionBytes == -1) { - String message = String.format("Value must not be -1 as %s value is set as %d.", - TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); - throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); - } - if (localLogRetentionBytes > retentionBytes) { - String message = String.format("Value must not be more than %s property value: %d", - TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); - throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); - } + String cleanupPolicy = props.get(TopicConfig.CLEANUP_POLICY_CONFIG).toString().toLowerCase(Locale.getDefault()); + if (isRemoteStorageEnabled && cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT)) { + throw new ConfigException("Remote log storage is unsupported for the compacted topics"); + } + + Long retentionBytes = (Long) props.get(TopicConfig.RETENTION_BYTES_CONFIG); + Long localLogRetentionBytes = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); + if (isRemoteStorageEnabled && retentionBytes > -1 && localLogRetentionBytes != -2) { + if (localLogRetentionBytes == -1) { + String message = String.format("Value must not be -1 as %s value is set as %d.", + TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); + throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); + } + if (localLogRetentionBytes > retentionBytes) { + String message = String.format("Value must not be more than %s property value: %d", + TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); + throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localLogRetentionBytes, message); } Review Comment: Client don't know the value configured in the broker for `log.local.retention.ms`, `log.retention.ms`, `log.local.retention.bytes` and `log.retention.bytes`. It takes the default value from LogConfig when doing `CONFIG.parse(props)` so doing this validation on the client side might be incorrect. We can move the validation for `compact` topic to client-side but it will also by-pass when using incrementalAlterConfigs API so kept all the validations in the broker side. ########## server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java: ########## @@ -85,7 +85,8 @@ public final class ServerTopicConfigSynonyms { sameNameWithLogPrefix(TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG), sameNameWithLogPrefix(TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG), sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG), - sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG) + sameNameWithLogPrefix(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG), + sameName("remote.log.storage.system.enable") Review Comment: Using the constant from RemoteLogMangerConfig requires adding `storage` module as dependency for `server-common` module which brings circular dependency. Reusing the constant can be done in the next PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org