This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 21a080f08ca KAFKA-16894: Define feature to enable share groups (#19293)
21a080f08ca is described below
commit 21a080f08ca8087794da0b56ed596c79e17a5eb3
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Apr 11 12:14:38 2025 +0100
KAFKA-16894: Define feature to enable share groups (#19293)
This PR proposes a switch to enable share groups for 4.1 (preview) and
4.2 (GA).
* `share.version=1` indicates that share groups are enabled. This feature
is used as the switch for turning share groups on and off.
In 4.1, the default will be `share.version=0`. A user wanting to
evaluate the preview of KIP-932 would then run `bin/kafka-features.sh
--bootstrap-server xxxx upgrade --feature share.version=1`.
In 4.2, the default will be `share.version=1`.
Reviewers: Jun Rao <[email protected]>
---
.../server/AbstractApiVersionsRequestTest.scala | 9 ++-
.../scala/unit/kafka/tools/StorageToolTest.scala | 2 +-
.../controller/FeatureControlManagerTest.java | 2 +-
.../kafka/metadata/storage/FormatterTest.java | 2 +-
.../org/apache/kafka/server/common/Feature.java | 1 +
.../kafka/server/common/MetadataVersion.java | 17 ++++-
.../apache/kafka/server/common/ShareVersion.java | 82 ++++++++++++++++++++++
.../apache/kafka/server/BrokerFeaturesTest.java | 2 +
.../apache/kafka/common/test/api/ClusterTest.java | 2 +-
.../org/apache/kafka/tools/FeatureCommandTest.java | 15 ++--
10 files changed, 121 insertions(+), 13 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index dcbfbcb3497..88d25b65d93 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{ApiVersionsRequest,
ApiVersionsResponse, RequestUtils}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion,
GroupVersion, MetadataVersion, TransactionVersion}
+import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion,
GroupVersion, MetadataVersion, ShareVersion, TransactionVersion}
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Tag
@@ -64,11 +64,11 @@ abstract class AbstractApiVersionsRequestTest(cluster:
ClusterInstance) {
apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion
): Unit = {
if (apiVersion >= 3) {
- assertEquals(4, apiVersionsResponse.data().finalizedFeatures().size())
+ assertEquals(5, apiVersionsResponse.data().finalizedFeatures().size())
assertEquals(MetadataVersion.latestTesting().featureLevel(),
apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
assertEquals(MetadataVersion.latestTesting().featureLevel(),
apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
- assertEquals(5, apiVersionsResponse.data().supportedFeatures().size())
+ assertEquals(6, apiVersionsResponse.data().supportedFeatures().size())
assertEquals(MetadataVersion.MINIMUM_VERSION.featureLevel(),
apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion())
if (apiVersion < 4) {
assertEquals(1,
apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion())
@@ -85,6 +85,9 @@ abstract class AbstractApiVersionsRequestTest(cluster:
ClusterInstance) {
assertEquals(0,
apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).minVersion())
assertEquals(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(),
apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).maxVersion())
+
+ assertEquals(0,
apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).minVersion())
+ assertEquals(ShareVersion.SV_1.featureLevel(),
apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).maxVersion())
}
val expectedApis = if
(cluster.controllerListenerName().toScala.contains(listenerName)) {
ApiVersionsResponse.collectApis(
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 9fde243ec19..90979426dd3 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -325,7 +325,7 @@ Found problem:
properties.putAll(defaultStaticQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
assertEquals("Unsupported feature: non.existent.feature. Supported
features are: " +
- "eligible.leader.replicas.version, group.version, kraft.version,
transaction.version",
+ "eligible.leader.replicas.version, group.version, kraft.version,
share.version, transaction.version",
assertThrows(classOf[FormatterException], () =>
runFormatCommand(new ByteArrayOutputStream(), properties,
Seq("--feature", "non.existent.feature=20"))).getMessage)
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 55ec4749264..f494ebddcf8 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -387,7 +387,7 @@ public class FeatureControlManagerTest {
build();
manager.replay(new
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
assertEquals(ControllerResult.of(List.of(), new
ApiError(Errors.INVALID_UPDATE_VERSION,
- "Invalid update version 6 for feature metadata.version. Local
controller 0 only supports versions 7-26")),
+ "Invalid update version 6 for feature metadata.version. Local
controller 0 only supports versions 7-28")),
manager.updateFeatures(
Map.of(MetadataVersion.FEATURE_NAME,
MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL),
Map.of(MetadataVersion.FEATURE_NAME,
FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index 6510e15e44d..880bea07a9e 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -368,7 +368,7 @@ public class FormatterTest {
formatter1.formatter.setFeatureLevel("nonexistent.feature",
(short) 1);
assertEquals("Unsupported feature: nonexistent.feature. Supported
features " +
"are: eligible.leader.replicas.version, group.version,
kraft.version, " +
- "test.feature.version, transaction.version",
+ "share.version, test.feature.version, transaction.version",
assertThrows(FormatterException.class,
() -> formatter1.formatter.run()).
getMessage());
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/Feature.java
b/server-common/src/main/java/org/apache/kafka/server/common/Feature.java
index 3ac2923126d..25bb654577c 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/Feature.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/Feature.java
@@ -47,6 +47,7 @@ public enum Feature {
TRANSACTION_VERSION(TransactionVersion.FEATURE_NAME,
TransactionVersion.values(), TransactionVersion.LATEST_PRODUCTION),
GROUP_VERSION(GroupVersion.FEATURE_NAME, GroupVersion.values(),
GroupVersion.LATEST_PRODUCTION),
ELIGIBLE_LEADER_REPLICAS_VERSION(EligibleLeaderReplicasVersion.FEATURE_NAME,
EligibleLeaderReplicasVersion.values(),
EligibleLeaderReplicasVersion.LATEST_PRODUCTION),
+ SHARE_VERSION(ShareVersion.FEATURE_NAME, ShareVersion.values(),
ShareVersion.LATEST_PRODUCTION),
/**
* Features defined only for unit tests and are not used in production.
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index cbbdc15cd1b..7e64fa648f5 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -112,7 +112,22 @@ public enum MetadataVersion {
//
// Enables ELR by default for new clusters (KIP-966).
- IBP_4_1_IV0(26, "4.1", "IV0", false);
+ IBP_4_1_IV0(26, "4.1", "IV0", false),
+
+ // Enables share groups. Note, share groups are for preview only in 4.1.
(KIP-932).
+ IBP_4_1_IV1(27, "4.1", "IV1", false),
+
+ // Insert any additional IBP_4_1_IVx versions above this comment, and bump
the feature level of
+ // IBP_4_2_IV0 accordingly. When 4.2 development begins, IBP_4_2_IV0 will
cease to be
+ // a placeholder.
+
+ // Enables share groups by default for new clusters (KIP-932).
+ //
+ // *** THIS IS A PLACEHOLDER UNSTABLE VERSION WHICH IS USED TO DEFINE THE
POINT AT WHICH ***
+ // *** SHARE GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION
ALLOWS A SHARE ***
+ // *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE
TO BE TURNED ON ***
+ // *** DYNAMICALLY TO TRY OUT THE PREVIEW CAPABILITY.
***
+ IBP_4_2_IV0(28, "4.2", "IV0", false);
// NOTES when adding a new version:
// Update the default version in @ClusterTest annotation to point to the
latest version
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/ShareVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/ShareVersion.java
new file mode 100644
index 00000000000..ccbaa3e65ad
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/common/ShareVersion.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Map;
+
+public enum ShareVersion implements FeatureVersion {
+
+ // Version 0 does not enable share groups.
+ SV_0(0, MetadataVersion.MINIMUM_VERSION, Map.of()),
+
+ // Version 1 enables share groups (KIP-932).
+ // This is a preview in 4.1, and production-ready in 4.2.
+ SV_1(1, MetadataVersion.IBP_4_2_IV0, Map.of(MetadataVersion.FEATURE_NAME,
MetadataVersion.IBP_4_1_IV1.featureLevel()));
+
+ public static final String FEATURE_NAME = "share.version";
+
+ public static final ShareVersion LATEST_PRODUCTION = SV_0;
+
+ private final short featureLevel;
+ private final MetadataVersion bootstrapMetadataVersion;
+ private final Map<String, Short> dependencies;
+
+ ShareVersion(
+ int featureLevel,
+ MetadataVersion bootstrapMetadataVersion,
+ Map<String, Short> dependencies
+ ) {
+ this.featureLevel = (short) featureLevel;
+ this.bootstrapMetadataVersion = bootstrapMetadataVersion;
+ this.dependencies = dependencies;
+ }
+
+ @Override
+ public short featureLevel() {
+ return featureLevel;
+ }
+
+ @Override
+ public String featureName() {
+ return FEATURE_NAME;
+ }
+
+ @Override
+ public MetadataVersion bootstrapMetadataVersion() {
+ return bootstrapMetadataVersion;
+ }
+
+ @Override
+ public Map<String, Short> dependencies() {
+ return dependencies;
+ }
+
+ public boolean supportsShareGroups() {
+ return featureLevel >= SV_1.featureLevel;
+ }
+
+ public static ShareVersion fromFeatureLevel(short version) {
+ switch (version) {
+ case 0:
+ return SV_0;
+ case 1:
+ return SV_1;
+ default:
+ throw new RuntimeException("Unknown share feature level: " +
(int) version);
+ }
+ }
+}
diff --git
a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
index da0def9b11b..31ce9c596ee 100644
--- a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
+++ b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
@@ -29,6 +29,7 @@ import java.util.Map;
import static
org.apache.kafka.server.common.Feature.ELIGIBLE_LEADER_REPLICAS_VERSION;
import static org.apache.kafka.server.common.Feature.GROUP_VERSION;
+import static org.apache.kafka.server.common.Feature.SHARE_VERSION;
import static org.apache.kafka.server.common.Feature.TRANSACTION_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -97,6 +98,7 @@ public class BrokerFeaturesTest {
TRANSACTION_VERSION.featureName(),
TRANSACTION_VERSION.latestTesting(),
GROUP_VERSION.featureName(), GROUP_VERSION.latestTesting(),
ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting(),
+ SHARE_VERSION.featureName(), SHARE_VERSION.latestTesting(),
"kraft.version", (short) 0,
"test_feature_1", (short) 4,
"test_feature_2", (short) 3,
diff --git
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
index eac9b9c6d9d..097aac3094c 100644
---
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
+++
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
@@ -52,7 +52,7 @@ public @interface ClusterTest {
String brokerListener() default DEFAULT_BROKER_LISTENER_NAME;
SecurityProtocol controllerSecurityProtocol() default
SecurityProtocol.PLAINTEXT;
String controllerListener() default DEFAULT_CONTROLLER_LISTENER_NAME;
- MetadataVersion metadataVersion() default MetadataVersion.IBP_4_1_IV0;
+ MetadataVersion metadataVersion() default MetadataVersion.IBP_4_2_IV0;
ClusterConfigProperty[] serverProperties() default {};
// users can add tags that they want to display in test
String[] tags() default {};
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 340889d8765..a1ef4ff2e39 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -64,9 +64,11 @@ public class FeatureCommandTest {
assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(2)));
assertEquals("Feature: metadata.version\tSupportedMinVersion:
3.3-IV3\t" +
- "SupportedMaxVersion: 4.1-IV0\tFinalizedVersionLevel:
3.3-IV3\t", outputWithoutEpoch(features.get(3)));
+ "SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel:
3.3-IV3\t", outputWithoutEpoch(features.get(3)));
+ assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(4)));
assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
- "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(4)));
+ "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(5)));
}
// Use the first MetadataVersion that supports KIP-919
@@ -86,9 +88,11 @@ public class FeatureCommandTest {
assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(2)));
assertEquals("Feature: metadata.version\tSupportedMinVersion:
3.3-IV3\t" +
- "SupportedMaxVersion: 4.1-IV0\tFinalizedVersionLevel:
3.7-IV0\t", outputWithoutEpoch(features.get(3)));
+ "SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel:
3.7-IV0\t", outputWithoutEpoch(features.get(3)));
+ assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(4)));
assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
- "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(4)));
+ "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(5)));
}
@ClusterTest(types = {Type.KRAFT}, metadataVersion =
MetadataVersion.IBP_3_3_IV3)
@@ -114,7 +118,7 @@ public class FeatureCommandTest {
);
// Change expected message to reflect possible MetadataVersion range
1-N (N increases when adding a new version)
assertEquals("Could not disable metadata.version. The update failed
for all features since the following " +
- "feature had an error: Invalid update version 0 for feature
metadata.version. Local controller 3000 only supports versions 7-26",
commandOutput);
+ "feature had an error: Invalid update version 0 for feature
metadata.version. Local controller 3000 only supports versions 7-28",
commandOutput);
commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1,
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
@@ -177,6 +181,7 @@ public class FeatureCommandTest {
"group.version was downgraded to 0.\n" +
"kraft.version was downgraded to 0.\n" +
"metadata.version was downgraded to 18.\n" +
+ "share.version was downgraded to 0.\n" +
"transaction.version was downgraded to 0.", commandOutput);
}