This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 065315431d8 KAFKA-20210: Add group.coordinator.background.threads 
config (#21555)
065315431d8 is described below

commit 065315431d81dc285e1fa1070216c11715c4865a
Author: Sean Quah <[email protected]>
AuthorDate: Tue Mar 3 06:48:48 2026 +0000

    KAFKA-20210: Add group.coordinator.background.threads config (#21555)
    
    Add the group.coordinator.background.threads config option to control
    the  number of threads in the group coordinator's CoordinatorExecutor
    thread  pool.
    
    Reviewers: Dongnuo Lyu <[email protected]>, Chia-Ping Tsai
     <[email protected]>, PoAn Yang <[email protected]>, David Jacot
     <[email protected]>
---
 .../test/scala/unit/kafka/server/KafkaConfigTest.scala  |  1 +
 .../kafka/coordinator/group/GroupCoordinatorConfig.java | 17 ++++++++++++++++-
 .../coordinator/group/GroupCoordinatorService.java      |  6 +++++-
 .../coordinator/group/GroupCoordinatorConfigTest.java   |  2 ++
 4 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index cf27943ca03..ead46d36018 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1036,6 +1036,7 @@ class KafkaConfigTest {
 
         /** New group coordinator configs */
         case GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case 
GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", -2, -0.5)
         case GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1, 512 * 1024 - 
1)
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index bb969501f29..e12d6ab9685 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -80,9 +80,14 @@ public class GroupCoordinatorConfig {
     public static final int GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT = -1;
 
     public static final String GROUP_COORDINATOR_NUM_THREADS_CONFIG = 
"group.coordinator.threads";
-    public static final String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number 
of threads used by the group coordinator.";
+    public static final String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number 
of threads used by the group coordinator for processing requests.";
     public static final int GROUP_COORDINATOR_NUM_THREADS_DEFAULT = 4;
 
+    public static final String GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_CONFIG 
= "group.coordinator.background.threads";
+    public static final String GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_DOC = 
"The number of threads used by the group coordinator for " +
+        "processing background tasks (e.g. updating regular expression 
subscriptions and offloaded assignments).";
+    public static final int GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_DEFAULT = 
2;
+
     public static final String OFFSETS_LOAD_BUFFER_SIZE_CONFIG = 
"offsets.load.buffer.size";
     public static final int OFFSETS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024;
     public static final String OFFSETS_LOAD_BUFFER_SIZE_DOC = "Batch size for 
reading from the offsets segments when loading group metadata " +
@@ -318,6 +323,7 @@ public class GroupCoordinatorConfig {
         .define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT, 
             ConfigDef.ValidList.in(false, 
Group.GroupType.documentValidValues()), MEDIUM, 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
         .define(GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, 
GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), HIGH, 
GROUP_COORDINATOR_NUM_THREADS_DOC)
+        .define(GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_CONFIG, INT, 
GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_DEFAULT, atLeast(1), HIGH, 
GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_DOC)
         .define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, 
GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(-1), MEDIUM, 
GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
         .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, 
OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, 
OFFSET_COMMIT_TIMEOUT_MS_DOC)
         .define(OFFSETS_LOAD_BUFFER_SIZE_CONFIG, INT, 
OFFSETS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, 
OFFSETS_LOAD_BUFFER_SIZE_DOC)
@@ -383,6 +389,7 @@ public class GroupCoordinatorConfig {
     public static final int CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60 
* 1000;
 
     private final int numThreads;
+    private final int numBackgroundThreads;
     private final int appendLingerMs;
     private final int consumerGroupSessionTimeoutMs;
     private final int consumerGroupHeartbeatIntervalMs;
@@ -434,6 +441,7 @@ public class GroupCoordinatorConfig {
     @SuppressWarnings("this-escape")
     public GroupCoordinatorConfig(AbstractConfig config) {
         this.numThreads = 
config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG);
+        this.numBackgroundThreads = 
config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_CONFIG);
         this.appendLingerMs = 
config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG);
         this.consumerGroupSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG);
         this.consumerGroupHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -672,6 +680,13 @@ public class GroupCoordinatorConfig {
         return numThreads;
     }
 
+    /**
+     * The number of background threads.
+     */
+    public int numBackgroundThreads() {
+        return numBackgroundThreads;
+    }
+
     /**
      * The duration in milliseconds that the coordinator will wait for writes 
to
      * accumulate before flushing them to disk. {@code OptionalInt.empty()} 
indicates
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 55e88434399..286b3f941bc 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -81,6 +81,7 @@ import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorEventProcessor;
@@ -276,7 +277,10 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
                     .withSerializer(new GroupCoordinatorRecordSerde())
                     
.withCompression(Compression.of(config.offsetTopicCompressionType()).build())
                     .withAppendLingerMs(config.appendLingerMs())
-                    .withExecutorService(Executors.newSingleThreadExecutor())
+                    .withExecutorService(Executors.newFixedThreadPool(
+                        config.numBackgroundThreads(),
+                        
ThreadUtils.createThreadFactory("group-coordinator-background-%d", false)
+                    ))
                     
.withCachedBufferMaxBytesSupplier(config::cachedBufferMaxBytes)
                     .build();
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 80c50195575..0208e627d6a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -175,6 +175,7 @@ public class GroupCoordinatorConfigTest {
     public void testConfigs() {
         Map<String, Object> configs = new HashMap<>();
         
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 10);
+        
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_CONFIG,
 3);
         
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 
10);
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
555);
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
200);
@@ -205,6 +206,7 @@ public class GroupCoordinatorConfigTest {
         GroupCoordinatorConfig config = createConfig(configs);
 
         assertEquals(10, config.numThreads());
+        assertEquals(3, config.numBackgroundThreads());
         assertEquals(555, config.consumerGroupSessionTimeoutMs());
         assertEquals(200, config.consumerGroupHeartbeatIntervalMs());
         assertEquals(55, config.consumerGroupMaxSize());

Reply via email to