This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff785ac2516 KAFKA-18651: Add Streams-specific broker configurations 
(#19176)
ff785ac2516 is described below

commit ff785ac251624d834547073cc598aa9d383196ed
Author: Alieh Saeedi <[email protected]>
AuthorDate: Thu Mar 13 18:05:24 2025 +0100

    KAFKA-18651: Add Streams-specific broker configurations (#19176)
    
    This change implements the broker-side configs proposed in KIP-1071.
    The configurations implemented by this PR are only those that were 
specifically aimed to be included in `AK 4.1`.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaConfig.scala |   8 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   6 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  10 ++
 .../kafka/coordinator/group/GroupConfig.java       |  85 ++++++++++-
 .../coordinator/group/GroupCoordinatorConfig.java  | 160 ++++++++++++++++++++-
 .../kafka/coordinator/group/GroupConfigTest.java   |  35 +++++
 .../group/GroupCoordinatorConfigTest.java          |   8 ++
 7 files changed, 302 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index f38e62833cb..1c91f58492e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -408,11 +408,11 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
         "This is part of the early access of KIP-932 and MUST NOT be used in 
production.")
     }
     if (protocols.contains(GroupType.STREAMS)) {
-      if (processRoles.isEmpty || !isNewGroupCoordinatorEnabled) {
-        throw new ConfigException(s"The new '${GroupType.STREAMS}' rebalance 
protocol is only supported in KRaft cluster with the new group coordinator.")
+      if (!isNewGroupCoordinatorEnabled) {
+        warn(s"The new '${GroupType.STREAMS}' rebalance protocol is only 
supported with the new group coordinator.")
       }
-      warn(s"The new '${GroupType.STREAMS}' rebalance protocol is enabled 
along with the new group coordinator. " +
-        "This is part of the preview of KIP-1071 and MUST NOT be used in 
production.")
+      warn(s"Streams groups and the new '${GroupType.STREAMS}' rebalance 
protocol are enabled. " +
+        "This is part of the early access of KIP-1071 and MUST NOT be used in 
production.")
     }
     protocols
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index c62926c82c7..9325c92bd59 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -72,7 +72,7 @@ import org.apache.kafka.common.resource.{PatternType, 
Resource, ResourcePattern,
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, 
ProducerIdAndEpoch, SecurityUtils, Utils}
-import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG}
+import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG}
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, 
GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -335,6 +335,10 @@ class KafkaApisTest extends Logging {
     cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
     cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
     cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, 
GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT)
+    cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
+    cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
+    cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
+
     when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
 
     val describeConfigsRequest = new DescribeConfigsRequest.Builder(new 
DescribeConfigsRequestData()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 797fdce5451..1a57724d543 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1034,6 +1034,16 @@ class KafkaConfigTest {
         case ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG =>  
//ignore string
         case ShareGroupConfig.SHARE_FETCH_MAX_FETCH_RECORDS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
 
+        /** Streams groups configs */
+        case GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case 
GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case 
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+        case GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
 
         case _ => assertPropertyInvalid(baseProperties, name, "not_a_number", 
"-1")
       }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 03f0af738d2..b3eac01ff6b 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -59,6 +59,12 @@ public final class GroupConfig extends AbstractConfig {
         "Negative duration is not allowed.</li>" +
         "<li>anything else: throw exception to the share consumer.</li></ul>";
 
+    public static final String STREAMS_SESSION_TIMEOUT_MS_CONFIG = 
"group.streams.session.timeout.ms";
+
+    public static final String STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG = 
"group.streams.heartbeat.interval.ms";
+
+    public static final String STREAMS_NUM_STANDBY_REPLICAS_CONFIG = 
"group.streams.num.standby.replicas";
+
     public final int consumerSessionTimeoutMs;
 
     public final int consumerHeartbeatIntervalMs;
@@ -71,6 +77,12 @@ public final class GroupConfig extends AbstractConfig {
 
     public final String shareAutoOffsetReset;
 
+    public final int streamsSessionTimeoutMs;
+
+    public final int streamsHeartbeatIntervalMs;
+
+    public final int streamsNumStandbyReplicas;
+
     private static final ConfigDef CONFIG = new ConfigDef()
         .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
             INT,
@@ -107,7 +119,25 @@ public final class GroupConfig extends AbstractConfig {
             SHARE_AUTO_OFFSET_RESET_DEFAULT,
             new ShareGroupAutoOffsetResetStrategy.Validator(),
             MEDIUM,
-            SHARE_AUTO_OFFSET_RESET_DOC);
+            SHARE_AUTO_OFFSET_RESET_DOC)
+        .define(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+            INT,
+            GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
+            atLeast(1),
+            MEDIUM,
+            GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC)
+        .define(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+            INT,
+            GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
+            atLeast(1),
+            MEDIUM,
+            GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
+        .define(STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
+            INT,
+            GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT,
+            atLeast(0),
+            MEDIUM,
+            GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC);
 
     public GroupConfig(Map<?, ?> props) {
         super(CONFIG, props, false);
@@ -117,6 +147,9 @@ public final class GroupConfig extends AbstractConfig {
         this.shareHeartbeatIntervalMs = 
getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.shareRecordLockDurationMs = 
getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
         this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG);
+        this.streamsSessionTimeoutMs = 
getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
+        this.streamsHeartbeatIntervalMs = 
getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
+        this.streamsNumStandbyReplicas = 
getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
     }
 
     public static ConfigDef configDef() {
@@ -146,13 +179,16 @@ public final class GroupConfig extends AbstractConfig {
     /**
      * Validates the values of the given properties.
      */
-    @SuppressWarnings("NPathComplexity")
+    @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
     private static void validateValues(Map<?, ?> valueMaps, 
GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig 
shareGroupConfig) {
         int consumerHeartbeatInterval = (Integer) 
valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
         int consumerSessionTimeout = (Integer) 
valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
         int shareHeartbeatInterval = (Integer) 
valueMaps.get(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
         int shareSessionTimeout = (Integer) 
valueMaps.get(SHARE_SESSION_TIMEOUT_MS_CONFIG);
         int shareRecordLockDurationMs = (Integer) 
valueMaps.get(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
+        int streamsSessionTimeoutMs = (Integer) 
valueMaps.get(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
+        int streamsHeartbeatIntervalMs = (Integer) 
valueMaps.get(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
+        int streamsNumStandbyReplicas = (Integer) 
valueMaps.get(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
         if (consumerHeartbeatInterval < 
groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs()) {
             throw new 
InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be 
greater than or equal to " +
                 
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -193,6 +229,26 @@ public final class GroupConfig extends AbstractConfig {
             throw new 
InvalidConfigurationException(SHARE_RECORD_LOCK_DURATION_MS_CONFIG + " must be 
less than or equal to " +
                 
ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
         }
+        if (streamsHeartbeatIntervalMs < 
groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs()) {
+            throw new 
InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be 
greater than or equal to " +
+                
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
+        }
+        if (streamsHeartbeatIntervalMs > 
groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs()) {
+            throw new 
InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be 
less than or equal to " +
+                
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
+        }
+        if (streamsSessionTimeoutMs < 
groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs()) {
+            throw new 
InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be 
greater than or equal to " +
+                
GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
+        }
+        if (streamsSessionTimeoutMs > 
groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs()) {
+            throw new 
InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be 
less than or equal to " +
+                
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
+        }
+        if (streamsNumStandbyReplicas > 
groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas()) {
+            throw new 
InvalidConfigurationException(STREAMS_NUM_STANDBY_REPLICAS_CONFIG + " must be 
less than or equal to " +
+                
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
+        }
         if (consumerSessionTimeout <= consumerHeartbeatInterval) {
             throw new 
InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be 
greater than " +
                 CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -201,6 +257,10 @@ public final class GroupConfig extends AbstractConfig {
             throw new 
InvalidConfigurationException(SHARE_SESSION_TIMEOUT_MS_CONFIG + " must be 
greater than " +
                 SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
         }
+        if (streamsSessionTimeoutMs <= streamsHeartbeatIntervalMs) {
+            throw new 
InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be 
greater than " +
+                STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
+        }
     }
 
     /**
@@ -271,4 +331,25 @@ public final class GroupConfig extends AbstractConfig {
     public ShareGroupAutoOffsetResetStrategy shareAutoOffsetReset() {
         return 
ShareGroupAutoOffsetResetStrategy.fromString(shareAutoOffsetReset);
     }
+
+    /**
+     * The streams group session timeout in milliseconds.
+     */
+    public int streamsSessionTimeoutMs() {
+        return streamsSessionTimeoutMs;
+    }
+
+    /**
+     * The streams group heartbeat interval in milliseconds.
+     */
+    public int streamsHeartbeatIntervalMs() {
+        return streamsHeartbeatIntervalMs;
+    }
+
+    /**
+     * The number of streams standby replicas for each task.
+     */
+    public int streamsNumStandbyReplicas() {
+        return streamsNumStandbyReplicas;
+    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 7ebe63f9c5a..81010c65f14 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -68,8 +68,7 @@ public class GroupCoordinatorConfig {
             "The " + Group.GroupType.SHARE + " rebalance protocol is in early 
access and therefore must not be used in production.";
     public static final List<String> 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of(
         Group.GroupType.CLASSIC.toString(),
-        Group.GroupType.CONSUMER.toString()
-    );
+        Group.GroupType.CONSUMER.toString());
     public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = 
"group.coordinator.append.linger.ms";
     public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The 
duration in milliseconds that the coordinator will " +
         "wait for writes to accumulate before flushing them to disk. 
Transactional writes are not accumulated.";
@@ -238,6 +237,45 @@ public class GroupCoordinatorConfig {
     public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 
15000;
     public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = 
"The maximum heartbeat interval for share group members.";
 
+    ///
+    /// Streams group configs
+    ///
+    public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG = 
"group.streams.session.timeout.ms";
+    public static final int STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000;
+    public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC = "The 
timeout to detect client failures when using the streams group protocol.";
+
+    public static final String STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = 
"group.streams.min.session.timeout.ms";
+    public static final int STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 
45000;
+    public static final String STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The 
minimum allowed value for the group-level configuration of " + 
GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG;
+
+    public static final String STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = 
"group.streams.max.session.timeout.ms";
+    public static final int STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 
60000;
+    public static final String STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The 
maximum allowed value for the group-level configuration of " + 
GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG;
+
+    public static final String STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG = 
"group.streams.heartbeat.interval.ms";
+    public static final int STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000;
+    public static final String STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC = "The 
heartbeat interval given to the members.";
+
+    public static final String STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG 
= "group.streams.min.heartbeat.interval.ms";
+    public static final int STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT = 
5000;
+    public static final String STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC = 
"The minimum allowed value for the group-level configuration of " + 
GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG;
+
+    public static final String STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG 
= "group.streams.max.heartbeat.interval.ms";
+    public static final int STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 
15000;
+    public static final String STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = 
"The maximum allowed value for the group-level configuration of " + 
GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG;
+
+    public static final String STREAMS_GROUP_MAX_SIZE_CONFIG = 
"group.streams.max.size";
+    public static final int STREAMS_GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
+    public static final String STREAMS_GROUP_MAX_SIZE_DOC = "The maximum 
number of streams clients that a single streams group can accommodate.";
+
+    public static final String STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG = 
"group.streams.num.standby.replicas";
+    public static final int STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT = 0;
+    public static final String STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC = "The 
number of standby replicas for each task.";
+
+    public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG = 
"group.streams.max.standby.replicas";
+    public static final int STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT = 2;
+    public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC = "The 
maximum allowed value for the group-level configuration of " + 
GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG;
+
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
         // Group coordinator configs
         .define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
@@ -282,7 +320,19 @@ public class GroupCoordinatorConfig {
         .define(SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
         .define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
         .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
-        .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, 
SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, 
SHARE_GROUP_MAX_SIZE_DOC);
+        .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, 
SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, 
SHARE_GROUP_MAX_SIZE_DOC)
+
+        // Streams group configs
+        .define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, 
STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC)
+        .define(STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, 
STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
+        .define(STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, 
STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
+        .define(STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
+        .define(STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
+        .define(STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
+        .define(STREAMS_GROUP_MAX_SIZE_CONFIG, INT, 
STREAMS_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SIZE_DOC)
+        .define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, 
STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC)
+        .define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, 
STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC);
+
 
     /**
      * The timeout used to wait for a new member in milliseconds.
@@ -321,6 +371,16 @@ public class GroupCoordinatorConfig {
     private final int shareGroupHeartbeatIntervalMs;
     private final int shareGroupMinHeartbeatIntervalMs;
     private final int shareGroupMaxHeartbeatIntervalMs;
+    // Streams group configurations
+    private final int streamsGroupSessionTimeoutMs;
+    private final int streamsGroupMinSessionTimeoutMs;
+    private final int streamsGroupMaxSessionTimeoutMs;
+    private final int streamsGroupHeartbeatIntervalMs;
+    private final int streamsGroupMinHeartbeatIntervalMs;
+    private final int streamsGroupMaxHeartbeatIntervalMs;
+    private final int streamsGroupMaxSize;
+    private final int streamsGroupNumStandbyReplicas;
+    private final int streamsGroupMaxStandbyReplicas;
 
     @SuppressWarnings("this-escape")
     public GroupCoordinatorConfig(AbstractConfig config) {
@@ -359,6 +419,16 @@ public class GroupCoordinatorConfig {
         this.shareGroupMinHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.shareGroupMaxHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.shareGroupMaxSize = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG);
+        // Streams group configurations
+        this.streamsGroupSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG);
+        this.streamsGroupMinSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
+        this.streamsGroupMaxSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
+        this.streamsGroupHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG);
+        this.streamsGroupMinHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
+        this.streamsGroupMaxHeartbeatIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
+        this.streamsGroupMaxSize = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG);
+        this.streamsGroupNumStandbyReplicas = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG);
+        this.streamsGroupMaxStandbyReplicas = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
 
         // New group coordinator configs validation.
         require(consumerGroupMaxHeartbeatIntervalMs >= 
consumerGroupMinHeartbeatIntervalMs,
@@ -400,6 +470,27 @@ public class GroupCoordinatorConfig {
         require(shareGroupHeartbeatIntervalMs < shareGroupSessionTimeoutMs,
             String.format("%s must be less than %s",
                 SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG));
+        // Streams group configs validation.
+        require(streamsGroupMaxHeartbeatIntervalMs >= 
streamsGroupMinHeartbeatIntervalMs,
+            String.format("%s must be greater than or equal to %s",
+                STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
+        require(streamsGroupHeartbeatIntervalMs >= 
streamsGroupMinHeartbeatIntervalMs,
+            String.format("%s must be greater than or equal to %s",
+                STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
+        require(streamsGroupHeartbeatIntervalMs <= 
streamsGroupMaxHeartbeatIntervalMs,
+            String.format("%s must be less than or equal to %s",
+                STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG));
+        require(streamsGroupMaxSessionTimeoutMs >= 
streamsGroupMinSessionTimeoutMs,
+            String.format("%s must be greater than or equal to %s", 
STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 
STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
+        require(streamsGroupSessionTimeoutMs >= 
streamsGroupMinSessionTimeoutMs,
+            String.format("%s must be greater than or equal to %s", 
STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
+        require(streamsGroupSessionTimeoutMs <= 
streamsGroupMaxSessionTimeoutMs,
+            String.format("%s must be less than or equal to %s", 
STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
+        require(streamsGroupNumStandbyReplicas <= 
streamsGroupMaxStandbyReplicas,
+            String.format("%s must be less than or equal to %s", 
STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 
STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG));
+        require(streamsGroupHeartbeatIntervalMs < streamsGroupSessionTimeoutMs,
+            String.format("%s must be less than %s",
+                STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG));
     }
 
     public static GroupCoordinatorConfig fromProps(
@@ -721,4 +812,67 @@ public class GroupCoordinatorConfig {
     public int shareGroupMaxHeartbeatIntervalMs() {
         return shareGroupMaxHeartbeatIntervalMs;
     }
+
+    /**
+     * The streams group session timeout in milliseconds.
+     */
+    public int streamsGroupSessionTimeoutMs() {
+        return streamsGroupSessionTimeoutMs;
+    }
+
+    /**
+     * The maximum allowed session timeout for registered streams consumers.
+     */
+    public int streamsGroupMaxSessionTimeoutMs() {
+        return streamsGroupMaxSessionTimeoutMs;
+    }
+
+    /**
+     * The minimum allowed session timeout for registered streams consumers.
+     */
+    public int streamsGroupMinSessionTimeoutMs() {
+        return streamsGroupMinSessionTimeoutMs;
+    }
+
+    /**
+     * The streams group heartbeat interval in milliseconds.
+     */
+    public int streamsGroupHeartbeatIntervalMs() {
+        return streamsGroupHeartbeatIntervalMs;
+    }
+
+    /**
+     * The minimum heartbeat interval for registered streams consumers.
+     */
+    public int streamsGroupMinHeartbeatIntervalMs() {
+        return streamsGroupMinHeartbeatIntervalMs;
+    }
+
+    /**
+     * The maximum heartbeat interval for registered streams consumers.
+     */
+    public int streamsGroupMaxHeartbeatIntervalMs() {
+        return streamsGroupMaxHeartbeatIntervalMs;
+    }
+
+    /**
+     * The streams group maximum size.
+     */
+    public int streamsGroupMaxSize() {
+        return streamsGroupMaxSize;
+    }
+
+    /**
+     * The number of streams standby replicas for each task.
+     */
+    public int streamsGroupNumStandbyReplicas() {
+        return streamsGroupNumStandbyReplicas;
+    }
+
+    /**
+     * The maximum number of streams standby replicas for each task.
+     */
+    public int streamsGroupMaxNumStandbyReplicas() {
+        return streamsGroupMaxStandbyReplicas;
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index b774d29bb6a..4e77eb15125 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -61,6 +61,12 @@ public class GroupConfigTest {
                 assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
             } else if 
(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "hello", "1.0");
+            } else if 
(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
+                assertPropertyInvalid(name, "not_a_number", "1.0");
+            } else if 
(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG.equals(name)) {
+                assertPropertyInvalid(name, "not_a_number", "1.0");
+            } else if 
(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG.equals(name)) {
+                assertPropertyInvalid(name, "not_a_number", "1.0");
             } else {
                 assertPropertyInvalid(name, "not_a_number", "-0.1");
             }
@@ -164,6 +170,26 @@ public class GroupConfigTest {
         // Check for invalid shareAutoOffsetReset, by_duration with invalid 
duration
         props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, 
"by_duration:invalid");
         doTestInvalidProps(props, ConfigException.class);
+        props = createValidGroupConfig();
+
+        // Check for invalid streamsSessionTimeoutMs, < MIN
+        props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "1");
+        doTestInvalidProps(props, InvalidConfigurationException.class);
+        props = createValidGroupConfig();
+
+        // Check for invalid streamsSessionTimeoutMs, > MAX
+        props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "70000");
+        doTestInvalidProps(props, InvalidConfigurationException.class);
+        props = createValidGroupConfig();
+
+        // Check for invalid streamsHeartbeatIntervalMs, < MIN
+        props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "1000");
+        doTestInvalidProps(props, InvalidConfigurationException.class);
+        props = createValidGroupConfig();
+
+        // Check for invalid streamsHeartbeatIntervalMs, > MAX
+        props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "70000");
+        doTestInvalidProps(props, InvalidConfigurationException.class);
     }
 
     private void doTestInvalidProps(Properties props, Class<? extends 
Exception> exceptionClassName) {
@@ -183,6 +209,9 @@ public class GroupConfigTest {
         defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
         defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
"2000");
         defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
+        defaultValue.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
"10");
+        defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
"2000");
+        defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
 
         Properties props = new Properties();
         props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20");
@@ -194,6 +223,9 @@ public class GroupConfigTest {
         assertEquals(10, 
config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG));
         assertEquals(2000, 
config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG));
         assertEquals("latest", 
config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG));
+        assertEquals(10, 
config.getInt(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG));
+        assertEquals(2000, 
config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG));
+        assertEquals(1, 
config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
     }
 
     @Test
@@ -212,6 +244,9 @@ public class GroupConfigTest {
         props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
         props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "30000");
         props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
+        props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "50000");
+        props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
+        props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
         return props;
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 9979ea48964..267f7ded413 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -272,6 +272,14 @@ public class GroupCoordinatorConfigTest {
         
configs.put(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
50000);
         assertEquals("group.share.heartbeat.interval.ms must be less than 
group.share.session.timeout.ms",
                 assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
+
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
 45000);
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
 60000);
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
50000);
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
50000);
+        assertEquals("group.streams.heartbeat.interval.ms must be less than 
group.streams.session.timeout.ms",
+            assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
     }
 
     public static GroupCoordinatorConfig createGroupCoordinatorConfig(


Reply via email to