This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 781b93b00d99a63dee0ed6650a2d398f7791cea3
Author: Chia Chuan Yu <yujuan...@gmail.com>
AuthorDate: Mon Jun 10 02:14:15 2024 +0800

    KAFKA-16885 Renamed the enableRemoteStorageSystem to 
isRemoteStorageSystemEnabled (#16256)
    
    Reviewers: Kamal Chandraprakash <kchandraprak...@uber.com>, Chia-Ping Tsai 
<chia7...@gmail.com>
---
 .../java/kafka/log/remote/RemoteLogManager.java    |  2 +-
 .../kafka/server/builders/KafkaApisBuilder.java    |  2 +-
 .../server/builders/ReplicaManagerBuilder.java     |  2 +-
 core/src/main/scala/kafka/log/LogManager.scala     |  4 ++--
 .../src/main/scala/kafka/server/BrokerServer.scala |  4 ++--
 .../main/scala/kafka/server/ConfigHandler.scala    |  2 +-
 .../server/ControllerConfigurationValidator.scala  |  2 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |  4 ++--
 core/src/main/scala/kafka/zk/AdminZkClient.scala   |  4 ++--
 .../kafka/log/remote/RemoteLogManagerTest.java     |  2 +-
 .../test/scala/unit/kafka/log/LogConfigTest.scala  | 26 +++++++++++-----------
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  |  2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  6 ++---
 .../log/remote/storage/RemoteLogManagerConfig.java |  2 +-
 14 files changed, 32 insertions(+), 32 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 5b0d91ff439..de5fbd85f16 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -378,7 +378,7 @@ public class RemoteLogManager implements Closeable {
                                    Map<String, Uuid> topicIds) {
         LOGGER.debug("Received leadership changes for leaders: {} and 
followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
 
-        if (this.rlmConfig.enableRemoteStorageSystem() && 
!isRemoteLogManagerConfigured()) {
+        if (this.rlmConfig.isRemoteStorageSystemEnabled() && 
!isRemoteLogManagerConfigured()) {
             throw new KafkaException("RemoteLogManager is not configured when 
remote storage system is enabled");
         }
 
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java 
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 1d422461678..56607723604 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -179,7 +179,7 @@ public class KafkaApisBuilder {
         if (metrics == null) throw new RuntimeException("You must set 
metrics");
         if (quotas == null) throw new RuntimeException("You must set quotas");
         if (fetchManager == null) throw new RuntimeException("You must set 
fetchManager");
-        if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem());
+        if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled());
         if (apiVersionManager == null) throw new RuntimeException("You must 
set apiVersionManager");
 
         return new KafkaApis(requestChannel,
diff --git 
a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java 
b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 5e8cf2dcdc6..7cac33200d2 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -185,7 +185,7 @@ public class ReplicaManagerBuilder {
         if (metadataCache == null) throw new RuntimeException("You must set 
metadataCache");
         if (logDirFailureChannel == null) throw new RuntimeException("You must 
set logDirFailureChannel");
         if (alterPartitionManager == null) throw new RuntimeException("You 
must set alterIsrManager");
-        if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem());
+        if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled());
         // Initialize metrics in the end just before passing it to 
ReplicaManager to ensure ReplicaManager closes the
         // metrics correctly. There might be a resource leak if it is 
initialized and an exception occurs between
         // its initialization and creation of ReplicaManager.
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 45d6ab7908d..8908aadf459 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1562,7 +1562,7 @@ object LogManager {
             keepPartitionMetadataFile: Boolean): LogManager = {
     val defaultProps = config.extractLogConfigMap
 
-    LogConfig.validateBrokerLogConfigValues(defaultProps, 
config.remoteLogManagerConfig.enableRemoteStorageSystem())
+    LogConfig.validateBrokerLogConfigValues(defaultProps, 
config.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     val defaultLogConfig = new LogConfig(defaultProps)
 
     val cleanerConfig = LogCleaner.cleanerConfig(config)
@@ -1586,7 +1586,7 @@ object LogManager {
       time = time,
       keepPartitionMetadataFile = keepPartitionMetadataFile,
       interBrokerProtocolVersion = config.interBrokerProtocolVersion,
-      remoteStorageSystemEnable = 
config.remoteLogManagerConfig.enableRemoteStorageSystem(),
+      remoteStorageSystemEnable = 
config.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
       initialTaskDelayMs = config.logInitialTaskDelayMs)
   }
 
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index baafc3e658f..e143dd26668 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -186,7 +186,7 @@ class BrokerServer(
       kafkaScheduler.startup()
 
       /* register broker metrics */
-      brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig.enableRemoteStorageSystem())
+      brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
 
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, 
s"broker-${config.nodeId}-")
 
@@ -621,7 +621,7 @@ class BrokerServer(
   }
 
   protected def createRemoteLogManager(): Option[RemoteLogManager] = {
-    if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
+    if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
       Some(new RemoteLogManager(config.remoteLogManagerConfig, 
config.brokerId, config.logDirs.head, clusterId, time,
         (tp: TopicPartition) => logManager.getLog(tp).asJava,
         (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index ed9260b2194..e441b30a3bd 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -70,7 +70,7 @@ class TopicConfigHandler(private val replicaManager: 
ReplicaManager,
     val logs = logManager.logsByTopic(topic)
     val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())
 
-    logManager.updateTopicConfig(topic, props, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())
+    logManager.updateTopicConfig(topic, props, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     maybeBootstrapRemoteLogComponents(topic, logs, 
wasRemoteLogEnabledBeforeUpdate)
   }
 
diff --git 
a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala 
b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index f957b65ddd1..b99065b573e 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -108,7 +108,7 @@ class ControllerConfigurationValidator(kafkaConfig: 
KafkaConfig) extends Configu
             nullTopicConfigs.mkString(","))
         }
         LogConfig.validate(properties, kafkaConfig.extractLogConfigMap,
-          kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())
+          kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
       case BROKER => validateBrokerName(resource.name())
       case CLIENT_METRICS =>
         val properties = new Properties()
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 9c807c79da5..5b6e04e5a0e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -276,7 +276,7 @@ class KafkaServer(
         createCurrentControllerIdMetric()
 
         /* register broker metrics */
-        _brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig.enableRemoteStorageSystem())
+        _brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
 
         quotaManagers = QuotaFactory.instantiate(config, metrics, time, 
threadNamePrefix.getOrElse(""))
         KafkaBroker.notifyClusterListeners(clusterId, kafkaMetricsReporters ++ 
metrics.reporters.asScala)
@@ -690,7 +690,7 @@ class KafkaServer(
   }
 
   protected def createRemoteLogManager(): Option[RemoteLogManager] = {
-    if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
+    if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
       Some(new RemoteLogManager(config.remoteLogManagerConfig, 
config.brokerId, config.logDirs.head, clusterId, time,
         (tp: TopicPartition) => logManager.getLog(tp).asJava,
         (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala 
b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index 604e03c7ed4..cd9153c07dc 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -163,7 +163,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
 
     LogConfig.validate(config,
       kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
-      kafkaConfig.exists(_.remoteLogManagerConfig.enableRemoteStorageSystem()))
+      
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
   }
 
   private def writeTopicPartitionAssignment(topic: String, replicaAssignment: 
Map[Int, ReplicaAssignment],
@@ -481,7 +481,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
     // remove the topic overrides
     LogConfig.validate(configs,
       kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()),
-      kafkaConfig.exists(_.remoteLogManagerConfig.enableRemoteStorageSystem()))
+      
kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
   }
 
   /**
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 9badaabf419..edc85cdf28b 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -221,7 +221,7 @@ public class RemoteLogManagerTest {
         
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
"true");
         
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
 "100");
         remoteLogManagerConfig = createRLMConfig(props);
-        brokerTopicStats = new 
BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().enableRemoteStorageSystem());
+        brokerTopicStats = new 
BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled());
 
         remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, 
brokerId, logDir, clusterId, time,
                 tp -> Optional.of(mockLog),
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala 
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 2bf147b528b..bd1ee39a483 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -298,7 +298,7 @@ class LogConfigTest {
     props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, 
localRetentionMs.toString)
     props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, 
localRetentionBytes.toString)
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(props, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()))
+      () => LogConfig.validate(props, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
   }
 
   @Test
@@ -310,17 +310,17 @@ class LogConfigTest {
     val logProps = new Properties()
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE)
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
-    LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())
+    LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
 
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_COMPACT)
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()))
+      () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact")
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()))
+      () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
     assertThrows(classOf[ConfigException],
-      () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()))
+      () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
   }
 
   @ParameterizedTest(name = "testEnableRemoteLogStorage with 
sysRemoteStorageEnabled: {0}")
@@ -333,10 +333,10 @@ class LogConfigTest {
     val logProps = new Properties()
     logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
     if (sysRemoteStorageEnabled) {
-      LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())
+      LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     } else {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()))
+        () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
       assertTrue(message.getMessage.contains("Tiered Storage functionality is 
disabled in the broker"))
     }
   }
@@ -356,10 +356,10 @@ class LogConfigTest {
     logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500")
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()))
+        () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
       
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
     } else {
-      LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())
+      LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     }
   }
 
@@ -378,10 +378,10 @@ class LogConfigTest {
     logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128")
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
-        () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()))
+        () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
       
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
     } else {
-      LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())
+      LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     }
   }
 
@@ -396,10 +396,10 @@ class LogConfigTest {
 
     if (sysRemoteStorageEnabled) {
       val message = assertThrows(classOf[ConfigException],
-        () => 
LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()))
+        () => 
LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
       
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
     } else {
-      LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())
+      LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala 
b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 1a781a93ea6..53e4229a276 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -128,7 +128,7 @@ class LogLoaderTest {
         logDirFailureChannel = logDirFailureChannel,
         time = time,
         keepPartitionMetadataFile = config.usesTopicId,
-        remoteStorageSystemEnable = 
config.remoteLogManagerConfig.enableRemoteStorageSystem(),
+        remoteStorageSystemEnable = 
config.remoteLogManagerConfig.isRemoteStorageSystemEnabled(),
         initialTaskDelayMs = config.logInitialTaskDelayMs) {
 
         override def loadLog(logDir: File, hadCleanShutdown: Boolean, 
recoveryPoints: Map[TopicPartition, Long],
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 313b679c605..a6d0d1f3293 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3389,7 +3389,7 @@ class ReplicaManagerTest {
     when(mockLog.remoteLogEnabled()).thenReturn(enableRemoteStorage)
     when(mockLog.remoteStorageSystemEnable).thenReturn(enableRemoteStorage)
     val aliveBrokers = aliveBrokerIds.map(brokerId => new Node(brokerId, 
s"host$brokerId", brokerId))
-    brokerTopicStats = new 
BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem)
+    brokerTopicStats = new 
BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled)
 
     val metadataCache: MetadataCache = mock(classOf[MetadataCache])
     when(metadataCache.topicIdInfo()).thenReturn((topicIds.asJava, 
topicNames.asJava))
@@ -4094,7 +4094,7 @@ class ReplicaManagerTest {
     props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 
2.toString)
     val remoteLogManagerConfig = new RemoteLogManagerConfig(props)
     val mockLog = mock(classOf[UnifiedLog])
-    val brokerTopicStats = new 
BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem())
+    val brokerTopicStats = new 
BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     val remoteLogManager = new RemoteLogManager(
       remoteLogManagerConfig,
       0,
@@ -4193,7 +4193,7 @@ class ReplicaManagerTest {
     
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, 
classOf[NoOpRemoteLogMetadataManager].getName)
     val remoteLogManagerConfig = new RemoteLogManagerConfig(props)
     val dummyLog = mock(classOf[UnifiedLog])
-    val brokerTopicStats = new 
BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem())
+    val brokerTopicStats = new 
BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     val remoteLogManager = new RemoteLogManager(
       remoteLogManagerConfig,
       0,
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 8224ab23825..6323ba65bfe 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -358,7 +358,7 @@ public final class RemoteLogManagerConfig extends 
AbstractConfig {
         super(configDef(), props);
     }
 
-    public boolean enableRemoteStorageSystem() {
+    public boolean isRemoteStorageSystemEnabled() {
         return getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
     }
 

Reply via email to