This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4014d66e71d MINOR: Fix GroupConfigManager.validate() to include all 
group type defaults (#21593)
4014d66e71d is described below

commit 4014d66e71d4e1eb7d1a341e9f4aa3fd594c530b
Author: majialong <[email protected]>
AuthorDate: Wed Mar 4 02:10:57 2026 +0800

    MINOR: Fix GroupConfigManager.validate() to include all group type defaults 
(#21593)
    
    `GroupConfigManager.validate()` only included `consumer` group defaults
    in the combined configs, causing `share` and `streams` group fields to
    fall back to hardcoded defaults. This leads to false validation failures
    when broker-configured values differ from those defaults.
    
    Fix by using `extractGroupConfigMap()` which covers all group types, and
    adding the missing `extractStreamsGroupConfigMap()` to it.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../coordinator/group/GroupConfigManager.java      |  9 +++---
 .../coordinator/group/GroupCoordinatorConfig.java  | 14 +++++++-
 .../coordinator/group/GroupConfigManagerTest.java  | 37 ++++++++++++++++++++++
 3 files changed, 55 insertions(+), 5 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
index 80ef0d24a4a..892dc9272ef 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
@@ -76,9 +76,10 @@ public class GroupConfigManager implements AutoCloseable {
     /**
      * Validate the given properties.
      *
-     * @param newGroupConfig                 The new group config.
-     * @param groupCoordinatorConfig         The group coordinator config.
-     * @throws InvalidConfigurationException If validation fails
+     * @param newGroupConfig         The new group config.
+     * @param groupCoordinatorConfig The group coordinator config.
+     * @param shareGroupConfig       The share group config.
+     * @throws InvalidConfigurationException If validation fails.
      */
     public static void validate(
         Properties newGroupConfig,
@@ -86,7 +87,7 @@ public class GroupConfigManager implements AutoCloseable {
         ShareGroupConfig shareGroupConfig
     ) {
         Properties combinedConfigs = new Properties();
-        
combinedConfigs.putAll(groupCoordinatorConfig.extractConsumerGroupConfigMap());
+        
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
         combinedConfigs.putAll(newGroupConfig);
         GroupConfig.validate(combinedConfigs, groupCoordinatorConfig, 
shareGroupConfig);
     }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index e12d6ab9685..68d3ecb8fe3 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -655,12 +655,13 @@ public class GroupCoordinatorConfig {
     }
 
     /**
-     * Copy the subset of properties that are relevant to consumer group and 
share group.
+     * Copy the subset of properties that are relevant to consumer group, 
share group and streams group.
      */
     public Map<String, Integer> extractGroupConfigMap(ShareGroupConfig 
shareGroupConfig) {
         Map<String, Integer> defaultConfigs = new HashMap<>();
         defaultConfigs.putAll(extractConsumerGroupConfigMap());
         
defaultConfigs.putAll(shareGroupConfig.extractShareGroupConfigMap(this));
+        defaultConfigs.putAll(extractStreamsGroupConfigMap());
         return Collections.unmodifiableMap(defaultConfigs);
     }
 
@@ -673,6 +674,17 @@ public class GroupCoordinatorConfig {
             GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
consumerGroupHeartbeatIntervalMs());
     }
 
+    /**
+     * Copy the subset of properties that are relevant to streams group.
+     */
+    public Map<String, Integer> extractStreamsGroupConfigMap() {
+        return Map.of(
+            GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
streamsGroupSessionTimeoutMs(),
+            GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
streamsGroupHeartbeatIntervalMs(),
+            GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
streamsGroupNumStandbyReplicas(),
+            GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
streamsGroupInitialRebalanceDelayMs());
+    }
+
     /**
      * The number of threads or event loops running.
      */
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
index 675f8f653b3..d50ac1301d0 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
@@ -17,13 +17,16 @@
 
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -32,6 +35,7 @@ import java.util.Properties;
 import static 
org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -81,6 +85,22 @@ public class GroupConfigManagerTest {
         assertEquals(6000, 
config.getInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG));
     }
 
+    @Test
+    public void testValidateUsesAllGroupTypeDefaults() {
+        Map<String, Object> configs = new HashMap<>();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
46000);
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
46000);
+
+        GroupCoordinatorConfig groupCoordinatorConfig = 
createGroupCoordinatorConfig(configs);
+        ShareGroupConfig shareGroupConfig = createShareGroupConfig();
+
+        Properties newGroupConfig = new Properties();
+        newGroupConfig.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
"2");
+
+        assertDoesNotThrow(() ->
+            GroupConfigManager.validate(newGroupConfig, 
groupCoordinatorConfig, shareGroupConfig));
+    }
+
     public static GroupConfigManager createConfigManager() {
         Map<String, String> defaultConfig = new HashMap<>();
         defaultConfig.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
String.valueOf(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT));
@@ -88,4 +108,21 @@ public class GroupConfigManagerTest {
         defaultConfig.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
String.valueOf(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT));
         return new GroupConfigManager(defaultConfig);
     }
+
+    private static GroupCoordinatorConfig 
createGroupCoordinatorConfig(Map<String, Object> overrides) {
+        Map<String, Object> configs = new HashMap<>(overrides);
+        return new GroupCoordinatorConfig(new AbstractConfig(
+            GroupCoordinatorConfig.CONFIG_DEF,
+            configs,
+            false
+        ));
+    }
+
+    private static ShareGroupConfig createShareGroupConfig() {
+        return new ShareGroupConfig(new AbstractConfig(
+            Utils.mergeConfigs(Arrays.asList(ShareGroupConfig.CONFIG_DEF, 
GroupCoordinatorConfig.CONFIG_DEF)),
+            new HashMap<>(),
+            false
+        ));
+    }
 }

Reply via email to