This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f9c37032ffc316de86fee3d8096a566663a56451
Author: Ken Huang <100591800+m1a...@users.noreply.github.com>
AuthorDate: Mon Jun 3 12:04:58 2024 +0900

    KAFKA-16859 Cleanup check if tiered storage is enabled (#16153)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../kafka/server/builders/KafkaApisBuilder.java    |  2 +-
 .../server/builders/ReplicaManagerBuilder.java     |  2 +-
 core/src/main/scala/kafka/log/LogManager.scala     |  2 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  2 +-
 .../main/scala/kafka/server/ConfigHandler.scala    |  2 +-
 .../server/ControllerConfigurationValidator.scala  |  3 ++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  4 +---
 core/src/main/scala/kafka/server/KafkaServer.scala |  2 +-
 core/src/main/scala/kafka/zk/AdminZkClient.scala   |  4 ++--
 .../kafka/log/remote/RemoteLogManagerTest.java     |  2 +-
 .../test/scala/unit/kafka/log/LogConfigTest.scala  | 26 +++++++++++-----------
 .../unit/kafka/server/ReplicaManagerTest.scala     |  4 ++--
 12 files changed, 27 insertions(+), 28 deletions(-)

diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java 
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 6ffd741f4fc..1d422461678 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -179,7 +179,7 @@ public class KafkaApisBuilder {
         if (metrics == null) throw new RuntimeException("You must set 
metrics");
         if (quotas == null) throw new RuntimeException("You must set quotas");
         if (fetchManager == null) throw new RuntimeException("You must set 
fetchManager");
-        if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.isRemoteLogStorageSystemEnabled());
+        if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem());
         if (apiVersionManager == null) throw new RuntimeException("You must 
set apiVersionManager");
 
         return new KafkaApis(requestChannel,
diff --git 
a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java 
b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 82aa75909ab..5e8cf2dcdc6 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -185,7 +185,7 @@ public class ReplicaManagerBuilder {
         if (metadataCache == null) throw new RuntimeException("You must set 
metadataCache");
         if (logDirFailureChannel == null) throw new RuntimeException("You must 
set logDirFailureChannel");
         if (alterPartitionManager == null) throw new RuntimeException("You 
must set alterIsrManager");
-        if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.isRemoteLogStorageSystemEnabled());
+        if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem());
         // Initialize metrics in the end just before passing it to 
ReplicaManager to ensure ReplicaManager closes the
         // metrics correctly. There might be a resource leak if it is 
initialized and an exception occurs between
         // its initialization and creation of ReplicaManager.
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 8bc6db2dff6..45d6ab7908d 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1562,7 +1562,7 @@ object LogManager {
             keepPartitionMetadataFile: Boolean): LogManager = {
     val defaultProps = config.extractLogConfigMap
 
-    LogConfig.validateBrokerLogConfigValues(defaultProps, 
config.isRemoteLogStorageSystemEnabled)
+    LogConfig.validateBrokerLogConfigValues(defaultProps, 
config.remoteLogManagerConfig.enableRemoteStorageSystem())
     val defaultLogConfig = new LogConfig(defaultProps)
 
     val cleanerConfig = LogCleaner.cleanerConfig(config)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 94bcf321420..baafc3e658f 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -186,7 +186,7 @@ class BrokerServer(
       kafkaScheduler.startup()
 
       /* register broker metrics */
-      brokerTopicStats = new 
BrokerTopicStats(config.isRemoteLogStorageSystemEnabled)
+      brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig.enableRemoteStorageSystem())
 
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, 
s"broker-${config.nodeId}-")
 
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 1d5702e76e4..ed9260b2194 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -70,7 +70,7 @@ class TopicConfigHandler(private val replicaManager: 
ReplicaManager,
     val logs = logManager.logsByTopic(topic)
     val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())
 
-    logManager.updateTopicConfig(topic, props, 
kafkaConfig.isRemoteLogStorageSystemEnabled)
+    logManager.updateTopicConfig(topic, props, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())
     maybeBootstrapRemoteLogComponents(topic, logs, 
wasRemoteLogEnabledBeforeUpdate)
   }
 
diff --git 
a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala 
b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index 15eb1eff04a..f957b65ddd1 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -107,7 +107,8 @@ class ControllerConfigurationValidator(kafkaConfig: 
KafkaConfig) extends Configu
           throw new InvalidConfigurationException("Null value not supported 
for topic configs: " +
             nullTopicConfigs.mkString(","))
         }
-        LogConfig.validate(properties, kafkaConfig.extractLogConfigMap, 
kafkaConfig.isRemoteLogStorageSystemEnabled)
+        LogConfig.validate(properties, kafkaConfig.extractLogConfigMap,
+          kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())
       case BROKER => validateBrokerName(resource.name())
       case CLIENT_METRICS =>
         val properties = new Properties()
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b812f299ecb..3635d5a3edd 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import java.{lang, util}
+import java.util
 import java.util.concurrent.TimeUnit
 import java.util.{Collections, Properties}
 import kafka.cluster.EndPoint
@@ -1207,8 +1207,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   def usesTopicId: Boolean =
     usesSelfManagedQuorum || interBrokerProtocolVersion.isTopicIdsSupported
 
-
-  val isRemoteLogStorageSystemEnabled: lang.Boolean = 
getBoolean(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP)
   def logLocalRetentionBytes: java.lang.Long = 
getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP)
 
   def logLocalRetentionMs: java.lang.Long = 
getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index b913b72fc9c..9c807c79da5 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -276,7 +276,7 @@ class KafkaServer(
         createCurrentControllerIdMetric()
 
         /* register broker metrics */
-        _brokerTopicStats = new 
BrokerTopicStats(config.isRemoteLogStorageSystemEnabled)
+        _brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig.enableRemoteStorageSystem())
 
         quotaManagers = QuotaFactory.instantiate(config, metrics, time, 
threadNamePrefix.getOrElse(""))
         KafkaBroker.notifyClusterListeners(clusterId, kafkaMetricsReporters ++ 
metrics.reporters.asScala)
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala 
b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index efecfe854bb..604e03c7ed4 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -163,7 +163,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
 
     LogConfig.validate(config,
       kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
-      kafkaConfig.exists(_.isRemoteLogStorageSystemEnabled))
+      kafkaConfig.exists(_.remoteLogManagerConfig.enableRemoteStorageSystem()))
   }
 
   private def writeTopicPartitionAssignment(topic: String, replicaAssignment: 
Map[Int, ReplicaAssignment],
@@ -481,7 +481,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
     // remove the topic overrides
     LogConfig.validate(configs,
       kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
-      kafkaConfig.exists(_.isRemoteLogStorageSystemEnabled))
+      kafkaConfig.exists(_.remoteLogManagerConfig.enableRemoteStorageSystem()))
   }
 
   /**
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 0ba5d63a8da..50b581fdf4e 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -222,7 +222,7 @@ public class RemoteLogManagerTest {
         
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
"true");
         
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
 "100");
         remoteLogManagerConfig = createRLMConfig(props);
-        brokerTopicStats = new 
BrokerTopicStats(KafkaConfig.fromProps(props).isRemoteLogStorageSystemEnabled());
+        brokerTopicStats = new 
BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().enableRemoteStorageSystem());
 
         remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, 
brokerId, logDir, clusterId, time,
                 tp -> Optional.of(mockLog),
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala 
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 7be9bcd0267..2bf147b528b 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -298,7 +298,7 @@ class LogConfigTest {
     props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, 
localRetentionMs.toString)
     props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localRetentionBytes.toString)
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(props, kafkaConfig.extractLogConfigMap, 
kafkaConfig.isRemoteLogStorageSystemEnabled))
+      () => LogConfig.validate(props, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()))
   }
 
   @Test
@@ -310,17 +310,17 @@ class LogConfigTest {
     val logProps = new Properties()
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE)
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
-    LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.isRemoteLogStorageSystemEnabled)
+    LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())
 
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_COMPACT)
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.isRemoteLogStorageSystemEnabled))
+      () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()))
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact")
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.isRemoteLogStorageSystemEnabled))
+      () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()))
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.isRemoteLogStorageSystemEnabled))
+      () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()))
   }
 
   @ParameterizedTest(name = "testEnableRemoteLogStorage with 
sysRemoteStorageEnabled: {0}")
@@ -333,10 +333,10 @@ class LogConfigTest {
     val logProps = new Properties()
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     if (sysRemoteStorageEnabled) {
-      LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.isRemoteLogStorageSystemEnabled)
+      LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())
     } else {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.isRemoteLogStorageSystemEnabled))
+        () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()))
       assertTrue(message.getMessage.contains("Tiered Storage functionality is 
disabled in the broker"))
     }
   }
@@ -356,10 +356,10 @@ class LogConfigTest {
     logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500")
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.isRemoteLogStorageSystemEnabled))
+        () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()))
       
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
     } else {
-      LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.isRemoteLogStorageSystemEnabled)
+      LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())
     }
   }
 
@@ -378,10 +378,10 @@ class LogConfigTest {
     logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128")
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.isRemoteLogStorageSystemEnabled))
+        () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()))
       
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
     } else {
-      LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.isRemoteLogStorageSystemEnabled)
+      LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())
     }
   }
 
@@ -396,10 +396,10 @@ class LogConfigTest {
 
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
-        () => 
LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, 
kafkaConfig.isRemoteLogStorageSystemEnabled))
+        () => 
LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()))
       
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
     } else {
-      LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, 
kafkaConfig.isRemoteLogStorageSystemEnabled)
+      LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 9f820155c8e..65cbcc7bd70 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -4095,7 +4095,7 @@ class ReplicaManagerTest {
     val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props)
     val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
     val mockLog = mock(classOf[UnifiedLog])
-    val brokerTopicStats = new 
BrokerTopicStats(KafkaConfig.fromProps(props).isRemoteLogStorageSystemEnabled)
+    val brokerTopicStats = new 
BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem())
     val remoteLogManager = new RemoteLogManager(
       remoteLogManagerConfig,
       0,
@@ -4195,7 +4195,7 @@ class ReplicaManagerTest {
     val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props)
     val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
     val dummyLog = mock(classOf[UnifiedLog])
-    val brokerTopicStats = new 
BrokerTopicStats(KafkaConfig.fromProps(props).isRemoteLogStorageSystemEnabled)
+    val brokerTopicStats = new 
BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem())
     val remoteLogManager = new RemoteLogManager(
       remoteLogManagerConfig,
       0,

Reply via email to