This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit f9c37032ffc316de86fee3d8096a566663a56451 Author: Ken Huang <100591800+m1a...@users.noreply.github.com> AuthorDate: Mon Jun 3 12:04:58 2024 +0900 KAFKA-16859 Cleanup check if tiered storage is enabled (#16153) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- .../kafka/server/builders/KafkaApisBuilder.java | 2 +- .../server/builders/ReplicaManagerBuilder.java | 2 +- core/src/main/scala/kafka/log/LogManager.scala | 2 +- .../src/main/scala/kafka/server/BrokerServer.scala | 2 +- .../main/scala/kafka/server/ConfigHandler.scala | 2 +- .../server/ControllerConfigurationValidator.scala | 3 ++- core/src/main/scala/kafka/server/KafkaConfig.scala | 4 +--- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- core/src/main/scala/kafka/zk/AdminZkClient.scala | 4 ++-- .../kafka/log/remote/RemoteLogManagerTest.java | 2 +- .../test/scala/unit/kafka/log/LogConfigTest.scala | 26 +++++++++++----------- .../unit/kafka/server/ReplicaManagerTest.scala | 4 ++-- 12 files changed, 27 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java index 6ffd741f4fc..1d422461678 100644 --- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java +++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java @@ -179,7 +179,7 @@ public class KafkaApisBuilder { if (metrics == null) throw new RuntimeException("You must set metrics"); if (quotas == null) throw new RuntimeException("You must set quotas"); if (fetchManager == null) throw new RuntimeException("You must set fetchManager"); - if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.isRemoteLogStorageSystemEnabled()); + if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem()); if (apiVersionManager == null) throw new RuntimeException("You must set apiVersionManager"); return new KafkaApis(requestChannel, diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java index 82aa75909ab..5e8cf2dcdc6 100644 --- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java @@ -185,7 +185,7 @@ public class ReplicaManagerBuilder { if (metadataCache == null) throw new RuntimeException("You must set metadataCache"); if (logDirFailureChannel == null) throw new RuntimeException("You must set logDirFailureChannel"); if (alterPartitionManager == null) throw new RuntimeException("You must set alterIsrManager"); - if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.isRemoteLogStorageSystemEnabled()); + if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem()); // Initialize metrics in the end just before passing it to ReplicaManager to ensure ReplicaManager closes the // metrics correctly. There might be a resource leak if it is initialized and an exception occurs between // its initialization and creation of ReplicaManager. diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 8bc6db2dff6..45d6ab7908d 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -1562,7 +1562,7 @@ object LogManager { keepPartitionMetadataFile: Boolean): LogManager = { val defaultProps = config.extractLogConfigMap - LogConfig.validateBrokerLogConfigValues(defaultProps, config.isRemoteLogStorageSystemEnabled) + LogConfig.validateBrokerLogConfigValues(defaultProps, config.remoteLogManagerConfig.enableRemoteStorageSystem()) val defaultLogConfig = new LogConfig(defaultProps) val cleanerConfig = LogCleaner.cleanerConfig(config) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 94bcf321420..baafc3e658f 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -186,7 +186,7 @@ class BrokerServer( kafkaScheduler.startup() /* register broker metrics */ - brokerTopicStats = new BrokerTopicStats(config.isRemoteLogStorageSystemEnabled) + brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.enableRemoteStorageSystem()) quotaManagers = QuotaFactory.instantiate(config, metrics, time, s"broker-${config.nodeId}-") diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 1d5702e76e4..ed9260b2194 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -70,7 +70,7 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, val logs = logManager.logsByTopic(topic) val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled()) - logManager.updateTopicConfig(topic, props, kafkaConfig.isRemoteLogStorageSystemEnabled) + logManager.updateTopicConfig(topic, props, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()) maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate) } diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala index 15eb1eff04a..f957b65ddd1 100644 --- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala +++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala @@ -107,7 +107,8 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu throw new InvalidConfigurationException("Null value not supported for topic configs: " + nullTopicConfigs.mkString(",")) } - LogConfig.validate(properties, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled) + LogConfig.validate(properties, kafkaConfig.extractLogConfigMap, + kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()) case BROKER => validateBrokerName(resource.name()) case CLIENT_METRICS => val properties = new Properties() diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index b812f299ecb..3635d5a3edd 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -17,7 +17,7 @@ package kafka.server -import java.{lang, util} +import java.util import java.util.concurrent.TimeUnit import java.util.{Collections, Properties} import kafka.cluster.EndPoint @@ -1207,8 +1207,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def usesTopicId: Boolean = usesSelfManagedQuorum || interBrokerProtocolVersion.isTopicIdsSupported - - val isRemoteLogStorageSystemEnabled: lang.Boolean = getBoolean(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP) def logLocalRetentionBytes: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP) def logLocalRetentionMs: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index b913b72fc9c..9c807c79da5 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -276,7 +276,7 @@ class KafkaServer( createCurrentControllerIdMetric() /* register broker metrics */ - _brokerTopicStats = new BrokerTopicStats(config.isRemoteLogStorageSystemEnabled) + _brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.enableRemoteStorageSystem()) quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse("")) KafkaBroker.notifyClusterListeners(clusterId, kafkaMetricsReporters ++ metrics.reporters.asScala) diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index efecfe854bb..604e03c7ed4 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -163,7 +163,7 @@ class AdminZkClient(zkClient: KafkaZkClient, LogConfig.validate(config, kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()), - kafkaConfig.exists(_.isRemoteLogStorageSystemEnabled)) + kafkaConfig.exists(_.remoteLogManagerConfig.enableRemoteStorageSystem())) } private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, ReplicaAssignment], @@ -481,7 +481,7 @@ class AdminZkClient(zkClient: KafkaZkClient, // remove the topic overrides LogConfig.validate(configs, kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()), - kafkaConfig.exists(_.isRemoteLogStorageSystemEnabled)) + kafkaConfig.exists(_.remoteLogManagerConfig.enableRemoteStorageSystem())) } /** diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 0ba5d63a8da..50b581fdf4e 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -222,7 +222,7 @@ public class RemoteLogManagerTest { props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100"); remoteLogManagerConfig = createRLMConfig(props); - brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).isRemoteLogStorageSystemEnabled()); + brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().enableRemoteStorageSystem()); remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 7be9bcd0267..2bf147b528b 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -298,7 +298,7 @@ class LogConfigTest { props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs.toString) props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes.toString) assertThrows(classOf[ConfigException], - () => LogConfig.validate(props, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)) + () => LogConfig.validate(props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())) } @Test @@ -310,17 +310,17 @@ class LogConfigTest { val logProps = new Properties() logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled) + LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)) + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact") assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)) + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete") assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)) + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())) } @ParameterizedTest(name = "testEnableRemoteLogStorage with sysRemoteStorageEnabled: {0}") @@ -333,10 +333,10 @@ class LogConfigTest { val logProps = new Properties() logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") if (sysRemoteStorageEnabled) { - LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled) + LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()) } else { val message = assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)) + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())) assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker")) } } @@ -356,10 +356,10 @@ class LogConfigTest { logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500") if (sysRemoteStorageEnabled) { val message = assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)) + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())) assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG)) } else { - LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled) + LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()) } } @@ -378,10 +378,10 @@ class LogConfigTest { logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128") if (sysRemoteStorageEnabled) { val message = assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)) + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())) assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) } else { - LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled) + LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()) } } @@ -396,10 +396,10 @@ class LogConfigTest { if (sysRemoteStorageEnabled) { val message = assertThrows(classOf[ConfigException], - () => LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled)) + () => LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())) assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) } else { - LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.isRemoteLogStorageSystemEnabled) + LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()) } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 9f820155c8e..65cbcc7bd70 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -4095,7 +4095,7 @@ class ReplicaManagerTest { val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props) val remoteLogManagerConfig = new RemoteLogManagerConfig(config) val mockLog = mock(classOf[UnifiedLog]) - val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).isRemoteLogStorageSystemEnabled) + val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem()) val remoteLogManager = new RemoteLogManager( remoteLogManagerConfig, 0, @@ -4195,7 +4195,7 @@ class ReplicaManagerTest { val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props) val remoteLogManagerConfig = new RemoteLogManagerConfig(config) val dummyLog = mock(classOf[UnifiedLog]) - val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).isRemoteLogStorageSystemEnabled) + val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem()) val remoteLogManager = new RemoteLogManager( remoteLogManagerConfig, 0,