This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b0a26bc2f48 KAFKA-19173: Add `Feature` for "streams" group (#19509)
b0a26bc2f48 is described below
commit b0a26bc2f485fa7ec05ba1b5f9e9120fd43abbf4
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Apr 29 22:51:10 2025 -0700
KAFKA-19173: Add `Feature` for "streams" group (#19509)
Add the new `StreamsVersion` feature (`streams.version`), disabled by default, and add
"streams" to the default value of `group.coordinator.rebalance.protocols`.
Reviewers: Chia-Ping Tsai <[email protected]>, David Jacot
<[email protected]>, Lucas Brutschy <[email protected]>,
Justine Olshan <[email protected]>, Andrew Schofield
<[email protected]>, Jun Rao <[email protected]>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 8 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 4 -
.../server/AbstractApiVersionsRequestTest.scala | 9 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 124 +++++++++++++++------
.../scala/unit/kafka/tools/StorageToolTest.scala | 2 +-
.../coordinator/group/GroupCoordinatorConfig.java | 3 +-
.../controller/FeatureControlManagerTest.java | 2 +-
.../kafka/metadata/storage/FormatterTest.java | 2 +-
.../org/apache/kafka/server/common/Feature.java | 1 +
.../kafka/server/common/MetadataVersion.java | 10 +-
.../apache/kafka/server/common/StreamsVersion.java | 83 ++++++++++++++
.../apache/kafka/server/BrokerFeaturesTest.java | 2 +
.../integration/InternalTopicIntegrationTest.java | 2 +-
.../SmokeTestDriverIntegrationTest.java | 2 +-
.../StandbyTaskCreationIntegrationTest.java | 2 +-
...amsUncaughtExceptionHandlerIntegrationTest.java | 2 +-
.../integration/utils/EmbeddedKafkaCluster.java | 10 --
.../apache/kafka/common/test/api/ClusterTest.java | 2 +-
.../org/apache/kafka/tools/FeatureCommandTest.java | 15 ++-
19 files changed, 216 insertions(+), 69 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 3700fa850f3..2841dda409e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -62,7 +62,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager,
DelegationTokenManager, ProcessRole}
import org.apache.kafka.server.authorizer._
-import org.apache.kafka.server.common.{GroupVersion, RequestLocal,
TransactionVersion}
+import org.apache.kafka.server.common.{GroupVersion, RequestLocal,
StreamsVersion, TransactionVersion}
import org.apache.kafka.server.config.DelegationTokenManagerConfigs
import org.apache.kafka.server.share.context.ShareFetchContext
import org.apache.kafka.server.share.{ErroneousAndValidPartitionData,
SharePartitionKey}
@@ -2649,11 +2649,15 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
}
+ }
+ private def streamsVersion(): StreamsVersion = {
+
StreamsVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(StreamsVersion.FEATURE_NAME,
0.toShort))
}
private def isStreamsGroupProtocolEnabled: Boolean = {
-
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)
+
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS) &&
+ streamsVersion().streamsGroupSupported
}
def handleStreamsGroupHeartbeat(request: RequestChannel.Request):
CompletableFuture[Unit] = {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 53c70f4737a..c9ead9cfc64 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -386,10 +386,6 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
if (!protocols.contains(GroupType.CLASSIC)) {
throw new ConfigException(s"Disabling the '${GroupType.CLASSIC}'
protocol is not supported.")
}
- if (protocols.contains(GroupType.STREAMS)) {
- warn(s"Streams groups and the new '${GroupType.STREAMS}' rebalance
protocol are enabled. " +
- "This is part of the early access of KIP-1071 and MUST NOT be used in
production.")
- }
protocols
}
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index 88d25b65d93..811e1d92c9b 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{ApiVersionsRequest,
ApiVersionsResponse, RequestUtils}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion,
GroupVersion, MetadataVersion, ShareVersion, TransactionVersion}
+import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion,
GroupVersion, MetadataVersion, ShareVersion, StreamsVersion, TransactionVersion}
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Tag
@@ -64,11 +64,11 @@ abstract class AbstractApiVersionsRequestTest(cluster:
ClusterInstance) {
apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion
): Unit = {
if (apiVersion >= 3) {
- assertEquals(5, apiVersionsResponse.data().finalizedFeatures().size())
+ assertEquals(6, apiVersionsResponse.data().finalizedFeatures().size())
assertEquals(MetadataVersion.latestTesting().featureLevel(),
apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
assertEquals(MetadataVersion.latestTesting().featureLevel(),
apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
- assertEquals(6, apiVersionsResponse.data().supportedFeatures().size())
+ assertEquals(7, apiVersionsResponse.data().supportedFeatures().size())
assertEquals(MetadataVersion.MINIMUM_VERSION.featureLevel(),
apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion())
if (apiVersion < 4) {
assertEquals(1,
apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion())
@@ -88,6 +88,9 @@ abstract class AbstractApiVersionsRequestTest(cluster:
ClusterInstance) {
assertEquals(0,
apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).minVersion())
assertEquals(ShareVersion.SV_1.featureLevel(),
apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).maxVersion())
+
+ assertEquals(0,
apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).minVersion())
+ assertEquals(StreamsVersion.SV_1.featureLevel(),
apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).maxVersion())
}
val expectedApis = if
(cluster.controllerListenerName().toScala.contains(listenerName)) {
ApiVersionsResponse.collectApis(
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 38c803b6c34..0d311435a2d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -88,7 +88,7 @@ import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.{ClientMetricsManager, SimpleApiVersionManager}
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult,
Authorizer}
-import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures,
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, TransactionVersion}
+import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures,
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, StreamsVersion,
TransactionVersion}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs,
ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
import org.apache.kafka.server.share.{CachedSharePartition,
ErroneousAndValidPartitionData, SharePartitionKey}
@@ -10007,7 +10007,11 @@ class KafkaApisTest extends Logging {
@Test
def testStreamsGroupHeartbeatRequest(): Unit = {
+ val features = mock(classOf[FinalizedFeatures])
+
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
1.toShort))
+
metadataCache = mock(classOf[KRaftMetadataCache])
+ when(metadataCache.features()).thenReturn(features)
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
@@ -10018,9 +10022,7 @@ class KafkaApisTest extends Logging {
requestChannelRequest.context,
streamsGroupHeartbeatRequest
)).thenReturn(future)
- kafkaApis = createKafkaApis(
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,streams")
- )
+ kafkaApis = createKafkaApis()
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
@@ -10033,7 +10035,12 @@ class KafkaApisTest extends Logging {
@Test
def testStreamsGroupHeartbeatRequestWithAuthorizedTopology(): Unit = {
+ val features = mock(classOf[FinalizedFeatures])
+
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
1.toShort))
+
metadataCache = mock(classOf[KRaftMetadataCache])
+ when(metadataCache.features()).thenReturn(features)
+
val groupId = "group"
val fooTopicName = "foo"
val barTopicName = "bar"
@@ -10084,8 +10091,7 @@ class KafkaApisTest extends Logging {
streamsGroupHeartbeatRequest
)).thenReturn(future)
kafkaApis = createKafkaApis(
- authorizer = Some(authorizer),
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,streams")
+ authorizer = Some(authorizer)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -10099,7 +10105,11 @@ class KafkaApisTest extends Logging {
@Test
def testStreamsGroupHeartbeatRequestFutureFailed(): Unit = {
+ val features = mock(classOf[FinalizedFeatures])
+
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
1.toShort))
+
metadataCache = mock(classOf[KRaftMetadataCache])
+ when(metadataCache.features()).thenReturn(features)
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
@@ -10110,9 +10120,7 @@ class KafkaApisTest extends Logging {
requestChannelRequest.context,
streamsGroupHeartbeatRequest
)).thenReturn(future)
- kafkaApis = createKafkaApis(
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,streams")
- )
+ kafkaApis = createKafkaApis()
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
@@ -10122,7 +10130,11 @@ class KafkaApisTest extends Logging {
@Test
def testStreamsGroupHeartbeatRequestGroupAuthorizationFailed(): Unit = {
+ val features = mock(classOf[FinalizedFeatures])
+
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
1.toShort))
+
metadataCache = mock(classOf[KRaftMetadataCache])
+ when(metadataCache.features()).thenReturn(features)
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
@@ -10132,8 +10144,7 @@ class KafkaApisTest extends Logging {
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(Seq(AuthorizationResult.DENIED).asJava)
kafkaApis = createKafkaApis(
- authorizer = Some(authorizer),
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,streams")
+ authorizer = Some(authorizer)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -10143,7 +10154,12 @@ class KafkaApisTest extends Logging {
@Test
def testStreamsGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = {
+ val features = mock(classOf[FinalizedFeatures])
+
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
1.toShort))
+
metadataCache = mock(classOf[KRaftMetadataCache])
+ when(metadataCache.features()).thenReturn(features)
+
val groupId = "group"
val fooTopicName = "foo"
val barTopicName = "bar"
@@ -10184,8 +10200,7 @@ class KafkaApisTest extends Logging {
}
kafkaApis = createKafkaApis(
- authorizer = Some(authorizer),
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,streams")
+ authorizer = Some(authorizer)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -10194,7 +10209,7 @@ class KafkaApisTest extends Logging {
}
@Test
- def testStreamsGroupHeartbeatRequestProtocolDisabled(): Unit = {
+ def testStreamsGroupHeartbeatRequestProtocolDisabledViaConfig(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
@@ -10210,9 +10225,32 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.UNSUPPORTED_VERSION.code, response.data.errorCode)
}
+ @Test
+ def testStreamsGroupHeartbeatRequestProtocolDisabledViaFeature(): Unit = {
+ val features = mock(classOf[FinalizedFeatures])
+
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
0.toShort))
+
+ metadataCache = mock(classOf[KRaftMetadataCache])
+ when(metadataCache.features()).thenReturn(features)
+
+ val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
+
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
+
+ kafkaApis = createKafkaApis()
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val response =
verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
+ assertEquals(Errors.UNSUPPORTED_VERSION.code, response.data.errorCode)
+ }
+
@Test
def testStreamsGroupHeartbeatRequestInvalidTopicNames(): Unit = {
+ val features = mock(classOf[FinalizedFeatures])
+
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
1.toShort))
+
metadataCache = mock(classOf[KRaftMetadataCache])
+ when(metadataCache.features()).thenReturn(features)
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group").setTopology(
new StreamsGroupHeartbeatRequestData.Topology()
@@ -10229,9 +10267,7 @@ class KafkaApisTest extends Logging {
val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
- kafkaApis = createKafkaApis(
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,streams")
- )
+ kafkaApis = createKafkaApis()
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val response =
verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
@@ -10241,7 +10277,11 @@ class KafkaApisTest extends Logging {
@Test
def testStreamsGroupHeartbeatRequestInternalTopicNames(): Unit = {
+ val features = mock(classOf[FinalizedFeatures])
+
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
1.toShort))
+
metadataCache = mock(classOf[KRaftMetadataCache])
+ when(metadataCache.features()).thenReturn(features)
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group").setTopology(
new StreamsGroupHeartbeatRequestData.Topology()
@@ -10257,9 +10297,7 @@ class KafkaApisTest extends Logging {
val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
- kafkaApis = createKafkaApis(
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,streams")
- )
+ kafkaApis = createKafkaApis()
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val response =
verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
@@ -10269,7 +10307,11 @@ class KafkaApisTest extends Logging {
@Test
def testStreamsGroupHeartbeatRequestWithInternalTopicsToCreate(): Unit = {
+ val features = mock(classOf[FinalizedFeatures])
+
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
1.toShort))
+
metadataCache = mock(classOf[KRaftMetadataCache])
+ when(metadataCache.features()).thenReturn(features)
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group");
@@ -10281,9 +10323,7 @@ class KafkaApisTest extends Logging {
streamsGroupHeartbeatRequest
)).thenReturn(future)
- kafkaApis = createKafkaApis(
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,streams")
- )
+ kafkaApis = createKafkaApis()
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val missingTopics = Map("test" -> new CreatableTopic())
@@ -10298,7 +10338,11 @@ class KafkaApisTest extends Logging {
@Test
def
testStreamsGroupHeartbeatRequestWithInternalTopicsToCreateMissingCreateACL():
Unit = {
+ val features = mock(classOf[FinalizedFeatures])
+
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
1.toShort))
+
metadataCache = mock(classOf[KRaftMetadataCache])
+ when(metadataCache.features()).thenReturn(features)
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group");
@@ -10324,8 +10368,7 @@ class KafkaApisTest extends Logging {
}.asJava
})
kafkaApis = createKafkaApis(
- authorizer = Some(authorizer),
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,streams")
+ authorizer = Some(authorizer)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -10519,7 +10562,12 @@ class KafkaApisTest extends Logging {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStreamsGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
+ val features = mock(classOf[FinalizedFeatures])
+
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
1.toShort))
+
metadataCache = mock(classOf[KRaftMetadataCache])
+ when(metadataCache.features()).thenReturn(features)
+
val fooTopicName = "foo"
val barTopicName = "bar"
@@ -10534,9 +10582,7 @@ class KafkaApisTest extends Logging {
any[RequestContext],
any[util.List[String]]
)).thenReturn(future)
- kafkaApis = createKafkaApis(
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,streams")
- )
+ kafkaApis = createKafkaApis()
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val subtopology0 = new StreamsGroupDescribeResponseData.Subtopology()
@@ -10627,7 +10673,11 @@ class KafkaApisTest extends Logging {
@Test
def testStreamsGroupDescribeAuthorizationFailed(): Unit = {
+ val features = mock(classOf[FinalizedFeatures])
+
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
1.toShort))
+
metadataCache = mock(classOf[KRaftMetadataCache])
+ when(metadataCache.features()).thenReturn(features)
val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
streamsGroupDescribeRequestData.groupIds.add("group-id")
@@ -10644,8 +10694,7 @@ class KafkaApisTest extends Logging {
)).thenReturn(future)
future.complete(List().asJava)
kafkaApis = createKafkaApis(
- authorizer = Some(authorizer),
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,streams")
+ authorizer = Some(authorizer)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
@@ -10655,7 +10704,11 @@ class KafkaApisTest extends Logging {
@Test
def testStreamsGroupDescribeFutureFailed(): Unit = {
+ val features = mock(classOf[FinalizedFeatures])
+
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
1.toShort))
+
metadataCache = mock(classOf[KRaftMetadataCache])
+ when(metadataCache.features()).thenReturn(features)
val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
streamsGroupDescribeRequestData.groupIds.add("group-id")
@@ -10666,9 +10719,7 @@ class KafkaApisTest extends Logging {
any[RequestContext],
any[util.List[String]]
)).thenReturn(future)
- kafkaApis = createKafkaApis(
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,streams")
- )
+ kafkaApis = createKafkaApis()
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
@@ -10683,7 +10734,11 @@ class KafkaApisTest extends Logging {
val barTopicName = "bar"
val errorMessage = "The described group uses topics that the client is not
authorized to describe."
+ val features = mock(classOf[FinalizedFeatures])
+
when(features.finalizedFeatures()).thenReturn(Collections.singletonMap(StreamsVersion.FEATURE_NAME,
1.toShort))
+
metadataCache = mock(classOf[KRaftMetadataCache])
+ when(metadataCache.features()).thenReturn(features)
val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
@@ -10715,8 +10770,7 @@ class KafkaApisTest extends Logging {
any[util.List[String]]
)).thenReturn(future)
kafkaApis = createKafkaApis(
- authorizer = Some(authorizer),
- overrideProperties =
Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG ->
"classic,streams")
+ authorizer = Some(authorizer)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 101b8f43bc4..42e262a6858 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -325,7 +325,7 @@ Found problem:
properties.putAll(defaultStaticQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
assertEquals("Unsupported feature: non.existent.feature. Supported
features are: " +
- "eligible.leader.replicas.version, group.version, kraft.version,
share.version, transaction.version",
+ "eligible.leader.replicas.version, group.version, kraft.version,
share.version, streams.version, transaction.version",
assertThrows(classOf[FormatterException], () =>
runFormatCommand(new ByteArrayOutputStream(), properties,
Seq("--feature", "non.existent.feature=20"))).getMessage)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 1221f937e8a..629bc895d3c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -63,7 +63,8 @@ public class GroupCoordinatorConfig {
"The " + Group.GroupType.STREAMS + " rebalance protocol is in
early access and therefore must not be used in production.";
public static final List<String>
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of(
Group.GroupType.CLASSIC.toString(),
- Group.GroupType.CONSUMER.toString());
+ Group.GroupType.CONSUMER.toString(),
+ Group.GroupType.STREAMS.toString());
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG =
"group.coordinator.append.linger.ms";
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The
duration in milliseconds that the coordinator will " +
"wait for writes to accumulate before flushing them to disk.
Increasing this value improves write efficiency and batch size, " +
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index eac143209dd..483defcc6af 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -396,7 +396,7 @@ public class FeatureControlManagerTest {
build();
manager.replay(new
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
assertEquals(ControllerResult.of(List.of(), new
ApiError(Errors.INVALID_UPDATE_VERSION,
- "Invalid update version 6 for feature metadata.version. Local
controller 0 only supports versions 7-28")),
+ "Invalid update version 6 for feature metadata.version. Local
controller 0 only supports versions 7-29")),
manager.updateFeatures(
Map.of(MetadataVersion.FEATURE_NAME,
MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL),
Map.of(MetadataVersion.FEATURE_NAME,
FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index 880bea07a9e..ebb6f586969 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -368,7 +368,7 @@ public class FormatterTest {
formatter1.formatter.setFeatureLevel("nonexistent.feature",
(short) 1);
assertEquals("Unsupported feature: nonexistent.feature. Supported
features " +
"are: eligible.leader.replicas.version, group.version,
kraft.version, " +
- "share.version, test.feature.version, transaction.version",
+ "share.version, streams.version, test.feature.version,
transaction.version",
assertThrows(FormatterException.class,
() -> formatter1.formatter.run()).
getMessage());
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/Feature.java
b/server-common/src/main/java/org/apache/kafka/server/common/Feature.java
index 25bb654577c..60b53856225 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/Feature.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/Feature.java
@@ -48,6 +48,7 @@ public enum Feature {
GROUP_VERSION(GroupVersion.FEATURE_NAME, GroupVersion.values(),
GroupVersion.LATEST_PRODUCTION),
ELIGIBLE_LEADER_REPLICAS_VERSION(EligibleLeaderReplicasVersion.FEATURE_NAME,
EligibleLeaderReplicasVersion.values(),
EligibleLeaderReplicasVersion.LATEST_PRODUCTION),
SHARE_VERSION(ShareVersion.FEATURE_NAME, ShareVersion.values(),
ShareVersion.LATEST_PRODUCTION),
+ STREAMS_VERSION(StreamsVersion.FEATURE_NAME, StreamsVersion.values(),
StreamsVersion.LATEST_PRODUCTION),
/**
* Features defined only for unit tests and are not used in production.
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 7e64fa648f5..93be50cbb13 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -127,7 +127,15 @@ public enum MetadataVersion {
// *** SHARE GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION
ALLOWS A SHARE ***
// *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE
TO BE TURNED ON ***
// *** DYNAMICALLY TO TRY OUT THE PREVIEW CAPABILITY.
***
- IBP_4_2_IV0(28, "4.2", "IV0", false);
+ IBP_4_2_IV0(28, "4.2", "IV0", false),
+
+ // Enables "streams" groups by default for new clusters (KIP-1071).
+ //
+ // *** THIS IS A PLACEHOLDER UNSTABLE VERSION WHICH IS USED TO DEFINE THE
POINT AT WHICH ***
+ // *** STREAMS GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS
DEFINITION ALLOWS A STREAMS ***
+ // *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE
TO BE TURNED ON ***
+ // *** DYNAMICALLY TO TRY OUT THE EARLY ACCESS CAPABILITY.
***
+ IBP_4_2_IV1(29, "4.2", "IV1", false);
// NOTES when adding a new version:
// Update the default version in @ClusterTest annotation to point to the
latest version
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java
new file mode 100644
index 00000000000..cf910c660dc
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/common/StreamsVersion.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Map;
+
+public enum StreamsVersion implements FeatureVersion {
+
+ // Version 0 keeps "streams" groups disabled (KIP-1071).
+ SV_0(0, MetadataVersion.MINIMUM_VERSION, Map.of()),
+
+ // Version 1 enables "streams" groups (KIP-1071).
+ // Using metadata version IBP_4_2_IV1 disables it by default in AK 4.1
release, and enables it by default in AK 4.2 release.
+ // - in AK 4.1, this can be enabled as "early access [unstable]"
+ // - in AK 4.2, it is planned to go GA (cf `LATEST_PRODUCTION`)
+ SV_1(1, MetadataVersion.IBP_4_2_IV1, Map.of());
+
+ public static final String FEATURE_NAME = "streams.version";
+
+ // Mark "streams" group as unstable in AK 4.1 release
+ // Needs to be updated to SV_1 in AK 4.2, to mark as stable
+ public static final StreamsVersion LATEST_PRODUCTION = SV_0;
+
+ private final short featureLevel;
+ private final MetadataVersion bootstrapMetadataVersion;
+ private final Map<String, Short> dependencies;
+
+ StreamsVersion(
+ int featureLevel,
+ MetadataVersion bootstrapMetadataVersion,
+ Map<String, Short> dependencies
+ ) {
+ this.featureLevel = (short) featureLevel;
+ this.bootstrapMetadataVersion = bootstrapMetadataVersion;
+ this.dependencies = dependencies;
+ }
+
+ @Override
+ public short featureLevel() {
+ return featureLevel;
+ }
+
+ @Override
+ public String featureName() {
+ return FEATURE_NAME;
+ }
+
+ @Override
+ public MetadataVersion bootstrapMetadataVersion() {
+ return bootstrapMetadataVersion;
+ }
+
+ @Override
+ public Map<String, Short> dependencies() {
+ return dependencies;
+ }
+
+ public boolean streamsGroupSupported() {
+ return featureLevel >= SV_1.featureLevel;
+ }
+
+ public static StreamsVersion fromFeatureLevel(short version) {
+ return switch (version) {
+ case 0 -> SV_0;
+ case 1 -> SV_1;
+ default -> throw new RuntimeException("Unknown streams feature
level: " + (int) version);
+ };
+ }
+}
diff --git
a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
index 31ce9c596ee..85c09248c17 100644
--- a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
+++ b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
@@ -30,6 +30,7 @@ import java.util.Map;
import static
org.apache.kafka.server.common.Feature.ELIGIBLE_LEADER_REPLICAS_VERSION;
import static org.apache.kafka.server.common.Feature.GROUP_VERSION;
import static org.apache.kafka.server.common.Feature.SHARE_VERSION;
+import static org.apache.kafka.server.common.Feature.STREAMS_VERSION;
import static org.apache.kafka.server.common.Feature.TRANSACTION_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -99,6 +100,7 @@ public class BrokerFeaturesTest {
GROUP_VERSION.featureName(), GROUP_VERSION.latestTesting(),
ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting(),
SHARE_VERSION.featureName(), SHARE_VERSION.latestTesting(),
+ STREAMS_VERSION.featureName(), STREAMS_VERSION.latestTesting(),
"kraft.version", (short) 0,
"test_feature_1", (short) 4,
"test_feature_2", (short) 3,
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 78eb64a9a26..31e39a4c8b7 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -75,7 +75,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600)
@Tag("integration")
public class InternalTopicIntegrationTest {
- public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(1);
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@BeforeAll
public static void startCluster() throws IOException, InterruptedException
{
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 604fdd9b6a1..c40b3433a91 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -51,7 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600)
@Tag("integration")
public class SmokeTestDriverIntegrationTest {
- public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(3);
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
public TestInfo testInfo;
@BeforeAll
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index eb233e8210b..ec48b8b3634 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -57,7 +57,7 @@ import static
org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
public class StandbyTaskCreationIntegrationTest {
private static final int NUM_BROKERS = 1;
- public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(NUM_BROKERS);
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private String safeTestName;
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 82100a77293..6f29b7b81f5 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -89,7 +89,7 @@ import static org.junit.jupiter.api.Assertions.fail;
public class StreamsUncaughtExceptionHandlerIntegrationTest {
private static final long NOW = Instant.now().toEpochMilli();
- public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(1);
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
@BeforeAll
public static void startCluster() throws IOException {
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 6965135767f..f425f8365ee 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -143,16 +143,6 @@ public class EmbeddedKafkaCluster {
this.time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
}
- public static EmbeddedKafkaCluster withStreamsRebalanceProtocol(final int numBrokers) {
- return withStreamsRebalanceProtocol(numBrokers, new Properties());
- }
-
- public static EmbeddedKafkaCluster withStreamsRebalanceProtocol(final int numBrokers, final Properties props) {
- props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams");
- props.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true");
- return new EmbeddedKafkaCluster(numBrokers, props);
- }
-
public void start() {
try {
cluster.format();
diff --git
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
index 097aac3094c..f81f2739907 100644
---
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
+++
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
@@ -52,7 +52,7 @@ public @interface ClusterTest {
String brokerListener() default DEFAULT_BROKER_LISTENER_NAME;
SecurityProtocol controllerSecurityProtocol() default
SecurityProtocol.PLAINTEXT;
String controllerListener() default DEFAULT_CONTROLLER_LISTENER_NAME;
- MetadataVersion metadataVersion() default MetadataVersion.IBP_4_2_IV0;
+ MetadataVersion metadataVersion() default MetadataVersion.IBP_4_2_IV1;
ClusterConfigProperty[] serverProperties() default {};
// users can add tags that they want to display in test
String[] tags() default {};
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index a1ef4ff2e39..f1c68eb0d39 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -64,11 +64,13 @@ public class FeatureCommandTest {
assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(2)));
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.3-IV3\t" +
- "SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 3.3-IV3\t", outputWithoutEpoch(features.get(3)));
+ "SupportedMaxVersion: 4.2-IV1\tFinalizedVersionLevel: 3.3-IV3\t", outputWithoutEpoch(features.get(3)));
assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4)));
+ assertEquals("Feature: streams.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
- "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
+ "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(6)));
}
// Use the first MetadataVersion that supports KIP-919
@@ -88,11 +90,13 @@ public class FeatureCommandTest {
assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(2)));
assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.3-IV3\t" +
- "SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(3)));
+ "SupportedMaxVersion: 4.2-IV1\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(3)));
assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4)));
+ assertEquals("Feature: streams.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
- "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
+ "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(6)));
}
@ClusterTest(types = {Type.KRAFT}, metadataVersion =
MetadataVersion.IBP_3_3_IV3)
@@ -118,7 +122,7 @@ public class FeatureCommandTest {
);
// Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version)
assertEquals("Could not disable metadata.version. The update failed for all features since the following " +
- "feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 7-28", commandOutput);
+ "feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 7-29", commandOutput);
commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1,
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
@@ -182,6 +186,7 @@ public class FeatureCommandTest {
"kraft.version was downgraded to 0.\n" +
"metadata.version was downgraded to 18.\n" +
"share.version was downgraded to 0.\n" +
+ "streams.version was downgraded to 0.\n" +
"transaction.version was downgraded to 0.", commandOutput);
}