This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4014d66e71d MINOR: Fix GroupConfigManager.validate() to include all
group type defaults (#21593)
4014d66e71d is described below
commit 4014d66e71d4e1eb7d1a341e9f4aa3fd594c530b
Author: majialong <[email protected]>
AuthorDate: Wed Mar 4 02:10:57 2026 +0800
MINOR: Fix GroupConfigManager.validate() to include all group type defaults
(#21593)
`GroupConfigManager.validate()` only included `consumer` group defaults
in the combined configs, causing `share` and `streams` group fields to
fall back to hardcoded defaults. This leads to false validation failures
when broker-configured values differ from those defaults.
Fix by using `extractGroupConfigMap()` which covers all group types, and
adding the missing `extractStreamsGroupConfigMap()` to it.
Reviewers: Andrew Schofield <[email protected]>
---
.../coordinator/group/GroupConfigManager.java | 9 +++---
.../coordinator/group/GroupCoordinatorConfig.java | 14 +++++++-
.../coordinator/group/GroupConfigManagerTest.java | 37 ++++++++++++++++++++++
3 files changed, 55 insertions(+), 5 deletions(-)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
index 80ef0d24a4a..892dc9272ef 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
@@ -76,9 +76,10 @@ public class GroupConfigManager implements AutoCloseable {
/**
* Validate the given properties.
*
- * @param newGroupConfig The new group config.
- * @param groupCoordinatorConfig The group coordinator config.
- * @throws InvalidConfigurationException If validation fails
+ * @param newGroupConfig The new group config.
+ * @param groupCoordinatorConfig The group coordinator config.
+ * @param shareGroupConfig The share group config.
+ * @throws InvalidConfigurationException If validation fails.
*/
public static void validate(
Properties newGroupConfig,
@@ -86,7 +87,7 @@ public class GroupConfigManager implements AutoCloseable {
ShareGroupConfig shareGroupConfig
) {
Properties combinedConfigs = new Properties();
- combinedConfigs.putAll(groupCoordinatorConfig.extractConsumerGroupConfigMap());
+ combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
combinedConfigs.putAll(newGroupConfig);
GroupConfig.validate(combinedConfigs, groupCoordinatorConfig, shareGroupConfig);
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index e12d6ab9685..68d3ecb8fe3 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -655,12 +655,13 @@ public class GroupCoordinatorConfig {
}
/**
- * Copy the subset of properties that are relevant to consumer group and share group.
+ * Copy the subset of properties that are relevant to consumer group, share group and streams group.
*/
public Map<String, Integer> extractGroupConfigMap(ShareGroupConfig shareGroupConfig) {
Map<String, Integer> defaultConfigs = new HashMap<>();
defaultConfigs.putAll(extractConsumerGroupConfigMap());
defaultConfigs.putAll(shareGroupConfig.extractShareGroupConfigMap(this));
+ defaultConfigs.putAll(extractStreamsGroupConfigMap());
return Collections.unmodifiableMap(defaultConfigs);
}
@@ -673,6 +674,17 @@ public class GroupCoordinatorConfig {
GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, consumerGroupHeartbeatIntervalMs());
}
+ /**
+ * Copy the subset of properties that are relevant to streams group.
+ */
+ public Map<String, Integer> extractStreamsGroupConfigMap() {
+ return Map.of(
+ GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, streamsGroupSessionTimeoutMs(),
+ GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, streamsGroupHeartbeatIntervalMs(),
+ GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, streamsGroupNumStandbyReplicas(),
+ GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, streamsGroupInitialRebalanceDelayMs());
+ }
+
/**
* The number of threads or event loops running.
*/
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
index 675f8f653b3..d50ac1301d0 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
@@ -17,13 +17,16 @@
package org.apache.kafka.coordinator.group;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -32,6 +35,7 @@ import java.util.Properties;
import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -81,6 +85,22 @@ public class GroupConfigManagerTest {
assertEquals(6000, config.getInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG));
}
+ @Test
+ public void testValidateUsesAllGroupTypeDefaults() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 46000);
+ configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 46000);
+
+ GroupCoordinatorConfig groupCoordinatorConfig = createGroupCoordinatorConfig(configs);
+ ShareGroupConfig shareGroupConfig = createShareGroupConfig();
+
+ Properties newGroupConfig = new Properties();
+ newGroupConfig.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "2");
+
+ assertDoesNotThrow(() ->
+ GroupConfigManager.validate(newGroupConfig, groupCoordinatorConfig, shareGroupConfig));
+ }
+
public static GroupConfigManager createConfigManager() {
Map<String, String> defaultConfig = new HashMap<>();
defaultConfig.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, String.valueOf(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT));
@@ -88,4 +108,21 @@ public class GroupConfigManagerTest {
defaultConfig.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, String.valueOf(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT));
return new GroupConfigManager(defaultConfig);
}
+
+ private static GroupCoordinatorConfig createGroupCoordinatorConfig(Map<String, Object> overrides) {
+ Map<String, Object> configs = new HashMap<>(overrides);
+ return new GroupCoordinatorConfig(new AbstractConfig(
+ GroupCoordinatorConfig.CONFIG_DEF,
+ configs,
+ false
+ ));
+ }
+
+ private static ShareGroupConfig createShareGroupConfig() {
+ return new ShareGroupConfig(new AbstractConfig(
+ Utils.mergeConfigs(Arrays.asList(ShareGroupConfig.CONFIG_DEF, GroupCoordinatorConfig.CONFIG_DEF)),
+ new HashMap<>(),
+ false
+ ));
+ }
}