This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 781b93b00d99a63dee0ed6650a2d398f7791cea3 Author: Chia Chuan Yu <yujuan...@gmail.com> AuthorDate: Mon Jun 10 02:14:15 2024 +0800 KAFKA-16885 Renamed the enableRemoteStorageSystem to isRemoteStorageSystemEnabled (#16256) Reviewers: Kamal Chandraprakash <kchandraprak...@uber.com>, Chia-Ping Tsai <chia7...@gmail.com> --- .../java/kafka/log/remote/RemoteLogManager.java | 2 +- .../kafka/server/builders/KafkaApisBuilder.java | 2 +- .../server/builders/ReplicaManagerBuilder.java | 2 +- core/src/main/scala/kafka/log/LogManager.scala | 4 ++-- .../src/main/scala/kafka/server/BrokerServer.scala | 4 ++-- .../main/scala/kafka/server/ConfigHandler.scala | 2 +- .../server/ControllerConfigurationValidator.scala | 2 +- core/src/main/scala/kafka/server/KafkaServer.scala | 4 ++-- core/src/main/scala/kafka/zk/AdminZkClient.scala | 4 ++-- .../kafka/log/remote/RemoteLogManagerTest.java | 2 +- .../test/scala/unit/kafka/log/LogConfigTest.scala | 26 +++++++++++----------- .../test/scala/unit/kafka/log/LogLoaderTest.scala | 2 +- .../unit/kafka/server/ReplicaManagerTest.scala | 6 ++--- .../log/remote/storage/RemoteLogManagerConfig.java | 2 +- 14 files changed, 32 insertions(+), 32 deletions(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 5b0d91ff439..de5fbd85f16 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -378,7 +378,7 @@ public class RemoteLogManager implements Closeable { Map<String, Uuid> topicIds) { LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); - if (this.rlmConfig.enableRemoteStorageSystem() && !isRemoteLogManagerConfigured()) { + if (this.rlmConfig.isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) { throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled"); 
} diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java index 1d422461678..56607723604 100644 --- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java +++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java @@ -179,7 +179,7 @@ public class KafkaApisBuilder { if (metrics == null) throw new RuntimeException("You must set metrics"); if (quotas == null) throw new RuntimeException("You must set quotas"); if (fetchManager == null) throw new RuntimeException("You must set fetchManager"); - if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem()); + if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled()); if (apiVersionManager == null) throw new RuntimeException("You must set apiVersionManager"); return new KafkaApis(requestChannel, diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java index 5e8cf2dcdc6..7cac33200d2 100644 --- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java @@ -185,7 +185,7 @@ public class ReplicaManagerBuilder { if (metadataCache == null) throw new RuntimeException("You must set metadataCache"); if (logDirFailureChannel == null) throw new RuntimeException("You must set logDirFailureChannel"); if (alterPartitionManager == null) throw new RuntimeException("You must set alterIsrManager"); - if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem()); + if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled()); // Initialize metrics in the end just before passing it 
to ReplicaManager to ensure ReplicaManager closes the // metrics correctly. There might be a resource leak if it is initialized and an exception occurs between // its initialization and creation of ReplicaManager. diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 45d6ab7908d..8908aadf459 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -1562,7 +1562,7 @@ object LogManager { keepPartitionMetadataFile: Boolean): LogManager = { val defaultProps = config.extractLogConfigMap - LogConfig.validateBrokerLogConfigValues(defaultProps, config.remoteLogManagerConfig.enableRemoteStorageSystem()) + LogConfig.validateBrokerLogConfigValues(defaultProps, config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) val defaultLogConfig = new LogConfig(defaultProps) val cleanerConfig = LogCleaner.cleanerConfig(config) @@ -1586,7 +1586,7 @@ object LogManager { time = time, keepPartitionMetadataFile = keepPartitionMetadataFile, interBrokerProtocolVersion = config.interBrokerProtocolVersion, - remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem(), + remoteStorageSystemEnable = config.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), initialTaskDelayMs = config.logInitialTaskDelayMs) } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index baafc3e658f..e143dd26668 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -186,7 +186,7 @@ class BrokerServer( kafkaScheduler.startup() /* register broker metrics */ - brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.enableRemoteStorageSystem()) + brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) quotaManagers = QuotaFactory.instantiate(config, metrics, time, 
s"broker-${config.nodeId}-") @@ -621,7 +621,7 @@ class BrokerServer( } protected def createRemoteLogManager(): Option[RemoteLogManager] = { - if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) { + if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) { Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, (tp: TopicPartition) => logManager.getLog(tp).asJava, (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => { diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index ed9260b2194..e441b30a3bd 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -70,7 +70,7 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, val logs = logManager.logsByTopic(topic) val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled()) - logManager.updateTopicConfig(topic, props, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()) + logManager.updateTopicConfig(topic, props, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate) } diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala index f957b65ddd1..b99065b573e 100644 --- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala +++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala @@ -108,7 +108,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu nullTopicConfigs.mkString(",")) } LogConfig.validate(properties, kafkaConfig.extractLogConfigMap, - kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()) + kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) case BROKER => 
validateBrokerName(resource.name()) case CLIENT_METRICS => val properties = new Properties() diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 9c807c79da5..5b6e04e5a0e 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -276,7 +276,7 @@ class KafkaServer( createCurrentControllerIdMetric() /* register broker metrics */ - _brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.enableRemoteStorageSystem()) + _brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse("")) KafkaBroker.notifyClusterListeners(clusterId, kafkaMetricsReporters ++ metrics.reporters.asScala) @@ -690,7 +690,7 @@ class KafkaServer( } protected def createRemoteLogManager(): Option[RemoteLogManager] = { - if (config.remoteLogManagerConfig.enableRemoteStorageSystem()) { + if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) { Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, (tp: TopicPartition) => logManager.getLog(tp).asJava, (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => { diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index 604e03c7ed4..cd9153c07dc 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -163,7 +163,7 @@ class AdminZkClient(zkClient: KafkaZkClient, LogConfig.validate(config, kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()), - kafkaConfig.exists(_.remoteLogManagerConfig.enableRemoteStorageSystem())) + kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) } private def writeTopicPartitionAssignment(topic: String, 
replicaAssignment: Map[Int, ReplicaAssignment], @@ -481,7 +481,7 @@ class AdminZkClient(zkClient: KafkaZkClient, // remove the topic overrides LogConfig.validate(configs, kafkaConfig.map(_.extractLogConfigMap).getOrElse(Collections.emptyMap()), - kafkaConfig.exists(_.remoteLogManagerConfig.enableRemoteStorageSystem())) + kafkaConfig.exists(_.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) } /** diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 9badaabf419..edc85cdf28b 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -221,7 +221,7 @@ public class RemoteLogManagerTest { props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100"); remoteLogManagerConfig = createRLMConfig(props); - brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().enableRemoteStorageSystem()); + brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled()); remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 2bf147b528b..bd1ee39a483 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -298,7 +298,7 @@ class LogConfigTest { props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs.toString) props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes.toString) assertThrows(classOf[ConfigException], - () => LogConfig.validate(props, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())) + () => LogConfig.validate(props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) } @Test @@ -310,17 +310,17 @@ class LogConfigTest { val logProps = new Properties() logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()) + LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())) + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact") assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())) + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete") assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())) + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) } @ParameterizedTest(name = "testEnableRemoteLogStorage with sysRemoteStorageEnabled: {0}") @@ -333,10 +333,10 @@ class LogConfigTest { val logProps = new 
Properties() logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") if (sysRemoteStorageEnabled) { - LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()) + LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) } else { val message = assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())) + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker")) } } @@ -356,10 +356,10 @@ class LogConfigTest { logProps.put(TopicConfig.RETENTION_MS_CONFIG, "500") if (sysRemoteStorageEnabled) { val message = assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())) + () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG)) } else { - LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()) + LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) } } @@ -378,10 +378,10 @@ class LogConfigTest { logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, "128") if (sysRemoteStorageEnabled) { val message = assertThrows(classOf[ConfigException], - () => LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())) + () => LogConfig.validate(logProps, 
kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) } else { - LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()) + LogConfig.validate(logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) } } @@ -396,10 +396,10 @@ class LogConfigTest { if (sysRemoteStorageEnabled) { val message = assertThrows(classOf[ConfigException], - () => LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem())) + () => LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())) assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG)) } else { - LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.enableRemoteStorageSystem()) + LogConfig.validateBrokerLogConfigValues(kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) } } diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index 1a781a93ea6..53e4229a276 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -128,7 +128,7 @@ class LogLoaderTest { logDirFailureChannel = logDirFailureChannel, time = time, keepPartitionMetadataFile = config.usesTopicId, - remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem(), + remoteStorageSystemEnable = config.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), initialTaskDelayMs = config.logInitialTaskDelayMs) { override def loadLog(logDir: File, hadCleanShutdown: Boolean, 
recoveryPoints: Map[TopicPartition, Long], diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 313b679c605..a6d0d1f3293 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -3389,7 +3389,7 @@ class ReplicaManagerTest { when(mockLog.remoteLogEnabled()).thenReturn(enableRemoteStorage) when(mockLog.remoteStorageSystemEnable).thenReturn(enableRemoteStorage) val aliveBrokers = aliveBrokerIds.map(brokerId => new Node(brokerId, s"host$brokerId", brokerId)) - brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem) + brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled) val metadataCache: MetadataCache = mock(classOf[MetadataCache]) when(metadataCache.topicIdInfo()).thenReturn((topicIds.asJava, topicNames.asJava)) @@ -4094,7 +4094,7 @@ class ReplicaManagerTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 2.toString) val remoteLogManagerConfig = new RemoteLogManagerConfig(props) val mockLog = mock(classOf[UnifiedLog]) - val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem()) + val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled()) val remoteLogManager = new RemoteLogManager( remoteLogManagerConfig, 0, @@ -4193,7 +4193,7 @@ class ReplicaManagerTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) val remoteLogManagerConfig = new RemoteLogManagerConfig(props) val dummyLog = mock(classOf[UnifiedLog]) - val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.enableRemoteStorageSystem()) 
+ val brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig.isRemoteStorageSystemEnabled()) val remoteLogManager = new RemoteLogManager( remoteLogManagerConfig, 0, diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 8224ab23825..6323ba65bfe 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -358,7 +358,7 @@ public final class RemoteLogManagerConfig extends AbstractConfig { super(configDef(), props); } - public boolean enableRemoteStorageSystem() { + public boolean isRemoteStorageSystemEnabled() { return getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); }