This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4fe08f3b297 KAFKA-16976 Update the current/dynamic config inside RemoteLogManagerConfig (#16394)
4fe08f3b297 is described below

commit 4fe08f3b297064794d8ff0c38d537f2dd32de1e8
Author: Kamal Chandraprakash <kchandraprak...@uber.com>
AuthorDate: Thu Jun 20 21:03:35 2024 +0530

    KAFKA-16976 Update the current/dynamic config inside RemoteLogManagerConfig (#16394)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../java/kafka/log/remote/RemoteLogManager.java    | 35 ++++------
 .../src/main/scala/kafka/server/BrokerServer.scala |  2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 10 +--
 core/src/main/scala/kafka/server/KafkaServer.scala |  2 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  2 +-
 .../kafka/log/remote/RemoteLogManagerTest.java     | 56 ++++++++--------
 .../kafka/server/DynamicBrokerConfigTest.scala     | 50 ++++++++-------
 .../unit/kafka/server/ReplicaManagerTest.scala     |  4 +-
 .../log/remote/storage/RemoteLogManagerConfig.java | 74 ++++++++++++++--------
 .../remote/storage/RemoteLogManagerConfigTest.java | 33 ++++++----
 10 files changed, 144 insertions(+), 124 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 2b8f36ba501..e51e3e4347e 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -22,7 +22,6 @@ import kafka.log.UnifiedLog;
 import kafka.log.remote.quota.RLMQuotaManager;
 import kafka.log.remote.quota.RLMQuotaManagerConfig;
 import kafka.server.BrokerTopicStats;
-import kafka.server.KafkaConfig;
 import kafka.server.QuotaType;
 import kafka.server.StopPartition;
 
@@ -155,7 +154,7 @@ public class RemoteLogManager implements Closeable {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteLogManager.class);
     private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = 
"remote-log-reader";
-    private final KafkaConfig config;
+    private final RemoteLogManagerConfig rlmConfig;
     private final int brokerId;
     private final String logDir;
     private final Time time;
@@ -196,7 +195,7 @@ public class RemoteLogManager implements Closeable {
     /**
      * Creates RemoteLogManager instance with the given arguments.
      *
-     * @param config Configuration required for remote logging 
subsystem(tiered storage) at the broker level.
+     * @param rlmConfig Configuration required for remote logging 
subsystem(tiered storage) at the broker level.
      * @param brokerId  id of the current broker.
      * @param logDir    directory of Kafka log segments.
      * @param time      Time instance.
@@ -206,7 +205,7 @@ public class RemoteLogManager implements Closeable {
      * @param brokerTopicStats BrokerTopicStats instance to update the 
respective metrics.
      * @param metrics  Metrics instance
      */
-    public RemoteLogManager(KafkaConfig config,
+    public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
                             int brokerId,
                             String logDir,
                             String clusterId,
@@ -215,7 +214,7 @@ public class RemoteLogManager implements Closeable {
                             BiConsumer<TopicPartition, Long> 
updateRemoteLogStartOffset,
                             BrokerTopicStats brokerTopicStats,
                             Metrics metrics) throws IOException {
-        this.config = config;
+        this.rlmConfig = rlmConfig;
         this.brokerId = brokerId;
         this.logDir = logDir;
         this.clusterId = clusterId;
@@ -230,8 +229,7 @@ public class RemoteLogManager implements Closeable {
         rlmCopyQuotaManager = createRLMCopyQuotaManager();
         rlmFetchQuotaManager = createRLMFetchQuotaManager();
 
-        RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
-        indexCache = new 
RemoteIndexCache(config.remoteLogIndexFileCacheTotalSizeBytes(), 
remoteLogStorageManager, logDir);
+        indexCache = new 
RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), 
remoteLogStorageManager, logDir);
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
         rlmScheduledThreadPool = new 
RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
 
@@ -279,12 +277,12 @@ public class RemoteLogManager implements Closeable {
     }
 
     RLMQuotaManager createRLMCopyQuotaManager() {
-        return new RLMQuotaManager(copyQuotaManagerConfig(config), metrics, 
QuotaType.RLMCopy$.MODULE$,
+        return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, 
QuotaType.RLMCopy$.MODULE$,
           "Tracking copy byte-rate for Remote Log Manager", time);
     }
 
     RLMQuotaManager createRLMFetchQuotaManager() {
-        return new RLMQuotaManager(fetchQuotaManagerConfig(config), metrics, 
QuotaType.RLMFetch$.MODULE$,
+        return new RLMQuotaManager(fetchQuotaManagerConfig(rlmConfig), 
metrics, QuotaType.RLMFetch$.MODULE$,
           "Tracking fetch byte-rate for Remote Log Manager", time);
     }
 
@@ -292,16 +290,14 @@ public class RemoteLogManager implements Closeable {
         return rlmFetchQuotaManager.isQuotaExceeded();
     }
 
-    static RLMQuotaManagerConfig copyQuotaManagerConfig(KafkaConfig config) {
-        RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
-        return new 
RLMQuotaManagerConfig(config.remoteLogManagerCopyMaxBytesPerSecond(),
+    static RLMQuotaManagerConfig copyQuotaManagerConfig(RemoteLogManagerConfig 
rlmConfig) {
+        return new 
RLMQuotaManagerConfig(rlmConfig.remoteLogManagerCopyMaxBytesPerSecond(),
           rlmConfig.remoteLogManagerCopyNumQuotaSamples(),
           rlmConfig.remoteLogManagerCopyQuotaWindowSizeSeconds());
     }
 
-    static RLMQuotaManagerConfig fetchQuotaManagerConfig(KafkaConfig config) {
-        RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
-        return new 
RLMQuotaManagerConfig(config.remoteLogManagerFetchMaxBytesPerSecond(),
+    static RLMQuotaManagerConfig 
fetchQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) {
+        return new 
RLMQuotaManagerConfig(rlmConfig.remoteLogManagerFetchMaxBytesPerSecond(),
           rlmConfig.remoteLogManagerFetchNumQuotaSamples(),
           rlmConfig.remoteLogManagerFetchQuotaWindowSizeSeconds());
     }
@@ -318,7 +314,6 @@ public class RemoteLogManager implements Closeable {
 
     @SuppressWarnings("removal")
     RemoteStorageManager createRemoteStorageManager() {
-        RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
         return java.security.AccessController.doPrivileged(new 
PrivilegedAction<RemoteStorageManager>() {
             private final String classPath = 
rlmConfig.remoteStorageManagerClassPath();
 
@@ -335,14 +330,13 @@ public class RemoteLogManager implements Closeable {
     }
 
     private void configureRSM() {
-        final Map<String, Object> rsmProps = new 
HashMap<>(config.remoteLogManagerConfig().remoteStorageManagerProps());
+        final Map<String, Object> rsmProps = new 
HashMap<>(rlmConfig.remoteStorageManagerProps());
         rsmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId);
         remoteLogStorageManager.configure(rsmProps);
     }
 
     @SuppressWarnings("removal")
     RemoteLogMetadataManager createRemoteLogMetadataManager() {
-        RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
         return java.security.AccessController.doPrivileged(new 
PrivilegedAction<RemoteLogMetadataManager>() {
             private final String classPath = 
rlmConfig.remoteLogMetadataManagerClassPath();
 
@@ -369,7 +363,7 @@ public class RemoteLogManager implements Closeable {
             rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + 
"security.protocol", e.securityProtocol().name);
         });
         // update the remoteLogMetadataProps here to override endpoint config 
if any
-        
rlmmProps.putAll(config.remoteLogManagerConfig().remoteLogMetadataManagerProps());
+        rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps());
 
         rlmmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId);
         rlmmProps.put(LOG_DIR_CONFIG, logDir);
@@ -421,7 +415,7 @@ public class RemoteLogManager implements Closeable {
                                    Map<String, Uuid> topicIds) {
         LOGGER.debug("Received leadership changes for leaders: {} and 
followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
 
-        if (config.remoteLogManagerConfig().isRemoteStorageSystemEnabled() && 
!isRemoteLogManagerConfigured()) {
+        if (rlmConfig.isRemoteStorageSystemEnabled() && 
!isRemoteLogManagerConfigured()) {
             throw new KafkaException("RemoteLogManager is not configured when 
remote storage system is enabled");
         }
 
@@ -1751,7 +1745,6 @@ public class RemoteLogManager implements Closeable {
 
     void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
                                             Consumer<RLMTask> 
convertToLeaderOrFollower) {
-        RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
         RLMTaskWithFuture rlmTaskWithFuture = 
leaderOrFollowerTasks.computeIfAbsent(topicPartition,
                 topicIdPartition -> {
                     RLMTask task = new RLMTask(topicIdPartition, 
rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 723e6b01b61..29f3a5cc0b0 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -621,7 +621,7 @@ class BrokerServer(
 
   protected def createRemoteLogManager(): Option[RemoteLogManager] = {
     if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
-      Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, 
clusterId, time,
+      Some(new RemoteLogManager(config.remoteLogManagerConfig, 
config.brokerId, config.logDirs.head, clusterId, time,
         (tp: TopicPartition) => logManager.getLog(tp).asJava,
         (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
           logManager.getLog(tp).foreach { log =>
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a7513c4ec87..7e84514c1e7 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -231,7 +231,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   val zkEnableSecureAcls: Boolean = 
getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG)
   val zkMaxInFlightRequests: Int = 
getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG)
 
-  private val _remoteLogManagerConfig = new RemoteLogManagerConfig(props)
+  private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
   def remoteLogManagerConfig = _remoteLogManagerConfig
 
   private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: 
String): Boolean = {
@@ -869,14 +869,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
 
   def logLocalRetentionMs: java.lang.Long = 
getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP)
 
-  def remoteFetchMaxWaitMs = 
getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP)
-
-  def remoteLogIndexFileCacheTotalSizeBytes: Long = 
getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
-
-  def remoteLogManagerCopyMaxBytesPerSecond: Long = 
getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP)
-
-  def remoteLogManagerFetchMaxBytesPerSecond: Long = 
getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP)
-
   validateValues()
 
   @nowarn("cat=deprecation")
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index df9d631dca5..23b7e8ebc3d 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -691,7 +691,7 @@ class KafkaServer(
 
   protected def createRemoteLogManager(): Option[RemoteLogManager] = {
     if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
-      Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, 
clusterId, time,
+      Some(new RemoteLogManager(config.remoteLogManagerConfig, 
config.brokerId, config.logDirs.head, clusterId, time,
         (tp: TopicPartition) => logManager.getLog(tp).asJava,
         (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
           logManager.getLog(tp).foreach { log =>
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6ab67a84ec4..3615788aeab 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1479,7 +1479,7 @@ class ReplicaManager(val config: KafkaConfig,
         return Some(createLogReadResult(e))
     }
 
-    val remoteFetchMaxWaitMs = config.remoteFetchMaxWaitMs.toLong
+    val remoteFetchMaxWaitMs = 
config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
     val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, 
remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
       fetchPartitionStatus, params, logReadResults, this, responseCallback)
     delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 7e3eb2dad6f..4f2405a94c7 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -226,11 +226,11 @@ public class RemoteLogManagerTest {
         Properties props = kafka.utils.TestUtils.createDummyBrokerConfig();
         
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
"true");
         
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
 "100");
-        createRLMConfig(props);
+        appendRLMConfig(props);
         config = KafkaConfig.fromProps(props);
         brokerTopicStats = new 
BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled());
 
-        remoteLogManager = new RemoteLogManager(config, brokerId, logDir, 
clusterId, time,
+        remoteLogManager = new 
RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, 
time,
                 tp -> Optional.of(mockLog),
                 (topicPartition, offset) -> currentLogStartOffset.set(offset),
                 brokerTopicStats, metrics) {
@@ -338,11 +338,14 @@ public class RemoteLogManagerTest {
         String key = "key";
         String configPrefix = "config.prefix";
         Properties props = new Properties();
+        props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
         
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
 configPrefix);
         props.put(configPrefix + key, "world");
         props.put("remote.log.metadata.y", "z");
+        appendRLMConfig(props);
+        KafkaConfig config = KafkaConfig.fromProps(props);
 
-        Map<String, Object> metadataMangerConfig = 
createRLMConfig(props).remoteLogMetadataManagerProps();
+        Map<String, Object> metadataMangerConfig = 
config.remoteLogManagerConfig().remoteLogMetadataManagerProps();
         assertEquals(props.get(configPrefix + key), 
metadataMangerConfig.get(key));
         assertFalse(metadataMangerConfig.containsKey("remote.log.metadata.y"));
     }
@@ -352,11 +355,14 @@ public class RemoteLogManagerTest {
         String key = "key";
         String configPrefix = "config.prefix";
         Properties props = new Properties();
+        props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
         
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, 
configPrefix);
         props.put(configPrefix + key, "world");
         props.put("remote.storage.manager.y", "z");
+        appendRLMConfig(props);
+        KafkaConfig config = KafkaConfig.fromProps(props);
 
-        Map<String, Object> remoteStorageManagerConfig = 
createRLMConfig(props).remoteStorageManagerProps();
+        Map<String, Object> remoteStorageManagerConfig = 
config.remoteLogManagerConfig().remoteStorageManagerProps();
         assertEquals(props.get(configPrefix + key), 
remoteStorageManagerConfig.get(key));
         
assertFalse(remoteStorageManagerConfig.containsKey("remote.storage.manager.y"));
     }
@@ -385,10 +391,10 @@ public class RemoteLogManagerTest {
         props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect());
         // override common security.protocol by adding "RLMM prefix" and 
"remote log metadata common client prefix"
         props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + 
REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL");
-        createRLMConfig(props);
+        appendRLMConfig(props);
         KafkaConfig config = KafkaConfig.fromProps(props);
         try (RemoteLogManager remoteLogManager = new RemoteLogManager(
-                config,
+                config.remoteLogManagerConfig(),
                 brokerId,
                 logDir,
                 clusterId,
@@ -1289,7 +1295,7 @@ public class RemoteLogManagerTest {
     void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
         ClassLoaderAwareRemoteStorageManager rsmManager = 
mock(ClassLoaderAwareRemoteStorageManager.class);
         try (RemoteLogManager remoteLogManager =
-            new RemoteLogManager(config, brokerId, logDir, clusterId, time,
+            new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, 
logDir, clusterId, time,
                     t -> Optional.empty(),
                     (topicPartition, offset) -> { },
                     brokerTopicStats, metrics) {
@@ -1551,7 +1557,7 @@ public class RemoteLogManagerTest {
     public void testRemoveMetricsOnClose() throws IOException {
         MockedConstruction<KafkaMetricsGroup> mockMetricsGroupCtor = 
mockConstruction(KafkaMetricsGroup.class);
         try {
-            RemoteLogManager remoteLogManager = new RemoteLogManager(config, 
brokerId, logDir, clusterId,
+            RemoteLogManager remoteLogManager = new 
RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId,
                 time, tp -> Optional.of(mockLog), (topicPartition, offset) -> 
{ }, brokerTopicStats, metrics) {
                 public RemoteStorageManager createRemoteStorageManager() {
                     return remoteStorageManager;
@@ -1946,7 +1952,7 @@ public class RemoteLogManagerTest {
                     else
                         return Collections.emptyIterator();
                 });
-        try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, 
brokerId, logDir, clusterId, time,
+        try (RemoteLogManager remoteLogManager = new 
RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, 
time,
                 tp -> Optional.of(mockLog),
                 (topicPartition, offset) -> { },
                 brokerTopicStats, metrics) {
@@ -1971,7 +1977,7 @@ public class RemoteLogManagerTest {
         
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), 
anyInt()))
                 .thenReturn(Collections.emptyIterator());
 
-        try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, 
brokerId, logDir, clusterId, time,
+        try (RemoteLogManager remoteLogManager = new 
RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, 
time,
                 tp -> Optional.of(mockLog),
                 (topicPartition, offset) -> { },
                 brokerTopicStats, metrics) {
@@ -2005,7 +2011,7 @@ public class RemoteLogManagerTest {
                 });
 
         AtomicLong logStartOffset = new AtomicLong(0);
-        try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, 
brokerId, logDir, clusterId, time,
+        try (RemoteLogManager remoteLogManager = new 
RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, 
time,
                 tp -> Optional.of(mockLog),
                 (topicPartition, offset) ->  logStartOffset.set(offset),
                 brokerTopicStats, metrics) {
@@ -2437,7 +2443,7 @@ public class RemoteLogManagerTest {
     @Test
     public void testDeleteRetentionMsOnExpiredSegment() throws 
RemoteStorageException, IOException {
         AtomicLong logStartOffset = new AtomicLong(0);
-        try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, 
brokerId, logDir, clusterId, time,
+        try (RemoteLogManager remoteLogManager = new 
RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, 
time,
                 tp -> Optional.of(mockLog),
                 (topicPartition, offset) -> logStartOffset.set(offset),
                 brokerTopicStats, metrics) {
@@ -2579,7 +2585,7 @@ public class RemoteLogManagerTest {
         );
 
         try (RemoteLogManager remoteLogManager = new RemoteLogManager(
-                config,
+                config.remoteLogManagerConfig(),
                 brokerId,
                 logDir,
                 clusterId,
@@ -2652,7 +2658,7 @@ public class RemoteLogManagerTest {
         );
 
         try (RemoteLogManager remoteLogManager = new RemoteLogManager(
-                config,
+                config.remoteLogManagerConfig(),
                 brokerId,
                 logDir,
                 clusterId,
@@ -2737,7 +2743,7 @@ public class RemoteLogManagerTest {
 
 
         try (RemoteLogManager remoteLogManager = new RemoteLogManager(
-                config,
+                config.remoteLogManagerConfig(),
                 brokerId,
                 logDir,
                 clusterId,
@@ -2783,9 +2789,9 @@ public class RemoteLogManagerTest {
     public void testCopyQuotaManagerConfig() {
         Properties defaultProps = new Properties();
         defaultProps.put("zookeeper.connect", 
kafka.utils.TestUtils.MockZkConnect());
-        createRLMConfig(defaultProps);
+        appendRLMConfig(defaultProps);
         KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps);
-        RLMQuotaManagerConfig defaultConfig = 
RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig);
+        RLMQuotaManagerConfig defaultConfig = 
RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig.remoteLogManagerConfig());
         assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, 
defaultConfig.quotaBytesPerSecond());
         assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, 
defaultConfig.numQuotaSamples());
         
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS, 
defaultConfig.quotaWindowSizeSeconds());
@@ -2795,10 +2801,10 @@ public class RemoteLogManagerTest {
         
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
 100);
         
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP,
 31);
         
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP,
 1);
-        createRLMConfig(customProps);
+        appendRLMConfig(customProps);
         KafkaConfig config = KafkaConfig.fromProps(customProps);
 
-        RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = 
RemoteLogManager.copyQuotaManagerConfig(config);
+        RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = 
RemoteLogManager.copyQuotaManagerConfig(config.remoteLogManagerConfig());
         assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond());
         assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples());
         assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds());
@@ -2808,10 +2814,10 @@ public class RemoteLogManagerTest {
     public void testFetchQuotaManagerConfig() {
         Properties defaultProps = new Properties();
         defaultProps.put("zookeeper.connect", 
kafka.utils.TestUtils.MockZkConnect());
-        createRLMConfig(defaultProps);
+        appendRLMConfig(defaultProps);
         KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps);
 
-        RLMQuotaManagerConfig defaultConfig = 
RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig);
+        RLMQuotaManagerConfig defaultConfig = 
RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig.remoteLogManagerConfig());
         assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, 
defaultConfig.quotaBytesPerSecond());
         assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM, 
defaultConfig.numQuotaSamples());
         
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, 
defaultConfig.quotaWindowSizeSeconds());
@@ -2821,9 +2827,9 @@ public class RemoteLogManagerTest {
         
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
 100);
         
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP,
 31);
         
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP,
 1);
-        createRLMConfig(customProps);
+        appendRLMConfig(customProps);
         KafkaConfig rlmConfig = KafkaConfig.fromProps(customProps);
-        RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = 
RemoteLogManager.fetchQuotaManagerConfig(rlmConfig);
+        RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = 
RemoteLogManager.fetchQuotaManagerConfig(rlmConfig.remoteLogManagerConfig());
         assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond());
         assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples());
         assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
@@ -3075,7 +3081,7 @@ public class RemoteLogManagerTest {
         return partition;
     }
 
-    private RemoteLogManagerConfig createRLMConfig(Properties props) {
+    private void appendRLMConfig(Properties props) {
         
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
         
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, 
NoOpRemoteStorageManager.class.getName());
         
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, 
NoOpRemoteLogMetadataManager.class.getName());
@@ -3086,8 +3092,6 @@ public class RemoteLogManagerTest {
         props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + 
remoteLogMetadataCommonClientTestProp, remoteLogMetadataCommonClientTestVal);
         props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + 
remoteLogMetadataConsumerTestProp, remoteLogMetadataConsumerTestVal);
         props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + 
remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal);
-
-        return new RemoteLogManagerConfig(props);
     }
 
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 8dad66919bc..a9d6eea9373 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -798,7 +798,8 @@ class DynamicBrokerConfigTest {
     val config = KafkaConfig(props)
     val kafkaBroker = mock(classOf[KafkaBroker])
     when(kafkaBroker.config).thenReturn(config)
-    assertEquals(500, config.remoteFetchMaxWaitMs)
+    when(kafkaBroker.remoteLogManagerOpt).thenReturn(None)
+    assertEquals(500, config.remoteLogManagerConfig.remoteFetchMaxWaitMs)
 
     val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
     config.dynamicConfig.initialize(None, None)
@@ -809,13 +810,13 @@ class DynamicBrokerConfigTest {
     // update default config
     config.dynamicConfig.validate(newProps, perBrokerConfig = false)
     config.dynamicConfig.updateDefaultConfig(newProps)
-    assertEquals(30000, config.remoteFetchMaxWaitMs)
+    assertEquals(30000, config.remoteLogManagerConfig.remoteFetchMaxWaitMs)
 
     // update per broker config
     newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "10000")
     config.dynamicConfig.validate(newProps, perBrokerConfig = true)
     config.dynamicConfig.updateBrokerConfig(0, newProps)
-    assertEquals(10000, config.remoteFetchMaxWaitMs)
+    assertEquals(10000, config.remoteLogManagerConfig.remoteFetchMaxWaitMs)
 
     // invalid values
     for (maxWaitMs <- Seq(-1, 0)) {
@@ -832,10 +833,10 @@ class DynamicBrokerConfigTest {
 
     val config = KafkaConfig(origProps)
     val serverMock = Mockito.mock(classOf[KafkaBroker])
-    val remoteLogManagerMockOpt = 
Option(Mockito.mock(classOf[RemoteLogManager]))
+    val remoteLogManager = Mockito.mock(classOf[RemoteLogManager])
 
     Mockito.when(serverMock.config).thenReturn(config)
-    
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt)
+    
Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager))
 
     config.dynamicConfig.initialize(None, None)
     config.dynamicConfig.addBrokerReconfigurable(new 
DynamicRemoteLogConfig(serverMock))
@@ -844,10 +845,10 @@ class DynamicBrokerConfigTest {
 
     
props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
 "4")
     config.dynamicConfig.updateDefaultConfig(props)
-    assertEquals(4L, 
config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP))
-    Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(4)
+    assertEquals(4L, 
config.remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes())
+    Mockito.verify(remoteLogManager).resizeCacheSize(4)
 
-    Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get)
+    Mockito.verifyNoMoreInteractions(remoteLogManager)
   }
 
   @Test
@@ -864,18 +865,18 @@ class DynamicBrokerConfigTest {
     config.dynamicConfig.addBrokerReconfigurable(new 
DynamicRemoteLogConfig(serverMock))
 
     
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
-      config.remoteLogManagerCopyMaxBytesPerSecond)
+      config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond())
 
     // Update default config
     
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
 "100")
     config.dynamicConfig.updateDefaultConfig(props)
-    assertEquals(100, config.remoteLogManagerCopyMaxBytesPerSecond)
+    assertEquals(100, 
config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond())
     verify(remoteLogManager).updateCopyQuota(100)
 
     // Update per broker config
     
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
 "200")
     config.dynamicConfig.updateBrokerConfig(0, props)
-    assertEquals(200, config.remoteLogManagerCopyMaxBytesPerSecond)
+    assertEquals(200, 
config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond())
     verify(remoteLogManager).updateCopyQuota(200)
 
     verifyNoMoreInteractions(remoteLogManager)
@@ -895,18 +896,18 @@ class DynamicBrokerConfigTest {
     config.dynamicConfig.addBrokerReconfigurable(new 
DynamicRemoteLogConfig(serverMock))
 
     
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
-      config.remoteLogManagerFetchMaxBytesPerSecond)
+      config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond())
 
     // Update default config
     
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
 "100")
     config.dynamicConfig.updateDefaultConfig(props)
-    assertEquals(100, config.remoteLogManagerFetchMaxBytesPerSecond)
+    assertEquals(100, 
config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond())
     verify(remoteLogManager).updateFetchQuota(100)
 
     // Update per broker config
     
props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
 "200")
     config.dynamicConfig.updateBrokerConfig(0, props)
-    assertEquals(200, config.remoteLogManagerFetchMaxBytesPerSecond)
+    assertEquals(200, 
config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond())
     verify(remoteLogManager).updateFetchQuota(200)
 
     verifyNoMoreInteractions(remoteLogManager)
@@ -930,18 +931,21 @@ class DynamicBrokerConfigTest {
     config.dynamicConfig.addBrokerReconfigurable(new 
DynamicRemoteLogConfig(serverMock))
 
     // Default values
-    
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
 config.remoteLogIndexFileCacheTotalSizeBytes)
-    
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
 config.remoteLogManagerCopyMaxBytesPerSecond)
-    
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
 config.remoteLogManagerFetchMaxBytesPerSecond)
+    
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
+      config.remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes())
+    
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND,
+      config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond())
+    
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND,
+      config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond())
 
     // Update default config
     props.put(indexFileCacheSizeProp, "4")
     props.put(copyQuotaProp, "100")
     props.put(fetchQuotaProp, "200")
     config.dynamicConfig.updateDefaultConfig(props)
-    assertEquals(4, config.remoteLogIndexFileCacheTotalSizeBytes)
-    assertEquals(100, config.remoteLogManagerCopyMaxBytesPerSecond)
-    assertEquals(200, config.remoteLogManagerFetchMaxBytesPerSecond)
+    assertEquals(4, 
config.remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes())
+    assertEquals(100, 
config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond())
+    assertEquals(200, 
config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond())
     verify(remoteLogManager).resizeCacheSize(4)
     verify(remoteLogManager).updateCopyQuota(100)
     verify(remoteLogManager).updateFetchQuota(200)
@@ -951,9 +955,9 @@ class DynamicBrokerConfigTest {
     props.put(copyQuotaProp, "200")
     props.put(fetchQuotaProp, "400")
     config.dynamicConfig.updateBrokerConfig(0, props)
-    assertEquals(8, config.remoteLogIndexFileCacheTotalSizeBytes)
-    assertEquals(200, config.remoteLogManagerCopyMaxBytesPerSecond)
-    assertEquals(400, config.remoteLogManagerFetchMaxBytesPerSecond)
+    assertEquals(8, 
config.remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes())
+    assertEquals(200, 
config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond())
+    assertEquals(400, 
config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond())
     verify(remoteLogManager).resizeCacheSize(8)
     verify(remoteLogManager).updateCopyQuota(200)
     verify(remoteLogManager).updateFetchQuota(400)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 53b342e10db..a1f3ce002c2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -4097,7 +4097,7 @@ class ReplicaManagerTest {
     val mockLog = mock(classOf[UnifiedLog])
     val brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     val remoteLogManager = new RemoteLogManager(
-      config,
+      config.remoteLogManagerConfig,
       0,
       TestUtils.tempRelativeDir("data").getAbsolutePath,
       "clusterId",
@@ -4203,7 +4203,7 @@ class ReplicaManagerTest {
     val dummyLog = mock(classOf[UnifiedLog])
     val brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
     val remoteLogManager = new RemoteLogManager(
-      config,
+      config.remoteLogManagerConfig,
       0,
       TestUtils.tempRelativeDir("data").getAbsolutePath,
       "clusterId",
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 480983edec4..fabb4f7c787 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -32,7 +32,7 @@ import static org.apache.kafka.common.config.ConfigDef.Type.INT;
 import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
 import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
 
-public final class RemoteLogManagerConfig extends AbstractConfig {
+public final class RemoteLogManagerConfig {
 
     /**
      * Prefix used for properties to be passed to {@link RemoteStorageManager} implementation. Remote log subsystem collects all the properties having
@@ -186,6 +186,8 @@ public final class RemoteLogManagerConfig extends AbstractConfig {
     public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request";
     public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500;
 
+    private final AbstractConfig config;
+
     public static ConfigDef configDef() {
         return new ConfigDef()
                 .define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
@@ -353,81 +355,81 @@ public final class RemoteLogManagerConfig extends AbstractConfig {
                         MEDIUM,
                         REMOTE_FETCH_MAX_WAIT_MS_DOC);
     }
-
-    public RemoteLogManagerConfig(Map<?, ?> props) {
-        super(configDef(), props);
+    
+    public RemoteLogManagerConfig(AbstractConfig config) {
+        this.config = config;
     }
 
     public boolean isRemoteStorageSystemEnabled() {
-        return getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
+        return config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP);
     }
 
     public String remoteStorageManagerClassName() {
-        return getString(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP);
+        return config.getString(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP);
     }
 
     public String remoteStorageManagerClassPath() {
-        return getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP);
+        return config.getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP);
     }
 
     public String remoteLogMetadataManagerClassName() {
-        return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP);
+        return config.getString(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP);
     }
 
     public String remoteLogMetadataManagerClassPath() {
-        return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP);
+        return config.getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP);
     }
 
     public int remoteLogManagerThreadPoolSize() {
-        return getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP);
+        return config.getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP);
     }
 
     public int remoteLogManagerCopierThreadPoolSize() {
-        return getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP);
+        return config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP);
     }
 
     public int remoteLogManagerExpirationThreadPoolSize() {
-        return getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP);
+        return 
config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP);
     }
 
     public long remoteLogManagerTaskIntervalMs() {
-        return getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP);
+        return config.getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP);
     }
 
     public long remoteLogManagerTaskRetryBackoffMs() {
-        return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP);
+        return config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP);
     }
 
     public long remoteLogManagerTaskRetryBackoffMaxMs() {
-        return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP);
+        return 
config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP);
     }
 
     public double remoteLogManagerTaskRetryJitter() {
-        return getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP);
+        return config.getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP);
     }
 
     public int remoteLogReaderThreads() {
-        return getInt(REMOTE_LOG_READER_THREADS_PROP);
+        return config.getInt(REMOTE_LOG_READER_THREADS_PROP);
     }
 
     public int remoteLogReaderMaxPendingTasks() {
-        return getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP);
+        return config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP);
     }
 
     public String remoteLogMetadataManagerListenerName() {
-        return getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP);
+        return 
config.getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP);
     }
 
     public int remoteLogMetadataCustomMetadataMaxBytes() {
-        return getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP);
+        return 
config.getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP);
     }
 
     public String remoteStorageManagerPrefix() {
-        return getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP);
+        return config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP);
     }
 
     public String remoteLogMetadataManagerPrefix() {
-        return getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP);
+        return 
config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP);
     }
 
     public Map<String, Object> remoteStorageManagerProps() {
@@ -439,24 +441,40 @@ public final class RemoteLogManagerConfig extends AbstractConfig {
     }
 
     public Map<String, Object> getConfigProps(String configPrefixProp) {
-        String prefixProp = getString(configPrefixProp);
-        return prefixProp == null ? Collections.emptyMap() : 
Collections.unmodifiableMap(originalsWithPrefix(prefixProp));
+        String prefixProp = config.getString(configPrefixProp);
+        return prefixProp == null ? Collections.emptyMap() : 
Collections.unmodifiableMap(config.originalsWithPrefix(prefixProp));
     }
 
     public int remoteLogManagerCopyNumQuotaSamples() {
-        return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP);
+        return config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP);
     }
 
     public int remoteLogManagerCopyQuotaWindowSizeSeconds() {
-        return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP);
+        return 
config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP);
     }
 
     public int remoteLogManagerFetchNumQuotaSamples() {
-        return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP);
+        return config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP);
     }
 
     public int remoteLogManagerFetchQuotaWindowSizeSeconds() {
-        return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP);
+        return 
config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP);
+    }
+
+    public long remoteLogIndexFileCacheTotalSizeBytes() {
+        return 
config.getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP);
+    }
+
+    public long remoteLogManagerCopyMaxBytesPerSecond() {
+        return 
config.getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP);
+    }
+
+    public long remoteLogManagerFetchMaxBytesPerSecond() {
+        return 
config.getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP);
+    }
+
+    public int remoteFetchMaxWaitMs() {
+        return config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP);
     }
 
     public static void main(String[] args) {
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
index 6904c8f3d4b..cb28f71a456 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.server.log.remote.storage;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigException;
 
 import org.junit.jupiter.api.Test;
@@ -39,24 +40,18 @@ public class RemoteLogManagerConfigTest {
         Map<String, Object> props = getRLMProps(rsmPrefix, rlmmPrefix);
         rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v));
         rlmmProps.forEach((k, v) -> props.put(rlmmPrefix + k, v));
+        RLMTestConfig config = new RLMTestConfig(props);
 
-        RemoteLogManagerConfig expectedRemoteLogManagerConfig = new 
RemoteLogManagerConfig(props);
-
-        // Removing remote.log.metadata.manager.class.name so that the default value gets picked up.
-        
props.remove(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP);
-
-        RemoteLogManagerConfig remoteLogManagerConfig = new 
RemoteLogManagerConfig(props);
-        assertEquals(expectedRemoteLogManagerConfig.values(), 
remoteLogManagerConfig.values());
-
-        assertEquals(rsmProps, 
remoteLogManagerConfig.remoteStorageManagerProps());
-        assertEquals(rlmmProps, 
remoteLogManagerConfig.remoteLogMetadataManagerProps());
+        RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig();
+        assertEquals(rsmProps, rlmConfig.remoteStorageManagerProps());
+        assertEquals(rlmmProps, rlmConfig.remoteLogMetadataManagerProps());
     }
 
     @Test
     public void testDefaultConfigs() {
         // Even with empty properties, RemoteLogManagerConfig has default values
         Map<String, Object> emptyProps = new HashMap<>();
-        RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new 
RemoteLogManagerConfig(emptyProps);
+        RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new 
RLMTestConfig(emptyProps).remoteLogManagerConfig();
         
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE,
 remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize());
         
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM,
 remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples());
     }
@@ -66,7 +61,7 @@ public class RemoteLogManagerConfigTest {
         // Test with a empty string props should throw ConfigException
         Map<String, Object> emptyStringProps = 
Collections.singletonMap(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
 "");
         assertThrows(ConfigException.class, () ->
-                new RemoteLogManagerConfig(emptyStringProps));
+                new RLMTestConfig(emptyStringProps).remoteLogManagerConfig());
     }
 
     private Map<String, Object> getRLMProps(String rsmPrefix, String rlmmPrefix) {
@@ -111,4 +106,18 @@ public class RemoteLogManagerConfigTest {
                 rlmmPrefix);
         return props;
     }
+
+    private static class RLMTestConfig extends AbstractConfig {
+
+        private final RemoteLogManagerConfig rlmConfig;
+
+        public RLMTestConfig(Map<?, ?> originals) {
+            super(RemoteLogManagerConfig.configDef(), originals, true);
+            rlmConfig = new RemoteLogManagerConfig(this);
+        }
+
+        public RemoteLogManagerConfig remoteLogManagerConfig() {
+            return rlmConfig;
+        }
+    }
 }
\ No newline at end of file


Reply via email to