This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 21a080f08ca KAFKA-16894: Define feature to enable share groups (#19293)
21a080f08ca is described below

commit 21a080f08ca8087794da0b56ed596c79e17a5eb3
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Apr 11 12:14:38 2025 +0100

    KAFKA-16894: Define feature to enable share groups (#19293)
    
    This PR proposes a switch to enable share groups for 4.1 (preview) and
    4.2 (GA).
    
    * `share.version=1` to indicate that share groups are enabled. This is
    used as the switch for turning share groups on and off.
    
    In 4.1, the default will be `share.version=0`. Then a user wanting to
    evaluate the preview of KIP-932 would use `bin/kafka-features.sh
    --bootstrap-server xxxx upgrade --feature share.version=1`.
    
    In 4.2, the default will be `share.version=1`.
    
    Reviewers: Jun Rao <[email protected]>
---
 .../server/AbstractApiVersionsRequestTest.scala    |  9 ++-
 .../scala/unit/kafka/tools/StorageToolTest.scala   |  2 +-
 .../controller/FeatureControlManagerTest.java      |  2 +-
 .../kafka/metadata/storage/FormatterTest.java      |  2 +-
 .../org/apache/kafka/server/common/Feature.java    |  1 +
 .../kafka/server/common/MetadataVersion.java       | 17 ++++-
 .../apache/kafka/server/common/ShareVersion.java   | 82 ++++++++++++++++++++++
 .../apache/kafka/server/BrokerFeaturesTest.java    |  2 +
 .../apache/kafka/common/test/api/ClusterTest.java  |  2 +-
 .../org/apache/kafka/tools/FeatureCommandTest.java | 15 ++--
 10 files changed, 121 insertions(+), 13 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index dcbfbcb3497..88d25b65d93 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{ApiVersionsRequest, 
ApiVersionsResponse, RequestUtils}
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, 
GroupVersion, MetadataVersion, TransactionVersion}
+import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, 
GroupVersion, MetadataVersion, ShareVersion, TransactionVersion}
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Tag
@@ -64,11 +64,11 @@ abstract class AbstractApiVersionsRequestTest(cluster: 
ClusterInstance) {
     apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion
   ): Unit = {
     if (apiVersion >= 3) {
-      assertEquals(4, apiVersionsResponse.data().finalizedFeatures().size())
+      assertEquals(5, apiVersionsResponse.data().finalizedFeatures().size())
       assertEquals(MetadataVersion.latestTesting().featureLevel(), 
apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
       assertEquals(MetadataVersion.latestTesting().featureLevel(), 
apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
 
-      assertEquals(5, apiVersionsResponse.data().supportedFeatures().size())
+      assertEquals(6, apiVersionsResponse.data().supportedFeatures().size())
       assertEquals(MetadataVersion.MINIMUM_VERSION.featureLevel(), 
apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion())
       if (apiVersion < 4) {
         assertEquals(1, 
apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion())
@@ -85,6 +85,9 @@ abstract class AbstractApiVersionsRequestTest(cluster: 
ClusterInstance) {
 
       assertEquals(0, 
apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).minVersion())
       assertEquals(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), 
apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).maxVersion())
+
+      assertEquals(0, 
apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).minVersion())
+      assertEquals(ShareVersion.SV_1.featureLevel(), 
apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).maxVersion())
     }
     val expectedApis = if 
(cluster.controllerListenerName().toScala.contains(listenerName)) {
       ApiVersionsResponse.collectApis(
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala 
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 9fde243ec19..90979426dd3 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -325,7 +325,7 @@ Found problem:
     properties.putAll(defaultStaticQuorumProperties)
     properties.setProperty("log.dirs", availableDirs.mkString(","))
     assertEquals("Unsupported feature: non.existent.feature. Supported 
features are: " +
-      "eligible.leader.replicas.version, group.version, kraft.version, 
transaction.version",
+      "eligible.leader.replicas.version, group.version, kraft.version, 
share.version, transaction.version",
         assertThrows(classOf[FormatterException], () =>
           runFormatCommand(new ByteArrayOutputStream(), properties,
             Seq("--feature", "non.existent.feature=20"))).getMessage)
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 55ec4749264..f494ebddcf8 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -387,7 +387,7 @@ public class FeatureControlManagerTest {
             build();
         manager.replay(new 
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
         assertEquals(ControllerResult.of(List.of(), new 
ApiError(Errors.INVALID_UPDATE_VERSION,
-            "Invalid update version 6 for feature metadata.version. Local 
controller 0 only supports versions 7-26")),
+            "Invalid update version 6 for feature metadata.version. Local 
controller 0 only supports versions 7-28")),
                 manager.updateFeatures(
                         Map.of(MetadataVersion.FEATURE_NAME, 
MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL),
                         Map.of(MetadataVersion.FEATURE_NAME, 
FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index 6510e15e44d..880bea07a9e 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -368,7 +368,7 @@ public class FormatterTest {
             formatter1.formatter.setFeatureLevel("nonexistent.feature", 
(short) 1);
             assertEquals("Unsupported feature: nonexistent.feature. Supported 
features " +
                     "are: eligible.leader.replicas.version, group.version, 
kraft.version, " +
-                    "test.feature.version, transaction.version",
+                    "share.version, test.feature.version, transaction.version",
                 assertThrows(FormatterException.class,
                     () -> formatter1.formatter.run()).
                         getMessage());
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/Feature.java 
b/server-common/src/main/java/org/apache/kafka/server/common/Feature.java
index 3ac2923126d..25bb654577c 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/Feature.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/Feature.java
@@ -47,6 +47,7 @@ public enum Feature {
     TRANSACTION_VERSION(TransactionVersion.FEATURE_NAME, 
TransactionVersion.values(), TransactionVersion.LATEST_PRODUCTION),
     GROUP_VERSION(GroupVersion.FEATURE_NAME, GroupVersion.values(), 
GroupVersion.LATEST_PRODUCTION),
     
ELIGIBLE_LEADER_REPLICAS_VERSION(EligibleLeaderReplicasVersion.FEATURE_NAME, 
EligibleLeaderReplicasVersion.values(), 
EligibleLeaderReplicasVersion.LATEST_PRODUCTION),
+    SHARE_VERSION(ShareVersion.FEATURE_NAME, ShareVersion.values(), 
ShareVersion.LATEST_PRODUCTION),
 
     /**
      * Features defined only for unit tests and are not used in production.
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index cbbdc15cd1b..7e64fa648f5 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -112,7 +112,22 @@ public enum MetadataVersion {
     //
 
     // Enables ELR by default for new clusters (KIP-966).
-    IBP_4_1_IV0(26, "4.1", "IV0", false);
+    IBP_4_1_IV0(26, "4.1", "IV0", false),
+
+    // Enables share groups. Note, share groups are for preview only in 4.1. 
(KIP-932).
+    IBP_4_1_IV1(27, "4.1", "IV1", false),
+
+    // Insert any additional IBP_4_1_IVx versions above this comment, and bump 
the feature level of
+    // IBP_4_2_IV0 accordingly. When 4.2 development begins, IBP_4_2_IV0 will 
cease to be
+    // a placeholder.
+
+    // Enables share groups by default for new clusters (KIP-932).
+    //
+    // *** THIS IS A PLACEHOLDER UNSTABLE VERSION WHICH IS USED TO DEFINE THE 
POINT AT WHICH   ***
+    // *** SHARE GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION 
ALLOWS A SHARE   ***
+    // *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE 
TO BE TURNED ON ***
+    // *** DYNAMICALLY TO TRY OUT THE PREVIEW CAPABILITY.                      
                ***
+    IBP_4_2_IV0(28, "4.2", "IV0", false);
 
     // NOTES when adding a new version:
     //   Update the default version in @ClusterTest annotation to point to the 
latest version
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/ShareVersion.java 
b/server-common/src/main/java/org/apache/kafka/server/common/ShareVersion.java
new file mode 100644
index 00000000000..ccbaa3e65ad
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/ShareVersion.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Map;
+
+public enum ShareVersion implements FeatureVersion {
+
+    // Version 0 does not enable share groups.
+    SV_0(0, MetadataVersion.MINIMUM_VERSION, Map.of()),
+
+    // Version 1 enables share groups (KIP-932).
+    // This is a preview in 4.1, and production-ready in 4.2.
+    SV_1(1, MetadataVersion.IBP_4_2_IV0, Map.of(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_4_1_IV1.featureLevel()));
+
+    public static final String FEATURE_NAME = "share.version";
+
+    public static final ShareVersion LATEST_PRODUCTION = SV_0;
+
+    private final short featureLevel;
+    private final MetadataVersion bootstrapMetadataVersion;
+    private final Map<String, Short> dependencies;
+
+    ShareVersion(
+        int featureLevel,
+        MetadataVersion bootstrapMetadataVersion,
+        Map<String, Short> dependencies
+    ) {
+        this.featureLevel = (short) featureLevel;
+        this.bootstrapMetadataVersion = bootstrapMetadataVersion;
+        this.dependencies = dependencies;
+    }
+
+    @Override
+    public short featureLevel() {
+        return featureLevel;
+    }
+
+    @Override
+    public String featureName() {
+        return FEATURE_NAME;
+    }
+
+    @Override
+    public MetadataVersion bootstrapMetadataVersion() {
+        return bootstrapMetadataVersion;
+    }
+
+    @Override
+    public Map<String, Short> dependencies() {
+        return dependencies;
+    }
+
+    public boolean supportsShareGroups() {
+        return featureLevel >= SV_1.featureLevel;
+    }
+
+    public static ShareVersion fromFeatureLevel(short version) {
+        switch (version) {
+            case 0:
+                return SV_0;
+            case 1:
+                return SV_1;
+            default:
+                throw new RuntimeException("Unknown share feature level: " + 
(int) version);
+        }
+    }
+}
diff --git 
a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java 
b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
index da0def9b11b..31ce9c596ee 100644
--- a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
+++ b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
@@ -29,6 +29,7 @@ import java.util.Map;
 
 import static 
org.apache.kafka.server.common.Feature.ELIGIBLE_LEADER_REPLICAS_VERSION;
 import static org.apache.kafka.server.common.Feature.GROUP_VERSION;
+import static org.apache.kafka.server.common.Feature.SHARE_VERSION;
 import static org.apache.kafka.server.common.Feature.TRANSACTION_VERSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -97,6 +98,7 @@ public class BrokerFeaturesTest {
                 TRANSACTION_VERSION.featureName(), 
TRANSACTION_VERSION.latestTesting(),
                 GROUP_VERSION.featureName(), GROUP_VERSION.latestTesting(),
                 ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(), 
ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting(),
+                SHARE_VERSION.featureName(), SHARE_VERSION.latestTesting(),
                 "kraft.version", (short) 0,
                 "test_feature_1", (short) 4,
                 "test_feature_2", (short) 3,
diff --git 
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
 
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
index eac9b9c6d9d..097aac3094c 100644
--- 
a/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
+++ 
b/test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
@@ -52,7 +52,7 @@ public @interface ClusterTest {
     String brokerListener() default DEFAULT_BROKER_LISTENER_NAME;
     SecurityProtocol controllerSecurityProtocol() default 
SecurityProtocol.PLAINTEXT;
     String controllerListener() default DEFAULT_CONTROLLER_LISTENER_NAME;
-    MetadataVersion metadataVersion() default MetadataVersion.IBP_4_1_IV0;
+    MetadataVersion metadataVersion() default MetadataVersion.IBP_4_2_IV0;
     ClusterConfigProperty[] serverProperties() default {};
     // users can add tags that they want to display in test
     String[] tags() default {};
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 340889d8765..a1ef4ff2e39 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -64,9 +64,11 @@ public class FeatureCommandTest {
         assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
                 "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(2)));
         assertEquals("Feature: metadata.version\tSupportedMinVersion: 
3.3-IV3\t" +
-                "SupportedMaxVersion: 4.1-IV0\tFinalizedVersionLevel: 
3.3-IV3\t", outputWithoutEpoch(features.get(3)));
+                "SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 
3.3-IV3\t", outputWithoutEpoch(features.get(3)));
+        assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
+                "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(4)));
         assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
-                "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(4)));
+                "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(5)));
     }
 
     // Use the first MetadataVersion that supports KIP-919
@@ -86,9 +88,11 @@ public class FeatureCommandTest {
         assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
                 "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(2)));
         assertEquals("Feature: metadata.version\tSupportedMinVersion: 
3.3-IV3\t" +
-                "SupportedMaxVersion: 4.1-IV0\tFinalizedVersionLevel: 
3.7-IV0\t", outputWithoutEpoch(features.get(3)));
+                "SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 
3.7-IV0\t", outputWithoutEpoch(features.get(3)));
+        assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
+                "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(4)));
         assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
-                "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(4)));
+                "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", 
outputWithoutEpoch(features.get(5)));
     }
 
     @ClusterTest(types = {Type.KRAFT}, metadataVersion = 
MetadataVersion.IBP_3_3_IV3)
@@ -114,7 +118,7 @@ public class FeatureCommandTest {
         );
         // Change expected message to reflect possible MetadataVersion range 
1-N (N increases when adding a new version)
         assertEquals("Could not disable metadata.version. The update failed 
for all features since the following " +
-                "feature had an error: Invalid update version 0 for feature 
metadata.version. Local controller 3000 only supports versions 7-26", 
commandOutput);
+                "feature had an error: Invalid update version 0 for feature 
metadata.version. Local controller 3000 only supports versions 7-28", 
commandOutput);
 
         commandOutput = ToolsTestUtils.captureStandardOut(() ->
                 assertEquals(1, 
FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
@@ -177,6 +181,7 @@ public class FeatureCommandTest {
                 "group.version was downgraded to 0.\n" +
                 "kraft.version was downgraded to 0.\n" +
                 "metadata.version was downgraded to 18.\n" +
+                "share.version was downgraded to 0.\n" +
                 "transaction.version was downgraded to 0.", commandOutput);
     }
 

Reply via email to