This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b0a26bc2f48 KAFKA-19173: Add `Feature` for "streams" group (#19509)
b0a26bc2f48 is described below

commit b0a26bc2f485fa7ec05ba1b5f9e9120fd43abbf4
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Apr 29 22:51:10 2025 -0700

    KAFKA-19173: Add `Feature` for "streams" group (#19509)
    
    Add new StreamsGroupFeature, disabled by default, and add "streams" as
    default value to `group.coordinator.rebalance.protocols`.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, David Jacot
    <[email protected]>, Lucas Brutschy <[email protected]>,
    Justine Olshan <[email protected]>, Andrew Schofield
    <[email protected]>, Jun Rao <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |   8 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   4 -
 .../server/AbstractApiVersionsRequestTest.scala    |   9 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 124 +++++++++++++++------
 .../scala/unit/kafka/tools/StorageToolTest.scala   |   2 +-
 .../coordinator/group/GroupCoordinatorConfig.java  |   3 +-
 .../controller/FeatureControlManagerTest.java      |   2 +-
 .../kafka/metadata/storage/FormatterTest.java      |   2 +-
 .../org/apache/kafka/server/common/Feature.java    |   1 +
 .../kafka/server/common/MetadataVersion.java       |  10 +-
 .../apache/kafka/server/common/StreamsVersion.java |  83 ++++++++++++++
 .../apache/kafka/server/BrokerFeaturesTest.java    |   2 +
 .../integration/InternalTopicIntegrationTest.java  |   2 +-
 .../SmokeTestDriverIntegrationTest.java            |   2 +-
 .../StandbyTaskCreationIntegrationTest.java        |   2 +-
 ...amsUncaughtExceptionHandlerIntegrationTest.java |   2 +-
 .../integration/utils/EmbeddedKafkaCluster.java    |  10 --
 .../apache/kafka/common/test/api/ClusterTest.java  |   2 +-
 .../org/apache/kafka/tools/FeatureCommandTest.java |  15 ++-
 19 files changed, 216 insertions(+), 69 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 3700fa850f3..2841dda409e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -62,7 +62,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
 import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, 
DelegationTokenManager, ProcessRole}
 import org.apache.kafka.server.authorizer._
-import org.apache.kafka.server.common.{GroupVersion, RequestLocal, 
TransactionVersion}
+import org.apache.kafka.server.common.{GroupVersion, RequestLocal, 
StreamsVersion, TransactionVersion}
 import org.apache.kafka.server.config.DelegationTokenManagerConfigs
 import org.apache.kafka.server.share.context.ShareFetchContext
 import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, 
SharePartitionKey}
@@ -2649,11 +2649,15 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
     }
+  }
 
+  private def streamsVersion(): StreamsVersion = {
+    
StreamsVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(StreamsVersion.FEATURE_NAME,
 0.toShort))
   }
 
   private def isStreamsGroupProtocolEnabled: Boolean = {
-      
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)
+      
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS) &&
+      streamsVersion().streamsGroupSupported
   }
 
   def handleStreamsGroupHeartbeat(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 53c70f4737a..c9ead9cfc64 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -386,10 +386,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     if (!protocols.contains(GroupType.CLASSIC)) {
       throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}' 
protocol is not supported.")
     }
-    if (protocols.contains(GroupType.STREAMS)) {
-      warn(s"Streams groups and the new '${GroupType.STREAMS}' rebalance 
protocol are enabled. " +
-        "This is part of the early access of KIP-1071 and MUST NOT be used in 
production.")
-    }
     protocols
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index 88d25b65d93..811e1d92c9b 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{ApiVersionsRequest, 
ApiVersionsResponse, RequestUtils}
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, 
GroupVersion, MetadataVersion, ShareVersion, TransactionVersion}
+import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, 
GroupVersion, MetadataVersion, ShareVersion, StreamsVersion, TransactionVersion}
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Tag
@@ -64,11 +64,11 @@ abstract class AbstractApiVersionsRequestTest(cluster: 
ClusterInstance) {
     apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion
   ): Unit = {
     if (apiVersion >= 3) {
-      assertEquals(5, apiVersionsResponse.data().finalizedFeatures().size())
+      assertEquals(6, apiVersionsResponse.data().finalizedFeatures().size())
       assertEquals(MetadataVersion.latestTesting().featureLevel(), 
apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
       assertEquals(MetadataVersion.latestTesting().featureLevel(), 
apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
 
-      assertEquals(6, apiVersionsResponse.data().supportedFeatures().size())
+      assertEquals(7, apiVersionsResponse.data().supportedFeatures().size())
       assertEquals(MetadataVersion.MINIMUM_VERSION.featureLevel(), 
apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion())
       if (apiVersion < 4) {
         assertEquals(1, 
apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion())
@@ -88,6 +88,9 @@ abstract class AbstractApiVersionsRequestTest(cluster: 
ClusterInstance) {
 
       assertEquals(0, 
apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).minVersion())
       assertEquals(ShareVersion.SV_1.featureLevel(), 
apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).maxVersion())
+
+      assertEquals(0, 
apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).minVersion())
+      assertEquals(StreamsVersion.SV_1.featureLevel(), 
apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).maxVersion())
     }
     val expectedApis = if 
(cluster.controllerListenerName().toScala.contains(listenerName)) {
       ApiVersionsResponse.collectApis(
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 38c803b6c34..0d311435a2d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -88,7 +88,7 @@ import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.authorizer.AclEntry
 import org.apache.kafka.server.{ClientMetricsManager, SimpleApiVersionManager}
 import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, 
Authorizer}
-import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, 
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, TransactionVersion}
+import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, 
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, StreamsVersion, 
TransactionVersion}
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, 
ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.server.metrics.ClientMetricsTestUtils
 import org.apache.kafka.server.share.{CachedSharePartition, 
ErroneousAndValidPartitionData, SharePartitionKey}
@@ -10007,7 +10007,11 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupHeartbeatRequest(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
 1.toShort))
+
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
 
@@ -10018,9 +10022,7 @@ class KafkaApisTest extends Logging {
       requestChannelRequest.context,
       streamsGroupHeartbeatRequest
     )).thenReturn(future)
-    kafkaApis = createKafkaApis(
-      overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams")
-    )
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
@@ -10033,7 +10035,12 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupHeartbeatRequestWithAuthorizedTopology(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
 1.toShort))
+
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
+
     val groupId = "group"
     val fooTopicName = "foo"
     val barTopicName = "bar"
@@ -10084,8 +10091,7 @@ class KafkaApisTest extends Logging {
       streamsGroupHeartbeatRequest
     )).thenReturn(future)
     kafkaApis = createKafkaApis(
-      authorizer = Some(authorizer),
-      overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams")
+      authorizer = Some(authorizer)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -10099,7 +10105,11 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupHeartbeatRequestFutureFailed(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
 1.toShort))
+
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
 
@@ -10110,9 +10120,7 @@ class KafkaApisTest extends Logging {
       requestChannelRequest.context,
       streamsGroupHeartbeatRequest
     )).thenReturn(future)
-    kafkaApis = createKafkaApis(
-      overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams")
-    )
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
@@ -10122,7 +10130,11 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupHeartbeatRequestGroupAuthorizationFailed(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
 1.toShort))
+
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
 
@@ -10132,8 +10144,7 @@ class KafkaApisTest extends Logging {
     when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
       .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
     kafkaApis = createKafkaApis(
-      authorizer = Some(authorizer),
-      overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams")
+      authorizer = Some(authorizer)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -10143,7 +10154,12 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
 1.toShort))
+
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
+
     val groupId = "group"
     val fooTopicName = "foo"
     val barTopicName = "bar"
@@ -10184,8 +10200,7 @@ class KafkaApisTest extends Logging {
     }
 
     kafkaApis = createKafkaApis(
-      authorizer = Some(authorizer),
-      overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams")
+      authorizer = Some(authorizer)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -10194,7 +10209,7 @@ class KafkaApisTest extends Logging {
   }
 
   @Test
-  def testStreamsGroupHeartbeatRequestProtocolDisabled(): Unit = {
+  def testStreamsGroupHeartbeatRequestProtocolDisabledViaConfig(): Unit = {
     metadataCache = mock(classOf[KRaftMetadataCache])
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
@@ -10210,9 +10225,32 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.UNSUPPORTED_VERSION.code, response.data.errorCode)
   }
 
+  @Test
+  def testStreamsGroupHeartbeatRequestProtocolDisabledViaFeature(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
 0.toShort))
+
+    metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
+
+    val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
+
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val response = 
verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+    assertEquals(Errors.UNSUPPORTED_VERSION.code, response.data.errorCode)
+  }
+
   @Test
   def testStreamsGroupHeartbeatRequestInvalidTopicNames(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
 1.toShort))
+
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group").setTopology(
       new StreamsGroupHeartbeatRequestData.Topology()
@@ -10229,9 +10267,7 @@ class KafkaApisTest extends Logging {
 
     val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
 
-    kafkaApis = createKafkaApis(
-      overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams")
-    )
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val response = 
verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
@@ -10241,7 +10277,11 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupHeartbeatRequestInternalTopicNames(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
 1.toShort))
+
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group").setTopology(
       new StreamsGroupHeartbeatRequestData.Topology()
@@ -10257,9 +10297,7 @@ class KafkaApisTest extends Logging {
 
     val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
 
-    kafkaApis = createKafkaApis(
-      overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams")
-    )
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val response = 
verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
@@ -10269,7 +10307,11 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupHeartbeatRequestWithInternalTopicsToCreate(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
 1.toShort))
+
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group");
 
@@ -10281,9 +10323,7 @@ class KafkaApisTest extends Logging {
       streamsGroupHeartbeatRequest
     )).thenReturn(future)
 
-    kafkaApis = createKafkaApis(
-      overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams")
-    )
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val missingTopics = Map("test" -> new CreatableTopic())
@@ -10298,7 +10338,11 @@ class KafkaApisTest extends Logging {
 
   @Test
   def 
testStreamsGroupHeartbeatRequestWithInternalTopicsToCreateMissingCreateACL(): 
Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
 1.toShort))
+
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group");
 
@@ -10324,8 +10368,7 @@ class KafkaApisTest extends Logging {
       }.asJava
     })
     kafkaApis = createKafkaApis(
-      authorizer = Some(authorizer),
-      overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams")
+      authorizer = Some(authorizer)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -10519,7 +10562,12 @@ class KafkaApisTest extends Logging {
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testStreamsGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
 1.toShort))
+
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
+
     val fooTopicName = "foo"
     val barTopicName = "bar"
 
@@ -10534,9 +10582,7 @@ class KafkaApisTest extends Logging {
       any[RequestContext],
       any[util.List[String]]
     )).thenReturn(future)
-    kafkaApis = createKafkaApis(
-      overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams")
-    )
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     val subtopology0 = new StreamsGroupDescribeResponseData.Subtopology()
@@ -10627,7 +10673,11 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupDescribeAuthorizationFailed(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
 1.toShort))
+
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
     streamsGroupDescribeRequestData.groupIds.add("group-id")
@@ -10644,8 +10694,7 @@ class KafkaApisTest extends Logging {
     )).thenReturn(future)
     future.complete(List().asJava)
     kafkaApis = createKafkaApis(
-      authorizer = Some(authorizer),
-      overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams")
+      authorizer = Some(authorizer)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
@@ -10655,7 +10704,11 @@ class KafkaApisTest extends Logging {
 
   @Test
   def testStreamsGroupDescribeFutureFailed(): Unit = {
+    val features = mock(classOf[FinalizedFeatures])
+    
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
 1.toShort))
+
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
     streamsGroupDescribeRequestData.groupIds.add("group-id")
@@ -10666,9 +10719,7 @@ class KafkaApisTest extends Logging {
       any[RequestContext],
       any[util.List[String]]
     )).thenReturn(future)
-    kafkaApis = createKafkaApis(
-      overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams")
-    )
+    kafkaApis = createKafkaApis()
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
     future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
@@ -10683,7 +10734,11 @@ class KafkaApisTest extends Logging {
     val barTopicName = "bar"
     val errorMessage = "The described group uses topics that the client is not 
authorized to describe."
 
+    val features = mock(classOf[FinalizedFeatures])
+    
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
 1.toShort))
+
     metadataCache = mock(classOf[KRaftMetadataCache])
+    when(metadataCache.features()).thenReturn(features)
 
     val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
     val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
@@ -10715,8 +10770,7 @@ class KafkaApisTest extends Logging {
       any[util.List[String]]
     )).thenReturn(future)
     kafkaApis = createKafkaApis(
-      authorizer = Some(authorizer),
-      overrideProperties = 
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> 
"classic,streams")
+      authorizer = Some(authorizer)
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
 
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala 
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 101b8f43bc4..42e262a6858 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -325,7 +325,7 @@ Found problem:
     properties.putAll(defaultStaticQuorumProperties)
     properties.setProperty("log.dirs", availableDirs.mkString(","))
     assertEquals("Unsupported feature: non.existent.feature. Supported 
features are: " +
-      "eligible.leader.replicas.version, group.version, kraft.version, 
share.version, transaction.version",
+      "eligible.leader.replicas.version, group.version, kraft.version, 
share.version, streams.version, transaction.version",
         assertThrows(classOf[FormatterException], () =>
           runFormatCommand(new ByteArrayOutputStream(), properties,
             Seq("--feature", "non.existent.feature=20"))).getMessage)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 1221f937e8a..629bc895d3c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -63,7 +63,8 @@ public class GroupCoordinatorConfig {
             "The " + Group.GroupType.STREAMS + " rebalance protocol is in 
early access and therefore must not be used in production.";
     public static final List<String> 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of(
         Group.GroupType.CLASSIC.toString(),
-        Group.GroupType.CONSUMER.toString());
+        Group.GroupType.CONSUMER.toString(),
+        Group.GroupType.STREAMS.toString());
     public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = 
"group.coordinator.append.linger.ms";
     public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The 
duration in milliseconds that the coordinator will " +
         "wait for writes to accumulate before flushing them to disk. 
Increasing this value improves write efficiency and batch size, " +
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index eac143209dd..483defcc6af 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -396,7 +396,7 @@ public class FeatureControlManagerTest {
             build();
         manager.replay(new 
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
         assertEquals(ControllerResult.of(List.of(), new 
ApiError(Errors.INVALID_UPDATE_VERSION,
-            "Invalid update version 6 for feature metadata.version. Local 
controller 0 only supports versions 7-28")),
+            "Invalid update version 6 for feature metadata.version. Local 
controller 0 only supports versions 7-29")),
                 manager.updateFeatures(
                         Map.of(MetadataVersion.FEATURE_NAME, 
MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL),
                         Map.of(MetadataVersion.FEATURE_NAME, 
FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index 880bea07a9e..ebb6f586969 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -368,7 +368,7 @@ public class FormatterTest {
             formatter1.formatter.setFeatureLevel("nonexistent.feature", 
(short) 1);
             assertEquals("Unsupported feature: nonexistent.feature. Supported 
features " +
                     "are: eligible.leader.replicas.version, group.version, 
kraft.version, " +
-                    "share.version, test.feature.version, transaction.version",
+                    "share.version, streams.version, test.feature.version, 
transaction.version",
                 assertThrows(FormatterException.class,
                     () -> formatter1.formatter.run()).
                         getMessage());
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/Feature.java 
b/server-common/src/main/java/org/apache/kafka/server/common/Feature.java
index 25bb654577c..60b53856225 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/Feature.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/Feature.java
@@ -48,6 +48,7 @@ public enum Feature {
     GROUP_VERSION(GroupVersion.FEATURE_NAME, GroupVersion.values(), 
GroupVersion.LATEST_PRODUCTION),
     
ELIGIBLE_LEADER_REPLICAS_VERSION(EligibleLeaderReplicasVersion.FEATURE_NAME, 
EligibleLeaderReplicasVersion.values(), 
EligibleLeaderReplicasVersion.LATEST_PRODUCTION),
     SHARE_VERSION(ShareVersion.FEATURE_NAME, ShareVersion.values(), 
ShareVersion.LATEST_PRODUCTION),
+    STREAMS_VERSION(StreamsVersion.FEATURE_NAME, StreamsVersion.values(), 
StreamsVersion.LATEST_PRODUCTION),
 
     /**
      * Features defined only for unit tests and are not used in production.
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 7e64fa648f5..93be50cbb13 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -127,7 +127,15 @@ public enum MetadataVersion {
     // *** SHARE GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION 
ALLOWS A SHARE   ***
     // *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE 
TO BE TURNED ON ***
     // *** DYNAMICALLY TO TRY OUT THE PREVIEW CAPABILITY.                      
                ***
-    IBP_4_2_IV0(28, "4.2", "IV0", false);
+    IBP_4_2_IV0(28, "4.2", "IV0", false),
+
+    // Enables "streams" groups by default for new clusters (KIP-1071).
+    //
+    // *** THIS IS A PLACEHOLDER UNSTABLE VERSION WHICH IS USED TO DEFINE THE 
POINT AT WHICH     ***
+    // *** STREAMS GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS 
DEFINITION ALLOWS A STREAMS ***
+    // *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE 
TO BE TURNED ON   ***
+    // *** DYNAMICALLY TO TRY OUT THE EARLY ACCESS CAPABILITY.                 
                  ***
+    IBP_4_2_IV1(29, "4.2", "IV1", false);
 
     // NOTES when adding a new version:
     //   Update the default version in @ClusterTest annotation to point to the 
latest version
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java
new file mode 100644
index 00000000000..cf910c660dc
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Map;
+
+public enum StreamsVersion implements FeatureVersion {
+
+    // Version 0 keeps "streams" groups disabled (KIP-1071).
+    SV_0(0, MetadataVersion.MINIMUM_VERSION, Map.of()),
+
+    // Version 1 enables "streams" groups (KIP-1071).
+    // Using metadata version IBP_4_2_IV1 disables it by default in AK 4.1 
release, and enables it by default in AK 4.2 release.
+    //  - in AK 4.1, this can be enabled as "early access [unstable]"
+    //  - in AK 4.2, it is planned to go GA (cf `LATEST_PRODUCTION`)
+    SV_1(1, MetadataVersion.IBP_4_2_IV1, Map.of());
+
+    public static final String FEATURE_NAME = "streams.version";
+
+    // Mark "streams" group as unstable in AK 4.1 release
+    // Needs to be updated to SV_1 in AK 4.2, to mark as stable
+    public static final StreamsVersion LATEST_PRODUCTION = SV_0;
+
+    private final short featureLevel;
+    private final MetadataVersion bootstrapMetadataVersion;
+    private final Map<String, Short> dependencies;
+
+    StreamsVersion(
+        int featureLevel,
+        MetadataVersion bootstrapMetadataVersion,
+        Map<String, Short> dependencies
+    ) {
+        this.featureLevel = (short) featureLevel;
+        this.bootstrapMetadataVersion = bootstrapMetadataVersion;
+        this.dependencies = dependencies;
+    }
+
+    @Override
+    public short featureLevel() {
+        return featureLevel;
+    }
+
+    @Override
+    public String featureName() {
+        return FEATURE_NAME;
+    }
+
+    @Override
+    public MetadataVersion bootstrapMetadataVersion() {
+        return bootstrapMetadataVersion;
+    }
+
+    @Override
+    public Map<String, Short> dependencies() {
+        return dependencies;
+    }
+
+    public boolean streamsGroupSupported() {
+        return featureLevel >= SV_1.featureLevel;
+    }
+
+    public static StreamsVersion fromFeatureLevel(short version) {
+        return switch (version) {
+            case 0 -> SV_0;
+            case 1 -> SV_1;
+            default -> throw new RuntimeException("Unknown streams feature 
level: " + (int) version);
+        };
+    }
+}
diff --git 
a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java 
b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
index 31ce9c596ee..85c09248c17 100644
--- a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
+++ b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import static 
org.apache.kafka.server.common.Feature.ELIGIBLE_LEADER_REPLICAS_VERSION;
 import static org.apache.kafka.server.common.Feature.GROUP_VERSION;
 import static org.apache.kafka.server.common.Feature.SHARE_VERSION;
+import static org.apache.kafka.server.common.Feature.STREAMS_VERSION;
 import static org.apache.kafka.server.common.Feature.TRANSACTION_VERSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -99,6 +100,7 @@ public class BrokerFeaturesTest {
                 GROUP_VERSION.featureName(), GROUP_VERSION.latestTesting(),
                 ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(), 
ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting(),
                 SHARE_VERSION.featureName(), SHARE_VERSION.latestTesting(),
+                STREAMS_VERSION.featureName(), STREAMS_VERSION.latestTesting(),
                 "kraft.version", (short) 0,
                 "test_feature_1", (short) 4,
                 "test_feature_2", (short) 3,
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 78eb64a9a26..31e39a4c8b7 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -75,7 +75,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Timeout(600)
 @Tag("integration")
 public class InternalTopicIntegrationTest {
-    public static final EmbeddedKafkaCluster CLUSTER = 
EmbeddedKafkaCluster.withStreamsRebalanceProtocol(1);
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
     @BeforeAll
     public static void startCluster() throws IOException, InterruptedException 
{
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 604fdd9b6a1..c40b3433a91 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -51,7 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Timeout(600)
 @Tag("integration")
 public class SmokeTestDriverIntegrationTest {
-    public static final EmbeddedKafkaCluster CLUSTER = 
EmbeddedKafkaCluster.withStreamsRebalanceProtocol(3);
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(3);
     public TestInfo testInfo;
 
     @BeforeAll
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index eb233e8210b..ec48b8b3634 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -57,7 +57,7 @@ import static 
org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
 public class StandbyTaskCreationIntegrationTest {
     private static final int NUM_BROKERS = 1;
 
-    public static final EmbeddedKafkaCluster CLUSTER = 
EmbeddedKafkaCluster.withStreamsRebalanceProtocol(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
 
     private String safeTestName;
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 82100a77293..6f29b7b81f5 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -89,7 +89,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class StreamsUncaughtExceptionHandlerIntegrationTest {
     private static final long NOW = Instant.now().toEpochMilli();
 
-    public static final EmbeddedKafkaCluster CLUSTER = 
EmbeddedKafkaCluster.withStreamsRebalanceProtocol(1);
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
     @BeforeAll
     public static void startCluster() throws IOException {
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 6965135767f..f425f8365ee 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -143,16 +143,6 @@ public class EmbeddedKafkaCluster {
         this.time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
     }
 
-    public static EmbeddedKafkaCluster withStreamsRebalanceProtocol(final int 
numBrokers) {
-        return withStreamsRebalanceProtocol(numBrokers, new Properties());
-    }
-
-    public static EmbeddedKafkaCluster withStreamsRebalanceProtocol(final int 
numBrokers, final Properties props) {
-        
props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer,streams");
-        props.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, 
"true");
-        return new EmbeddedKafkaCluster(numBrokers, props);
-    }
-
     public void start() {
         try {
             cluster.format();
diff --git 
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
 
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
index 097aac3094c..f81f2739907 100644
--- 
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
+++ 
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
@@ -52,7 +52,7 @@ public @interface ClusterTest {
     String brokerListener() default DEFAULT_BROKER_LISTENER_NAME;
     SecurityProtocol controllerSecurityProtocol() default 
SecurityProtocol.PLAINTEXT;
     String controllerListener() default DEFAULT_CONTROLLER_LISTENER_NAME;
-    MetadataVersion metadataVersion() default MetadataVersion.IBP_4_2_IV0;
+    MetadataVersion metadataVersion() default MetadataVersion.IBP_4_2_IV1;
     ClusterConfigProperty[] serverProperties() default {};
     // users can add tags that they want to display in test
     String[] tags() default {};
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index a1ef4ff2e39..f1c68eb0d39 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -64,11 +64,13 @@ public class FeatureCommandTest {
         assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
                 "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(2)));
         assertEquals("Feature: metadata.version\tSupportedMinVersion: 
3.3-IV3\t" +
-                "SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 
3.3-IV3\t", outputWithoutEpoch(features.get(3)));
+                "SupportedMaxVersion: 4.2-IV1\tFinalizedVersionLevel: 
3.3-IV3\t", outputWithoutEpoch(features.get(3)));
         assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
                 "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(4)));
+        assertEquals("Feature: streams.version\tSupportedMinVersion: 0\t" +
+                "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(5)));
         assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
-                "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(5)));
+                "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(6)));
     }
 
     // Use the first MetadataVersion that supports KIP-919
@@ -88,11 +90,13 @@ public class FeatureCommandTest {
         assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
                 "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(2)));
         assertEquals("Feature: metadata.version\tSupportedMinVersion: 
3.3-IV3\t" +
-                "SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 
3.7-IV0\t", outputWithoutEpoch(features.get(3)));
+                "SupportedMaxVersion: 4.2-IV1\tFinalizedVersionLevel: 
3.7-IV0\t", outputWithoutEpoch(features.get(3)));
         assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
                 "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(4)));
+        assertEquals("Feature: streams.version\tSupportedMinVersion: 0\t" +
+                "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(5)));
         assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
-                "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(5)));
+                "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(6)));
     }
 
     @ClusterTest(types = {Type.KRAFT}, metadataVersion = 
MetadataVersion.IBP_3_3_IV3)
@@ -118,7 +122,7 @@ public class FeatureCommandTest {
         );
         // Change expected message to reflect possible MetadataVersion range 
1-N (N increases when adding a new version)
         assertEquals("Could not disable metadata.version. The update failed 
for all features since the following " +
-                "feature had an error: Invalid update version 0 for feature 
metadata.version. Local controller 3000 only supports versions 7-28", 
commandOutput);
+                "feature had an error: Invalid update version 0 for feature 
metadata.version. Local controller 3000 only supports versions 7-29", 
commandOutput);
 
         commandOutput = ToolsTestUtils.captureStandardOut(() ->
                 assertEquals(1, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
@@ -182,6 +186,7 @@ public class FeatureCommandTest {
                 "kraft.version was downgraded to 0.\n" +
                 "metadata.version was downgraded to 18.\n" +
                 "share.version was downgraded to 0.\n" +
+                "streams.version was downgraded to 0.\n" +
                 "transaction.version was downgraded to 0.", commandOutput);
     }
 


Reply via email to