This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 065315431d8 KAFKA-20210: Add group.coordinator.background.threads config (#21555)
065315431d8 is described below
commit 065315431d81dc285e1fa1070216c11715c4865a
Author: Sean Quah <[email protected]>
AuthorDate: Tue Mar 3 06:48:48 2026 +0000
KAFKA-20210: Add group.coordinator.background.threads config (#21555)
Add the group.coordinator.background.threads config option to control
the number of threads in the group coordinator's CoordinatorExecutor
thread pool.
Reviewers: Dongnuo Lyu <[email protected]>, Chia-Ping Tsai <[email protected]>,
PoAn Yang <[email protected]>, David Jacot <[email protected]>
---
.../test/scala/unit/kafka/server/KafkaConfigTest.scala | 1 +
.../kafka/coordinator/group/GroupCoordinatorConfig.java | 17 ++++++++++++++++-
.../coordinator/group/GroupCoordinatorService.java | 6 +++++-
.../coordinator/group/GroupCoordinatorConfigTest.java | 2 ++
4 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index cf27943ca03..ead46d36018 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1036,6 +1036,7 @@ class KafkaConfigTest {
/** New group coordinator configs */
case GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+ case
GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG
=> assertPropertyInvalid(baseProperties, name, "not_a_number", -2, -0.5)
case GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1, 512 * 1024 -
1)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index bb969501f29..e12d6ab9685 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -80,9 +80,14 @@ public class GroupCoordinatorConfig {
public static final int GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT = -1;
public static final String GROUP_COORDINATOR_NUM_THREADS_CONFIG =
"group.coordinator.threads";
- public static final String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number
of threads used by the group coordinator.";
+ public static final String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number
of threads used by the group coordinator for processing requests.";
public static final int GROUP_COORDINATOR_NUM_THREADS_DEFAULT = 4;
+ public static final String GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_CONFIG
= "group.coordinator.background.threads";
+ public static final String GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_DOC =
"The number of threads used by the group coordinator for " +
+ "processing background tasks (e.g. updating regular expression
subscriptions and offloaded assignments).";
+ public static final int GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_DEFAULT =
2;
+
public static final String OFFSETS_LOAD_BUFFER_SIZE_CONFIG =
"offsets.load.buffer.size";
public static final int OFFSETS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024;
public static final String OFFSETS_LOAD_BUFFER_SIZE_DOC = "Batch size for
reading from the offsets segments when loading group metadata " +
@@ -318,6 +323,7 @@ public class GroupCoordinatorConfig {
.define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST,
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
ConfigDef.ValidList.in(false,
Group.GroupType.documentValidValues()), MEDIUM,
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
.define(GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT,
GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), HIGH,
GROUP_COORDINATOR_NUM_THREADS_DOC)
+ .define(GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_CONFIG, INT,
GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_DEFAULT, atLeast(1), HIGH,
GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_DOC)
.define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT,
GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(-1), MEDIUM,
GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
.define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT,
OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH,
OFFSET_COMMIT_TIMEOUT_MS_DOC)
.define(OFFSETS_LOAD_BUFFER_SIZE_CONFIG, INT,
OFFSETS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH,
OFFSETS_LOAD_BUFFER_SIZE_DOC)
@@ -383,6 +389,7 @@ public class GroupCoordinatorConfig {
public static final int CLASSIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60
* 1000;
private final int numThreads;
+ private final int numBackgroundThreads;
private final int appendLingerMs;
private final int consumerGroupSessionTimeoutMs;
private final int consumerGroupHeartbeatIntervalMs;
@@ -434,6 +441,7 @@ public class GroupCoordinatorConfig {
@SuppressWarnings("this-escape")
public GroupCoordinatorConfig(AbstractConfig config) {
this.numThreads =
config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG);
+ this.numBackgroundThreads =
config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_CONFIG);
this.appendLingerMs =
config.getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG);
this.consumerGroupSessionTimeoutMs =
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG);
this.consumerGroupHeartbeatIntervalMs =
config.getInt(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -672,6 +680,13 @@ public class GroupCoordinatorConfig {
return numThreads;
}
+ /**
+ * The number of background threads.
+ */
+ public int numBackgroundThreads() {
+ return numBackgroundThreads;
+ }
+
/**
* The duration in milliseconds that the coordinator will wait for writes
to
* accumulate before flushing them to disk. {@code OptionalInt.empty()}
indicates
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 55e88434399..286b3f941bc 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -81,6 +81,7 @@ import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorEventProcessor;
@@ -276,7 +277,10 @@ public class GroupCoordinatorService implements
GroupCoordinator {
.withSerializer(new GroupCoordinatorRecordSerde())
.withCompression(Compression.of(config.offsetTopicCompressionType()).build())
.withAppendLingerMs(config.appendLingerMs())
- .withExecutorService(Executors.newSingleThreadExecutor())
+ .withExecutorService(Executors.newFixedThreadPool(
+ config.numBackgroundThreads(),
+
ThreadUtils.createThreadFactory("group-coordinator-background-%d", false)
+ ))
.withCachedBufferMaxBytesSupplier(config::cachedBufferMaxBytes)
.build();
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 80c50195575..0208e627d6a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -175,6 +175,7 @@ public class GroupCoordinatorConfigTest {
public void testConfigs() {
Map<String, Object> configs = new HashMap<>();
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 10);
+
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_BACKGROUND_THREADS_CONFIG,
3);
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG,
10);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG,
555);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
200);
@@ -205,6 +206,7 @@ public class GroupCoordinatorConfigTest {
GroupCoordinatorConfig config = createConfig(configs);
assertEquals(10, config.numThreads());
+ assertEquals(3, config.numBackgroundThreads());
assertEquals(555, config.consumerGroupSessionTimeoutMs());
assertEquals(200, config.consumerGroupHeartbeatIntervalMs());
assertEquals(55, config.consumerGroupMaxSize());