This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 4fe08f3b297 KAFKA-16976 Update the current/dynamic config inside RemoteLogManagerConfig (#16394) 4fe08f3b297 is described below commit 4fe08f3b297064794d8ff0c38d537f2dd32de1e8 Author: Kamal Chandraprakash <kchandraprak...@uber.com> AuthorDate: Thu Jun 20 21:03:35 2024 +0530 KAFKA-16976 Update the current/dynamic config inside RemoteLogManagerConfig (#16394) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- .../java/kafka/log/remote/RemoteLogManager.java | 35 ++++------ .../src/main/scala/kafka/server/BrokerServer.scala | 2 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 10 +-- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- .../main/scala/kafka/server/ReplicaManager.scala | 2 +- .../kafka/log/remote/RemoteLogManagerTest.java | 56 ++++++++-------- .../kafka/server/DynamicBrokerConfigTest.scala | 50 ++++++++------- .../unit/kafka/server/ReplicaManagerTest.scala | 4 +- .../log/remote/storage/RemoteLogManagerConfig.java | 74 ++++++++++++++-------- .../remote/storage/RemoteLogManagerConfigTest.java | 33 ++++++---- 10 files changed, 144 insertions(+), 124 deletions(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 2b8f36ba501..e51e3e4347e 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -22,7 +22,6 @@ import kafka.log.UnifiedLog; import kafka.log.remote.quota.RLMQuotaManager; import kafka.log.remote.quota.RLMQuotaManagerConfig; import kafka.server.BrokerTopicStats; -import kafka.server.KafkaConfig; import kafka.server.QuotaType; import kafka.server.StopPartition; @@ -155,7 +154,7 @@ public class RemoteLogManager implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class); private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader"; - private final KafkaConfig config; + private final RemoteLogManagerConfig rlmConfig; private final int brokerId; private final String logDir; private final Time time; @@ -196,7 +195,7 @@ public class RemoteLogManager implements Closeable { /** * Creates RemoteLogManager instance with the given arguments. * - * @param config Configuration required for remote logging subsystem(tiered storage) at the broker level. + * @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level. * @param brokerId id of the current broker. * @param logDir directory of Kafka log segments. * @param time Time instance. @@ -206,7 +205,7 @@ public class RemoteLogManager implements Closeable { * @param brokerTopicStats BrokerTopicStats instance to update the respective metrics. * @param metrics Metrics instance */ - public RemoteLogManager(KafkaConfig config, + public RemoteLogManager(RemoteLogManagerConfig rlmConfig, int brokerId, String logDir, String clusterId, @@ -215,7 +214,7 @@ public class RemoteLogManager implements Closeable { BiConsumer<TopicPartition, Long> updateRemoteLogStartOffset, BrokerTopicStats brokerTopicStats, Metrics metrics) throws IOException { - this.config = config; + this.rlmConfig = rlmConfig; this.brokerId = brokerId; this.logDir = logDir; this.clusterId = clusterId; @@ -230,8 +229,7 @@ public class RemoteLogManager implements Closeable { rlmCopyQuotaManager = createRLMCopyQuotaManager(); rlmFetchQuotaManager = createRLMFetchQuotaManager(); - RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); - indexCache = new RemoteIndexCache(config.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir); + indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteLogStorageManager, logDir); delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs(); rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize()); @@ -279,12 +277,12 @@ public class RemoteLogManager implements Closeable { } RLMQuotaManager createRLMCopyQuotaManager() { - return new RLMQuotaManager(copyQuotaManagerConfig(config), metrics, QuotaType.RLMCopy$.MODULE$, + return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMCopy$.MODULE$, "Tracking copy byte-rate for Remote Log Manager", time); } RLMQuotaManager createRLMFetchQuotaManager() { - return new RLMQuotaManager(fetchQuotaManagerConfig(config), metrics, QuotaType.RLMFetch$.MODULE$, + return new RLMQuotaManager(fetchQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMFetch$.MODULE$, "Tracking fetch byte-rate for Remote Log Manager", time); } @@ -292,16 +290,14 @@ public class RemoteLogManager implements Closeable { return rlmFetchQuotaManager.isQuotaExceeded(); } - static RLMQuotaManagerConfig copyQuotaManagerConfig(KafkaConfig config) { - RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); - return new RLMQuotaManagerConfig(config.remoteLogManagerCopyMaxBytesPerSecond(), + static RLMQuotaManagerConfig copyQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) { + return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerCopyMaxBytesPerSecond(), rlmConfig.remoteLogManagerCopyNumQuotaSamples(), rlmConfig.remoteLogManagerCopyQuotaWindowSizeSeconds()); } - static RLMQuotaManagerConfig fetchQuotaManagerConfig(KafkaConfig config) { - RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); - return new RLMQuotaManagerConfig(config.remoteLogManagerFetchMaxBytesPerSecond(), + static RLMQuotaManagerConfig fetchQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) { + return new RLMQuotaManagerConfig(rlmConfig.remoteLogManagerFetchMaxBytesPerSecond(), rlmConfig.remoteLogManagerFetchNumQuotaSamples(), rlmConfig.remoteLogManagerFetchQuotaWindowSizeSeconds()); } @@ -318,7 +314,6 @@ public class RemoteLogManager implements Closeable { @SuppressWarnings("removal") RemoteStorageManager createRemoteStorageManager() { - RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); return java.security.AccessController.doPrivileged(new PrivilegedAction<RemoteStorageManager>() { private final String classPath = rlmConfig.remoteStorageManagerClassPath(); @@ -335,14 +330,13 @@ public class RemoteLogManager implements Closeable { } private void configureRSM() { - final Map<String, Object> rsmProps = new HashMap<>(config.remoteLogManagerConfig().remoteStorageManagerProps()); + final Map<String, Object> rsmProps = new HashMap<>(rlmConfig.remoteStorageManagerProps()); rsmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId); remoteLogStorageManager.configure(rsmProps); } @SuppressWarnings("removal") RemoteLogMetadataManager createRemoteLogMetadataManager() { - RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); return java.security.AccessController.doPrivileged(new PrivilegedAction<RemoteLogMetadataManager>() { private final String classPath = rlmConfig.remoteLogMetadataManagerClassPath(); @@ -369,7 +363,7 @@ public class RemoteLogManager implements Closeable { rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", e.securityProtocol().name); }); // update the remoteLogMetadataProps here to override endpoint config if any - rlmmProps.putAll(config.remoteLogManagerConfig().remoteLogMetadataManagerProps()); + rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps()); rlmmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId); rlmmProps.put(LOG_DIR_CONFIG, logDir); @@ -421,7 +415,7 @@ public class RemoteLogManager implements Closeable { Map<String, Uuid> topicIds) { LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); - if (config.remoteLogManagerConfig().isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) { + if (rlmConfig.isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) { throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled"); } @@ -1751,7 +1745,6 @@ public class RemoteLogManager implements Closeable { void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition, Consumer<RLMTask> convertToLeaderOrFollower) { - RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition, topicIdPartition -> { RLMTask task = new RLMTask(topicIdPartition, rlmConfig.remoteLogMetadataCustomMetadataMaxBytes()); diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 723e6b01b61..29f3a5cc0b0 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -621,7 +621,7 @@ class BrokerServer( protected def createRemoteLogManager(): Option[RemoteLogManager] = { if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) { - Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, clusterId, time, + Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, (tp: TopicPartition) => logManager.getLog(tp).asJava, (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => { logManager.getLog(tp).foreach { log => diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index a7513c4ec87..7e84514c1e7 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -231,7 +231,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG) val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG) - private val _remoteLogManagerConfig = new RemoteLogManagerConfig(props) + private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this) def remoteLogManagerConfig = _remoteLogManagerConfig private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = { @@ -869,14 +869,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def logLocalRetentionMs: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP) - def remoteFetchMaxWaitMs = getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP) - - def remoteLogIndexFileCacheTotalSizeBytes: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) - - def remoteLogManagerCopyMaxBytesPerSecond: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP) - - def remoteLogManagerFetchMaxBytesPerSecond: Long = getLong(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP) - validateValues() @nowarn("cat=deprecation") diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index df9d631dca5..23b7e8ebc3d 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -691,7 +691,7 @@ class KafkaServer( protected def createRemoteLogManager(): Option[RemoteLogManager] = { if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) { - Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, clusterId, time, + Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, (tp: TopicPartition) => logManager.getLog(tp).asJava, (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => { logManager.getLog(tp).foreach { log => diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6ab67a84ec4..3615788aeab 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1479,7 +1479,7 @@ class ReplicaManager(val config: KafkaConfig, return Some(createLogReadResult(e)) } - val remoteFetchMaxWaitMs = config.remoteFetchMaxWaitMs.toLong + val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs, fetchPartitionStatus, params, logReadResults, this, responseCallback) delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key)) diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 7e3eb2dad6f..4f2405a94c7 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -226,11 +226,11 @@ public class RemoteLogManagerTest { Properties props = kafka.utils.TestUtils.createDummyBrokerConfig(); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100"); - createRLMConfig(props); + appendRLMConfig(props); config = KafkaConfig.fromProps(props); brokerTopicStats = new BrokerTopicStats(KafkaConfig.fromProps(props).remoteLogManagerConfig().isRemoteStorageSystemEnabled()); - remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, + remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> currentLogStartOffset.set(offset), brokerTopicStats, metrics) { @@ -338,11 +338,14 @@ public class RemoteLogManagerTest { String key = "key"; String configPrefix = "config.prefix"; Properties props = new Properties(); + props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, configPrefix); props.put(configPrefix + key, "world"); props.put("remote.log.metadata.y", "z"); + appendRLMConfig(props); + KafkaConfig config = KafkaConfig.fromProps(props); - Map<String, Object> metadataMangerConfig = createRLMConfig(props).remoteLogMetadataManagerProps(); + Map<String, Object> metadataMangerConfig = config.remoteLogManagerConfig().remoteLogMetadataManagerProps(); assertEquals(props.get(configPrefix + key), metadataMangerConfig.get(key)); assertFalse(metadataMangerConfig.containsKey("remote.log.metadata.y")); } @@ -352,11 +355,14 @@ public class RemoteLogManagerTest { String key = "key"; String configPrefix = "config.prefix"; Properties props = new Properties(); + props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, configPrefix); props.put(configPrefix + key, "world"); props.put("remote.storage.manager.y", "z"); + appendRLMConfig(props); + KafkaConfig config = KafkaConfig.fromProps(props); - Map<String, Object> remoteStorageManagerConfig = createRLMConfig(props).remoteStorageManagerProps(); + Map<String, Object> remoteStorageManagerConfig = config.remoteLogManagerConfig().remoteStorageManagerProps(); assertEquals(props.get(configPrefix + key), remoteStorageManagerConfig.get(key)); assertFalse(remoteStorageManagerConfig.containsKey("remote.storage.manager.y")); } @@ -385,10 +391,10 @@ public class RemoteLogManagerTest { props.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); // override common security.protocol by adding "RLMM prefix" and "remote log metadata common client prefix" props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL"); - createRLMConfig(props); + appendRLMConfig(props); KafkaConfig config = KafkaConfig.fromProps(props); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - config, + config.remoteLogManagerConfig(), brokerId, logDir, clusterId, @@ -1289,7 +1295,7 @@ public class RemoteLogManagerTest { void testGetClassLoaderAwareRemoteStorageManager() throws Exception { ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); try (RemoteLogManager remoteLogManager = - new RemoteLogManager(config, brokerId, logDir, clusterId, time, + new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, t -> Optional.empty(), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -1551,7 +1557,7 @@ public class RemoteLogManagerTest { public void testRemoveMetricsOnClose() throws IOException { MockedConstruction<KafkaMetricsGroup> mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class); try { - RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, + RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { public RemoteStorageManager createRemoteStorageManager() { return remoteStorageManager; @@ -1946,7 +1952,7 @@ public class RemoteLogManagerTest { else return Collections.emptyIterator(); }); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -1971,7 +1977,7 @@ public class RemoteLogManagerTest { when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt())) .thenReturn(Collections.emptyIterator()); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -2005,7 +2011,7 @@ public class RemoteLogManagerTest { }); AtomicLong logStartOffset = new AtomicLong(0); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> logStartOffset.set(offset), brokerTopicStats, metrics) { @@ -2437,7 +2443,7 @@ public class RemoteLogManagerTest { @Test public void testDeleteRetentionMsOnExpiredSegment() throws RemoteStorageException, IOException { AtomicLong logStartOffset = new AtomicLong(0); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> logStartOffset.set(offset), brokerTopicStats, metrics) { @@ -2579,7 +2585,7 @@ public class RemoteLogManagerTest { ); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - config, + config.remoteLogManagerConfig(), brokerId, logDir, clusterId, @@ -2652,7 +2658,7 @@ public class RemoteLogManagerTest { ); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - config, + config.remoteLogManagerConfig(), brokerId, logDir, clusterId, @@ -2737,7 +2743,7 @@ public class RemoteLogManagerTest { try (RemoteLogManager remoteLogManager = new RemoteLogManager( - config, + config.remoteLogManagerConfig(), brokerId, logDir, clusterId, @@ -2783,9 +2789,9 @@ public class RemoteLogManagerTest { public void testCopyQuotaManagerConfig() { Properties defaultProps = new Properties(); defaultProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); - createRLMConfig(defaultProps); + appendRLMConfig(defaultProps); KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps); - RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig); + RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig.remoteLogManagerConfig()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds()); @@ -2795,10 +2801,10 @@ public class RemoteLogManagerTest { customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); - createRLMConfig(customProps); + appendRLMConfig(customProps); KafkaConfig config = KafkaConfig.fromProps(customProps); - RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(config); + RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(config.remoteLogManagerConfig()); assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond()); assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples()); assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds()); @@ -2808,10 +2814,10 @@ public class RemoteLogManagerTest { public void testFetchQuotaManagerConfig() { Properties defaultProps = new Properties(); defaultProps.put("zookeeper.connect", kafka.utils.TestUtils.MockZkConnect()); - createRLMConfig(defaultProps); + appendRLMConfig(defaultProps); KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps); - RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig); + RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig.remoteLogManagerConfig()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds()); @@ -2821,9 +2827,9 @@ public class RemoteLogManagerTest { customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); - createRLMConfig(customProps); + appendRLMConfig(customProps); KafkaConfig rlmConfig = KafkaConfig.fromProps(customProps); - RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig); + RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig.remoteLogManagerConfig()); assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond()); assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples()); assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds()); @@ -3075,7 +3081,7 @@ public class RemoteLogManagerTest { return partition; } - private RemoteLogManagerConfig createRLMConfig(Properties props) { + private void appendRLMConfig(Properties props) { props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true); props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, NoOpRemoteStorageManager.class.getName()); props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, NoOpRemoteLogMetadataManager.class.getName()); @@ -3086,8 +3092,6 @@ public class RemoteLogManagerTest { props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataCommonClientTestProp, remoteLogMetadataCommonClientTestVal); props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataConsumerTestProp, remoteLogMetadataConsumerTestVal); props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal); - - return new RemoteLogManagerConfig(props); } } diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 8dad66919bc..a9d6eea9373 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -798,7 +798,8 @@ class DynamicBrokerConfigTest { val config = KafkaConfig(props) val kafkaBroker = mock(classOf[KafkaBroker]) when(kafkaBroker.config).thenReturn(config) - assertEquals(500, config.remoteFetchMaxWaitMs) + when(kafkaBroker.remoteLogManagerOpt).thenReturn(None) + assertEquals(500, config.remoteLogManagerConfig.remoteFetchMaxWaitMs) val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker) config.dynamicConfig.initialize(None, None) @@ -809,13 +810,13 @@ class DynamicBrokerConfigTest { // update default config config.dynamicConfig.validate(newProps, perBrokerConfig = false) config.dynamicConfig.updateDefaultConfig(newProps) - assertEquals(30000, config.remoteFetchMaxWaitMs) + assertEquals(30000, config.remoteLogManagerConfig.remoteFetchMaxWaitMs) // update per broker config newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "10000") config.dynamicConfig.validate(newProps, perBrokerConfig = true) config.dynamicConfig.updateBrokerConfig(0, newProps) - assertEquals(10000, config.remoteFetchMaxWaitMs) + assertEquals(10000, config.remoteLogManagerConfig.remoteFetchMaxWaitMs) // invalid values for (maxWaitMs <- Seq(-1, 0)) { @@ -832,10 +833,10 @@ class DynamicBrokerConfigTest { val config = KafkaConfig(origProps) val serverMock = Mockito.mock(classOf[KafkaBroker]) - val remoteLogManagerMockOpt = Option(Mockito.mock(classOf[RemoteLogManager])) + val remoteLogManager = Mockito.mock(classOf[RemoteLogManager]) Mockito.when(serverMock.config).thenReturn(config) - Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(remoteLogManagerMockOpt) + Mockito.when(serverMock.remoteLogManagerOpt).thenReturn(Some(remoteLogManager)) config.dynamicConfig.initialize(None, None) config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) @@ -844,10 +845,10 @@ class DynamicBrokerConfigTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, "4") config.dynamicConfig.updateDefaultConfig(props) - assertEquals(4L, config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) - Mockito.verify(remoteLogManagerMockOpt.get).resizeCacheSize(4) + assertEquals(4L, config.remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes()) + Mockito.verify(remoteLogManager).resizeCacheSize(4) - Mockito.verifyNoMoreInteractions(remoteLogManagerMockOpt.get) + Mockito.verifyNoMoreInteractions(remoteLogManager) } @Test @@ -864,18 +865,18 @@ class DynamicBrokerConfigTest { config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, - config.remoteLogManagerCopyMaxBytesPerSecond) + config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond()) // Update default config props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, "100") config.dynamicConfig.updateDefaultConfig(props) - assertEquals(100, config.remoteLogManagerCopyMaxBytesPerSecond) + assertEquals(100, config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond()) verify(remoteLogManager).updateCopyQuota(100) // Update per broker config props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, "200") config.dynamicConfig.updateBrokerConfig(0, props) - assertEquals(200, config.remoteLogManagerCopyMaxBytesPerSecond) + assertEquals(200, config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond()) verify(remoteLogManager).updateCopyQuota(200) verifyNoMoreInteractions(remoteLogManager) @@ -895,18 +896,18 @@ class DynamicBrokerConfigTest { config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, - config.remoteLogManagerFetchMaxBytesPerSecond) + config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond()) // Update default config props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, "100") config.dynamicConfig.updateDefaultConfig(props) - assertEquals(100, config.remoteLogManagerFetchMaxBytesPerSecond) + assertEquals(100, config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond()) verify(remoteLogManager).updateFetchQuota(100) // Update per broker config props.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, "200") config.dynamicConfig.updateBrokerConfig(0, props) - assertEquals(200, config.remoteLogManagerFetchMaxBytesPerSecond) + assertEquals(200, config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond()) verify(remoteLogManager).updateFetchQuota(200) verifyNoMoreInteractions(remoteLogManager) @@ -930,18 +931,21 @@ class DynamicBrokerConfigTest { config.dynamicConfig.addBrokerReconfigurable(new DynamicRemoteLogConfig(serverMock)) // Default values - assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES, config.remoteLogIndexFileCacheTotalSizeBytes) - assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, config.remoteLogManagerCopyMaxBytesPerSecond) - assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, config.remoteLogManagerFetchMaxBytesPerSecond) + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES, + config.remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes()) + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, + config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond()) + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, + config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond()) // Update default config props.put(indexFileCacheSizeProp, "4") props.put(copyQuotaProp, "100") props.put(fetchQuotaProp, "200") config.dynamicConfig.updateDefaultConfig(props) - assertEquals(4, config.remoteLogIndexFileCacheTotalSizeBytes) - assertEquals(100, config.remoteLogManagerCopyMaxBytesPerSecond) - assertEquals(200, config.remoteLogManagerFetchMaxBytesPerSecond) + assertEquals(4, config.remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes()) + assertEquals(100, config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond()) + assertEquals(200, config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond()) verify(remoteLogManager).resizeCacheSize(4) verify(remoteLogManager).updateCopyQuota(100) verify(remoteLogManager).updateFetchQuota(200) @@ -951,9 +955,9 @@ class DynamicBrokerConfigTest { props.put(copyQuotaProp, "200") props.put(fetchQuotaProp, "400") config.dynamicConfig.updateBrokerConfig(0, props) - assertEquals(8, config.remoteLogIndexFileCacheTotalSizeBytes) - assertEquals(200, config.remoteLogManagerCopyMaxBytesPerSecond) - assertEquals(400, config.remoteLogManagerFetchMaxBytesPerSecond) + assertEquals(8, config.remoteLogManagerConfig.remoteLogIndexFileCacheTotalSizeBytes()) + assertEquals(200, config.remoteLogManagerConfig.remoteLogManagerCopyMaxBytesPerSecond()) + assertEquals(400, config.remoteLogManagerConfig.remoteLogManagerFetchMaxBytesPerSecond()) verify(remoteLogManager).resizeCacheSize(8) verify(remoteLogManager).updateCopyQuota(200) verify(remoteLogManager).updateFetchQuota(400) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 53b342e10db..a1f3ce002c2 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -4097,7 +4097,7 @@ class ReplicaManagerTest { val mockLog = mock(classOf[UnifiedLog]) val brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) val remoteLogManager = new RemoteLogManager( - config, + config.remoteLogManagerConfig, 0, TestUtils.tempRelativeDir("data").getAbsolutePath, "clusterId", @@ -4203,7 +4203,7 @@ class ReplicaManagerTest { val dummyLog = mock(classOf[UnifiedLog]) val brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) val remoteLogManager = new RemoteLogManager( - config, + config.remoteLogManagerConfig, 0, TestUtils.tempRelativeDir("data").getAbsolutePath, "clusterId", diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 480983edec4..fabb4f7c787 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -32,7 +32,7 @@ import static org.apache.kafka.common.config.ConfigDef.Type.INT; import static org.apache.kafka.common.config.ConfigDef.Type.LONG; import static org.apache.kafka.common.config.ConfigDef.Type.STRING; -public final class RemoteLogManagerConfig extends AbstractConfig { +public final class RemoteLogManagerConfig { /** * Prefix used for properties to be passed to {@link RemoteStorageManager} implementation. Remote log subsystem collects all the properties having @@ -186,6 +186,8 @@ public final class RemoteLogManagerConfig extends AbstractConfig { public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request"; public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500; + private final AbstractConfig config; + public static ConfigDef configDef() { return new ConfigDef() .define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, @@ -353,81 +355,81 @@ public final class RemoteLogManagerConfig extends AbstractConfig { MEDIUM, REMOTE_FETCH_MAX_WAIT_MS_DOC); } - - public RemoteLogManagerConfig(Map<?, ?> props) { - super(configDef(), props); + + public RemoteLogManagerConfig(AbstractConfig config) { + this.config = config; } public boolean isRemoteStorageSystemEnabled() { - return getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); + return config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP); } public String remoteStorageManagerClassName() { - return getString(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP); + return config.getString(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP); } public String remoteStorageManagerClassPath() { - return getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP); + return config.getString(REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP); } public String remoteLogMetadataManagerClassName() { - return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); + return config.getString(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); } public String remoteLogMetadataManagerClassPath() { - return getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP); + return config.getString(REMOTE_LOG_METADATA_MANAGER_CLASS_PATH_PROP); } public int remoteLogManagerThreadPoolSize() { - return getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP); + return config.getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP); } public int remoteLogManagerCopierThreadPoolSize() { - return getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); + return config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); } public int remoteLogManagerExpirationThreadPoolSize() { - return getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); + return config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); } public long remoteLogManagerTaskIntervalMs() { - return getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP); + return config.getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP); } public long remoteLogManagerTaskRetryBackoffMs() { - return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP); + return config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP); } public long remoteLogManagerTaskRetryBackoffMaxMs() { - return getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP); + return config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP); } public double remoteLogManagerTaskRetryJitter() { - return getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP); + return config.getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP); } public int remoteLogReaderThreads() { - return getInt(REMOTE_LOG_READER_THREADS_PROP); + return config.getInt(REMOTE_LOG_READER_THREADS_PROP); } public int remoteLogReaderMaxPendingTasks() { - return getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP); + return config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP); } public String remoteLogMetadataManagerListenerName() { - return getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP); + return config.getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP); } public int remoteLogMetadataCustomMetadataMaxBytes() { - return getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP); + return config.getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP); } public String remoteStorageManagerPrefix() { - return getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP); + return config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP); } public String remoteLogMetadataManagerPrefix() { - return getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP); + return config.getString(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP); } public Map<String, Object> remoteStorageManagerProps() { @@ -439,24 +441,40 @@ public final class RemoteLogManagerConfig extends AbstractConfig { } public Map<String, Object> getConfigProps(String configPrefixProp) { - String prefixProp = getString(configPrefixProp); - return prefixProp == null ? Collections.emptyMap() : Collections.unmodifiableMap(originalsWithPrefix(prefixProp)); + String prefixProp = config.getString(configPrefixProp); + return prefixProp == null ? Collections.emptyMap() : Collections.unmodifiableMap(config.originalsWithPrefix(prefixProp)); } public int remoteLogManagerCopyNumQuotaSamples() { - return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP); + return config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP); } public int remoteLogManagerCopyQuotaWindowSizeSeconds() { - return getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP); + return config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP); } public int remoteLogManagerFetchNumQuotaSamples() { - return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP); + return config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP); } public int remoteLogManagerFetchQuotaWindowSizeSeconds() { - return getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP); + return config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP); + } + + public long remoteLogIndexFileCacheTotalSizeBytes() { + return config.getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP); + } + + public long remoteLogManagerCopyMaxBytesPerSecond() { + return config.getLong(REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP); + } + + public long remoteLogManagerFetchMaxBytesPerSecond() { + return config.getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP); + } + + public int remoteFetchMaxWaitMs() { + return config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP); } public static void main(String[] args) { diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index 6904c8f3d4b..cb28f71a456 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.server.log.remote.storage; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigException; import org.junit.jupiter.api.Test; @@ -39,24 +40,18 @@ public class RemoteLogManagerConfigTest { Map<String, Object> props = getRLMProps(rsmPrefix, rlmmPrefix); rsmProps.forEach((k, v) -> props.put(rsmPrefix + k, v)); rlmmProps.forEach((k, v) -> props.put(rlmmPrefix + k, v)); + RLMTestConfig config = new RLMTestConfig(props); - RemoteLogManagerConfig expectedRemoteLogManagerConfig = new RemoteLogManagerConfig(props); - - // Removing remote.log.metadata.manager.class.name so that the default value gets picked up. - props.remove(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); - - RemoteLogManagerConfig remoteLogManagerConfig = new RemoteLogManagerConfig(props); - assertEquals(expectedRemoteLogManagerConfig.values(), remoteLogManagerConfig.values()); - - assertEquals(rsmProps, remoteLogManagerConfig.remoteStorageManagerProps()); - assertEquals(rlmmProps, remoteLogManagerConfig.remoteLogMetadataManagerProps()); + RemoteLogManagerConfig rlmConfig = config.remoteLogManagerConfig(); + assertEquals(rsmProps, rlmConfig.remoteStorageManagerProps()); + assertEquals(rlmmProps, rlmConfig.remoteLogMetadataManagerProps()); } @Test public void testDefaultConfigs() { // Even with empty properties, RemoteLogManagerConfig has default values Map<String, Object> emptyProps = new HashMap<>(); - RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RemoteLogManagerConfig(emptyProps); + RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RLMTestConfig(emptyProps).remoteLogManagerConfig(); assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize()); assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples()); } @@ -66,7 +61,7 @@ public class RemoteLogManagerConfigTest { // Test with a empty string props should throw ConfigException Map<String, Object> emptyStringProps = Collections.singletonMap(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, ""); assertThrows(ConfigException.class, () -> - new RemoteLogManagerConfig(emptyStringProps)); + new RLMTestConfig(emptyStringProps).remoteLogManagerConfig()); } private Map<String, Object> getRLMProps(String rsmPrefix, String rlmmPrefix) { @@ -111,4 +106,18 @@ public class RemoteLogManagerConfigTest { rlmmPrefix); return props; } + + private static class RLMTestConfig extends AbstractConfig { + + private final RemoteLogManagerConfig rlmConfig; + + public RLMTestConfig(Map<?, ?> originals) { + super(RemoteLogManagerConfig.configDef(), originals, true); + rlmConfig = new RemoteLogManagerConfig(this); + } + + public RemoteLogManagerConfig remoteLogManagerConfig() { + return rlmConfig; + } + } } \ No newline at end of file