This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push: new 7842e25d321 KAFKA-17031: Make RLM thread pool configurations public and fix default handling (#17499) 7842e25d321 is described below commit 7842e25d32131e0e1362592fde86a46278e9dcff Author: Federico Valeri <fedeval...@gmail.com> AuthorDate: Mon Oct 21 19:39:11 2024 +0200 KAFKA-17031: Make RLM thread pool configurations public and fix default handling (#17499) According to KIP-950, remote.log.manager.thread.pool.size should be marked as deprecated and replaced by two new configurations: remote.log.manager.copier.thread.pool.size and remote.log.manager.expiration.thread.pool.size. Fix default handling so that -1 works as expected. Reviewers: Luke Chen <show...@gmail.com>, Gaurav Narula <gaurav_naru...@apple.com>, Satish Duggana <sati...@apache.org>, Colin P. McCabe <cmcc...@apache.org> --- .../scala/unit/kafka/server/KafkaConfigTest.scala | 2 ++ .../log/remote/storage/RemoteLogManagerConfig.java | 40 ++++++++++++++-------- .../remote/storage/RemoteLogManagerConfigTest.java | 15 +++++--- 3 files changed, 39 insertions(+), 18 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index b093112c92c..541080aee8a 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1106,6 +1106,8 @@ class KafkaConfigTest { case RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP => // ignore string case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2) + case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2) case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 9f73af4a7c9..bf0aba8e859 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -18,6 +18,7 @@ package org.apache.kafka.server.log.remote.storage; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import java.util.Collections; import java.util.Map; @@ -93,20 +94,29 @@ public final class RemoteLogManagerConfig { public static final long DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L; public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = "remote.log.manager.thread.pool.size"; - public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks to copy " + + public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Deprecated. Size of the thread pool used in scheduling tasks to copy " + "segments, fetch remote log indexes and clean up remote log segments."; public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10; + private static final String REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK = "The default value of -1 means that this will be set to the configured value of " + + REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP + ", if available; otherwise, it defaults to " + DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE + "."; + private static final ConfigDef.Validator REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR = ConfigDef.LambdaValidator.with( + (name, value) -> { + if ((int) value < -1 || (int) value == 0) throw new ConfigException(name, value, "Value can be -1 or greater than 0"); + }, + () -> REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK + ); + public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP = "remote.log.manager.copier.thread.pool.size"; - public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in " + - "scheduling tasks to copy segments."; - public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = 10; + public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks " + + "to copy segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK; + public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = -1; public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP = "remote.log.manager.expiration.thread.pool.size"; - public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in" + - " scheduling tasks to clean up remote log segments."; - public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = 10; - + public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks " + + "to clean up remote log segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK; + public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = -1; + public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = "remote.log.manager.task.interval.ms"; public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = "Interval at which remote log manager runs the scheduled tasks like copy " + "segments, and clean up remote log segments."; @@ -253,16 +263,16 @@ public final class RemoteLogManagerConfig { atLeast(1), MEDIUM, REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC) - .defineInternal(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP, + .define(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP, INT, DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE, - atLeast(1), + REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR, MEDIUM, REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC) - .defineInternal(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP, + .define(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP, INT, DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE, - atLeast(1), + REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR, MEDIUM, REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC) .define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, @@ -385,11 +395,13 @@ public final class RemoteLogManagerConfig { } public int remoteLogManagerCopierThreadPoolSize() { - return config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); + int size = config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); + return size == -1 ? remoteLogManagerThreadPoolSize() : size; } public int remoteLogManagerExpirationThreadPoolSize() { - return config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); + int size = config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); + return size == -1 ? remoteLogManagerThreadPoolSize() : size; } public long remoteLogManagerTaskIntervalMs() { diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index cb28f71a456..7ce0c46a515 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -29,7 +29,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; public class RemoteLogManagerConfigTest { - @Test public void testValidConfigs() { String rsmPrefix = "__custom.rsm."; @@ -56,6 +55,16 @@ public class RemoteLogManagerConfigTest { assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples()); } + @Test + public void testThreadPoolDefaults() { + // Even with empty properties, RemoteLogManagerConfig has default values + Map<String, Object> emptyProps = new HashMap<>(); + RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RLMTestConfig(emptyProps).remoteLogManagerConfig(); + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize()); + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopierThreadPoolSize()); + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerExpirationThreadPoolSize()); + } + @Test public void testValidateEmptyStringConfig() { // Test with a empty string props should throw ConfigException @@ -65,7 +74,6 @@ public class RemoteLogManagerConfigTest { } private Map<String, Object> getRLMProps(String rsmPrefix, String rlmmPrefix) { - Map<String, Object> props = new HashMap<>(); props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true); props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, @@ -108,7 +116,6 @@ public class RemoteLogManagerConfigTest { } private static class RLMTestConfig extends AbstractConfig { - private final RemoteLogManagerConfig rlmConfig; public RLMTestConfig(Map<?, ?> originals) { @@ -120,4 +127,4 @@ public class RemoteLogManagerConfigTest { return rlmConfig; } } -} \ No newline at end of file +}