This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 7842e25d321 KAFKA-17031: Make RLM thread pool configurations public 
and fix default handling (#17499)
7842e25d321 is described below

commit 7842e25d32131e0e1362592fde86a46278e9dcff
Author: Federico Valeri <fedeval...@gmail.com>
AuthorDate: Mon Oct 21 19:39:11 2024 +0200

    KAFKA-17031: Make RLM thread pool configurations public and fix default 
handling (#17499)
    
    According to KIP-950, remote.log.manager.thread.pool.size should be marked 
as deprecated and replaced by two new configurations: 
remote.log.manager.copier.thread.pool.size and 
remote.log.manager.expiration.thread.pool.size. Fix default handling so that -1 
works as expected.
    
    Reviewers: Luke Chen <show...@gmail.com>, Gaurav Narula 
<gaurav_naru...@apple.com>, Satish Duggana <sati...@apache.org>, Colin P. 
McCabe <cmcc...@apache.org>
---
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  2 ++
 .../log/remote/storage/RemoteLogManagerConfig.java | 40 ++++++++++++++--------
 .../remote/storage/RemoteLogManagerConfigTest.java | 15 +++++---
 3 files changed, 39 insertions(+), 18 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index b093112c92c..541080aee8a 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1106,6 +1106,8 @@ class KafkaConfigTest {
         case 
RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP => // 
ignore string
         case 
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2)
+        case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2)
         case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 9f73af4a7c9..bf0aba8e859 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -18,6 +18,7 @@ package org.apache.kafka.server.log.remote.storage;
 
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 
 import java.util.Collections;
 import java.util.Map;
@@ -93,20 +94,29 @@ public final class RemoteLogManagerConfig {
     public static final long 
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L;
 
     public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = 
"remote.log.manager.thread.pool.size";
-    public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size 
of the thread pool used in scheduling tasks to copy " +
+    public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = 
"Deprecated. Size of the thread pool used in scheduling tasks to copy " +
             "segments, fetch remote log indexes and clean up remote log 
segments.";
     public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10;
 
+    private static final String REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK = "The 
default value of -1 means that this will be set to the configured value of " +
+        REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP + ", if available; otherwise, 
it defaults to " + DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE + ".";
+    private static final ConfigDef.Validator 
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR = ConfigDef.LambdaValidator.with(
+        (name, value) -> { 
+            if ((int) value < -1 || (int) value == 0) throw new 
ConfigException(name, value, "Value can be -1 or greater than 0"); 
+        },
+        () -> REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK
+    );
+
     public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP 
= "remote.log.manager.copier.thread.pool.size";
-    public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC 
= "Size of the thread pool used in " +
-            "scheduling tasks to copy segments.";
-    public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE 
= 10;
+    public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC 
= "Size of the thread pool used in scheduling tasks " +
+            "to copy segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
+    public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE 
= -1;
 
     public static final String 
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP = 
"remote.log.manager.expiration.thread.pool.size";
-    public static final String 
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool 
used in" +
-            " scheduling tasks to clean up remote log segments.";
-    public static final int 
DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = 10;
-
+    public static final String 
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool 
used in scheduling tasks " +
+            "to clean up remote log segments. " + 
REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
+    public static final int 
DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = -1;
+    
     public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = 
"remote.log.manager.task.interval.ms";
     public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = 
"Interval at which remote log manager runs the scheduled tasks like copy " +
             "segments, and clean up remote log segments.";
@@ -253,16 +263,16 @@ public final class RemoteLogManagerConfig {
                         atLeast(1),
                         MEDIUM,
                         REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC)
-                
.defineInternal(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
+                .define(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
                         INT,
                         DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
-                        atLeast(1),
+                        REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR,
                         MEDIUM,
                         REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC)
-                
.defineInternal(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
+                .define(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
                         INT,
                         DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
-                        atLeast(1),
+                        REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR,
                         MEDIUM,
                         REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC)
                 .define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
@@ -385,11 +395,13 @@ public final class RemoteLogManagerConfig {
     }
 
     public int remoteLogManagerCopierThreadPoolSize() {
-        return config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP);
+        int size = 
config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP);
+        return size == -1 ? remoteLogManagerThreadPoolSize() : size;
     }
 
     public int remoteLogManagerExpirationThreadPoolSize() {
-        return 
config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP);
+        int size = 
config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP);
+        return size == -1 ? remoteLogManagerThreadPoolSize() : size;
     }
 
     public long remoteLogManagerTaskIntervalMs() {
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
index cb28f71a456..7ce0c46a515 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
@@ -29,7 +29,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class RemoteLogManagerConfigTest {
-
     @Test
     public void testValidConfigs() {
         String rsmPrefix = "__custom.rsm.";
@@ -56,6 +55,16 @@ public class RemoteLogManagerConfigTest {
         
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM,
 remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples());
     }
 
+    @Test
+    public void testThreadPoolDefaults() {
+        // Even with empty properties, RemoteLogManagerConfig has default 
values
+        Map<String, Object> emptyProps = new HashMap<>();
+        RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new 
RLMTestConfig(emptyProps).remoteLogManagerConfig();
+        
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE,
 remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize());
+        
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE,
 remoteLogManagerConfigEmptyConfig.remoteLogManagerCopierThreadPoolSize());
+        
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE,
 remoteLogManagerConfigEmptyConfig.remoteLogManagerExpirationThreadPoolSize());
+    }
+
     @Test
     public void testValidateEmptyStringConfig() {
         // Test with a empty string props should throw ConfigException
@@ -65,7 +74,6 @@ public class RemoteLogManagerConfigTest {
     }
 
     private Map<String, Object> getRLMProps(String rsmPrefix, String 
rlmmPrefix) {
-
         Map<String, Object> props = new HashMap<>();
         
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
         
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
@@ -108,7 +116,6 @@ public class RemoteLogManagerConfigTest {
     }
 
     private static class RLMTestConfig extends AbstractConfig {
-
         private final RemoteLogManagerConfig rlmConfig;
 
         public RLMTestConfig(Map<?, ?> originals) {
@@ -120,4 +127,4 @@ public class RemoteLogManagerConfigTest {
             return rlmConfig;
         }
     }
-}
\ No newline at end of file
+}

Reply via email to