This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 08a52c59c1f MINOR: Refactor GroupCoordinatorConfig (#19092)
08a52c59c1f is described below
commit 08a52c59c1fbc78b05117dd4725e3bf3a0a9c765
Author: David Jacot <[email protected]>
AuthorDate: Wed Mar 5 10:03:59 2025 +0100
MINOR: Refactor GroupCoordinatorConfig (#19092)
We defined multiple `ConfigDef`s in `GroupCoordinatorConfig` in then we
merge them in a few places because we always use them together. Having
multiple `ConfigDef`s does not seem necessary to me. This patch changes
it to have just one.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../coordinator/group/GroupCoordinatorConfig.java | 27 +++++++++-------------
.../group/GroupCoordinatorConfigTest.java | 16 ++++---------
.../group/GroupMetadataManagerTestContext.java | 9 +-------
.../group/modern/share/ShareGroupConfigTest.java | 2 +-
.../kafka/server/config/AbstractKafkaConfig.java | 6 +----
5 files changed, 19 insertions(+), 41 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index fb7c5444230..7ebe63f9c5a 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -238,7 +238,8 @@ public class GroupCoordinatorConfig {
public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT =
15000;
public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC =
"The maximum heartbeat interval for share group members.";
- public static final ConfigDef GROUP_COORDINATOR_CONFIG_DEF = new
ConfigDef()
+ public static final ConfigDef CONFIG_DEF = new ConfigDef()
+ // Group coordinator configs
.define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST,
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
ConfigDef.ValidList.in(Group.GroupType.documentValidValues()),
MEDIUM, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
.define(GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT,
GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), HIGH,
GROUP_COORDINATOR_NUM_THREADS_DOC)
@@ -250,20 +251,20 @@ public class GroupCoordinatorConfig {
.define(OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, INT,
OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH,
OFFSETS_TOPIC_SEGMENT_BYTES_DOC)
.define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int)
OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH,
OFFSETS_TOPIC_COMPRESSION_CODEC_DOC)
// Internal configuration used by integration and system tests.
- .defineInternal(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN,
NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM,
NEW_GROUP_COORDINATOR_ENABLE_DOC);
+ .defineInternal(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN,
NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM,
NEW_GROUP_COORDINATOR_ENABLE_DOC)
- public static final ConfigDef OFFSET_MANAGEMENT_CONFIG_DEF = new
ConfigDef()
+ // Offset configs
.define(OFFSET_METADATA_MAX_SIZE_CONFIG, INT,
OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, OFFSET_METADATA_MAX_SIZE_DOC)
.define(OFFSETS_RETENTION_MINUTES_CONFIG, INT,
OFFSETS_RETENTION_MINUTES_DEFAULT, atLeast(1), HIGH,
OFFSETS_RETENTION_MINUTES_DOC)
- .define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG,
OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH,
OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC);
+ .define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG,
OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH,
OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC)
- public static final ConfigDef CLASSIC_GROUP_CONFIG_DEF = new ConfigDef()
+ // Classic group configs
.define(GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT,
GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
.define(GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT,
GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
.define(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT,
GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, MEDIUM,
GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
- .define(GROUP_MAX_SIZE_CONFIG, INT, GROUP_MAX_SIZE_DEFAULT,
atLeast(1), MEDIUM, GROUP_MAX_SIZE_DOC);
+ .define(GROUP_MAX_SIZE_CONFIG, INT, GROUP_MAX_SIZE_DEFAULT,
atLeast(1), MEDIUM, GROUP_MAX_SIZE_DOC)
- public static final ConfigDef CONSUMER_GROUP_CONFIG_DEF = new ConfigDef()
+ // Consumer group configs
.define(CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT,
CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
.define(CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT,
CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
.define(CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT,
CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
@@ -272,9 +273,9 @@ public class GroupCoordinatorConfig {
.define(CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT,
CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
.define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT,
CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM,
CONSUMER_GROUP_MAX_SIZE_DOC)
.define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST,
CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC)
- .define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING,
CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT,
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)),
MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC);
+ .define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING,
CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT,
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)),
MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC)
- public static final ConfigDef SHARE_GROUP_CONFIG_DEF = new ConfigDef()
+ // Share group configs
.define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT,
SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_SESSION_TIMEOUT_MS_DOC)
.define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT,
SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
.define(SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT,
SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
@@ -406,13 +407,7 @@ public class GroupCoordinatorConfig {
) {
return new GroupCoordinatorConfig(
new AbstractConfig(
- Utils.mergeConfigs(List.of(
- GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
- GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
- GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF,
- GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
- GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF
- )),
+ GroupCoordinatorConfig.CONFIG_DEF,
props
)
);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 8b92b258bbb..9979ea48964 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -19,10 +19,8 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.utils.Utils;
import
org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
@@ -44,13 +42,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GroupCoordinatorConfigTest {
- private static final List<ConfigDef> GROUP_COORDINATOR_CONFIG_DEFS =
List.of(
- GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF,
- GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
- GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
- GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
- GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF
- );
public static class CustomAssignor implements
ConsumerGroupPartitionAssignor, Configurable {
public Map<String, ?> configs;
@@ -318,7 +309,10 @@ public class GroupCoordinatorConfigTest {
}
public static GroupCoordinatorConfig createConfig(Map<String, Object>
configs) {
- return new GroupCoordinatorConfig(
- new
AbstractConfig(Utils.mergeConfigs(GROUP_COORDINATOR_CONFIG_DEFS), configs,
false));
+ return new GroupCoordinatorConfig(new AbstractConfig(
+ GroupCoordinatorConfig.CONFIG_DEF,
+ configs,
+ false
+ ));
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 033069855da..b8ac2bfb9c2 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -52,7 +52,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor;
@@ -169,13 +168,7 @@ public class GroupMetadataManagerTestContext {
) {
return new GroupCoordinatorConfigContext(
new AbstractConfig(
- Utils.mergeConfigs(List.of(
- GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF,
- GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
- GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
- GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
- GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF
- )),
+ GroupCoordinatorConfig.CONFIG_DEF,
props
)
);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
index 94aab9eded2..56d2a58bd63 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
@@ -145,6 +145,6 @@ public class ShareGroupConfigTest {
private static ShareGroupConfig createConfig(Map<String, Object> configs) {
return new ShareGroupConfig(
- new
AbstractConfig(Utils.mergeConfigs(Arrays.asList(ShareGroupConfig.CONFIG_DEF,
GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF)), configs, false));
+ new
AbstractConfig(Utils.mergeConfigs(Arrays.asList(ShareGroupConfig.CONFIG_DEF,
GroupCoordinatorConfig.CONFIG_DEF)), configs, false));
}
}
diff --git
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index df38866d4f5..890a62248d5 100644
---
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -49,11 +49,7 @@ public abstract class AbstractKafkaConfig extends
AbstractConfig {
KRaftConfigs.CONFIG_DEF,
SocketServerConfigs.CONFIG_DEF,
ReplicationConfigs.CONFIG_DEF,
- GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF,
- GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
- GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
- GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF,
- GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF,
+ GroupCoordinatorConfig.CONFIG_DEF,
CleanerConfig.CONFIG_DEF,
LogConfig.SERVER_CONFIG_DEF,
ShareGroupConfig.CONFIG_DEF,