Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan merged PR #15685:
URL: https://github.com/apache/kafka/pull/15685


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan commented on PR #15685:
URL: https://github.com/apache/kafka/pull/15685#issuecomment-2138416302

   Test failures are unrelated. Merging.





Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619346921


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
         case "format" =>
           val directories = configToLogDirectories(config.get)
           val clusterId = namespace.getString("cluster_id")
-          val metadataVersion = getMetadataVersion(namespace,
-            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-          if (!metadataVersion.isKRaftSupported) {
-            throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-          }
-          if (!metadataVersion.isProduction) {
-            if (config.get.unstableMetadataVersionsEnabled) {
-              System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.")
-            } else {
-              throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.")
-            }
-          }
           val metaProperties = new MetaProperties.Builder().
             setVersion(MetaPropertiesVersion.V1).
             setClusterId(clusterId).
             setNodeId(config.get.nodeId).
             build()
           val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
+          val specifiedFeatures: util.List[String] = namespace.getList("feature")
+          if (namespace.getString("release_version") != null && specifiedFeatures != null) {
+            throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.")
+          }
+          val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+          val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap,
+            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+          validateMetadataVersion(metadataVersion, config)
+          // Get all other features, validate, and create records for them
+          generateFeatureRecords(
+            metadataRecords,
+            metadataVersion,
+            featureNamesAndLevelsMap,
+            Features.PRODUCTION_FEATURES.asScala.toList,
+            !Option(namespace.getString("release_version")).isEmpty

Review Comment:
   ```
   val defaultValue = defaultVersionString match {
     case Some(versionString) => MetadataVersion.fromVersionString(versionString)
     case None => MetadataVersion.LATEST_PRODUCTION
   }

   val releaseVersionTag = Option(namespace.getString("release_version"))
   val featureTag = featureNamesAndLevelsMap.get(MetadataVersion.FEATURE_NAME)

   (releaseVersionTag, featureTag) match {
     case (Some(_), Some(_)) =>
       // We should throw an error before we hit this case, but include it for completeness.
       throw new IllegalArgumentException("Both --release-version and --feature were set. Only one of the two flags can be set.")
     case (Some(version), None) =>
       MetadataVersion.fromVersionString(version)
     case (None, Some(level)) =>
       MetadataVersion.fromFeatureLevel(level)
     case (None, None) =>
       defaultValue
   }
   ```
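
The precedence in the snippet above can be sketched in plain Java. This is only an illustration under stated assumptions: `MetadataVersionResolutionSketch` and `resolve` are hypothetical names, and the method returns a label string instead of a real `MetadataVersion`, so nothing here is the PR's actual code.

```java
import java.util.Optional;

public class MetadataVersionResolutionSketch {

    // Hypothetical stand-in for the lookup discussed above. It returns a label
    // that says which input won, rather than a real MetadataVersion.
    static String resolve(Optional<String> releaseVersion,
                          Optional<Short> metadataFeatureLevel,
                          String defaultVersion) {
        if (releaseVersion.isPresent() && metadataFeatureLevel.isPresent()) {
            // The caller is expected to reject this earlier; kept for completeness.
            throw new IllegalArgumentException(
                "Both --release-version and --feature were set. Only one of the two flags can be set.");
        }
        if (releaseVersion.isPresent()) {
            return "version:" + releaseVersion.get();
        }
        if (metadataFeatureLevel.isPresent()) {
            return "level:" + metadataFeatureLevel.get();
        }
        return "default:" + defaultVersion;
    }

    public static void main(String[] args) {
        // The version string, level, and default below are illustrative values only.
        System.out.println(resolve(Optional.of("3.7-IV4"), Optional.empty(), "latest-production"));
        System.out.println(resolve(Optional.empty(), Optional.of((short) 19), "latest-production"));
        System.out.println(resolve(Optional.empty(), Optional.empty(), "latest-production"));
    }
}
```

Checking the conflicting case first keeps the remaining branches simple: each one then only has to look at a single input.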




Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619344585


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
         case "format" =>
           val directories = configToLogDirectories(config.get)
           val clusterId = namespace.getString("cluster_id")
-          val metadataVersion = getMetadataVersion(namespace,
-            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-          if (!metadataVersion.isKRaftSupported) {
-            throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-          }
-          if (!metadataVersion.isProduction) {
-            if (config.get.unstableMetadataVersionsEnabled) {
-              System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.")
-            } else {
-              throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.")
-            }
-          }
           val metaProperties = new MetaProperties.Builder().
             setVersion(MetaPropertiesVersion.V1).
             setClusterId(clusterId).
             setNodeId(config.get.nodeId).
             build()
           val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
+          val specifiedFeatures: util.List[String] = namespace.getList("feature")
+          if (namespace.getString("release_version") != null && specifiedFeatures != null) {
+            throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.")
+          }
+          val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+          val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap,
+            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+          validateMetadataVersion(metadataVersion, config)
+          // Get all other features, validate, and create records for them
+          generateFeatureRecords(
+            metadataRecords,
+            metadataVersion,
+            featureNamesAndLevelsMap,
+            Features.PRODUCTION_FEATURES.asScala.toList,
+            !Option(namespace.getString("release_version")).isEmpty

Review Comment:
   We only use the passed-in metadata version for defaults if --release-version
   is specified. If version default is specified, we don't use the replication
   configs.
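
A minimal Java sketch of the rule described in this comment, under loud assumptions: `FeatureDefaultSketch`, `chooseLevel`, `defaultLevelFor`, the feature name `demo.feature`, and every numeric level are hypothetical stand-ins, not Kafka APIs or real version numbers. It only illustrates that the resolved metadata version is the basis for a feature's default level when `--release-version` was given, and that the latest production version is the basis otherwise.

```java
import java.util.Collections;
import java.util.Map;

public class FeatureDefaultSketch {

    // Hypothetical numeric stand-in for the latest production metadata version.
    static final int LATEST_PRODUCTION_LEVEL = 20;

    // Hypothetical rule: the illustrative feature defaults to level 1 once the
    // metadata version level reaches 18, and to level 0 before that.
    static short defaultLevelFor(int metadataVersionLevel) {
        return (short) (metadataVersionLevel >= 18 ? 1 : 0);
    }

    static short chooseLevel(String featureName,
                             Map<String, Short> specifiedFeatures,
                             int resolvedMetadataVersionLevel,
                             boolean releaseVersionSpecified) {
        // An explicitly specified level always wins over any default.
        Short explicit = specifiedFeatures.get(featureName);
        if (explicit != null) {
            return explicit;
        }
        // Only trust the resolved metadata version when --release-version was given.
        int basis = releaseVersionSpecified ? resolvedMetadataVersionLevel : LATEST_PRODUCTION_LEVEL;
        return defaultLevelFor(basis);
    }

    public static void main(String[] args) {
        Map<String, Short> none = Collections.emptyMap();
        System.out.println(chooseLevel("demo.feature", none, 15, true));   // basis: resolved level 15
        System.out.println(chooseLevel("demo.feature", none, 15, false));  // basis: latest production
        System.out.println(chooseLevel("demo.feature",
            Collections.singletonMap("demo.feature", (short) 2), 15, false)); // explicit level
    }
}
```

In this sketch a metadata version that came from a config default never lowers another feature's default level, which is the concern raised in the review thread.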




Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619344585


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
         case "format" =>
           val directories = configToLogDirectories(config.get)
           val clusterId = namespace.getString("cluster_id")
-          val metadataVersion = getMetadataVersion(namespace,
-            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-          if (!metadataVersion.isKRaftSupported) {
-            throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-          }
-          if (!metadataVersion.isProduction) {
-            if (config.get.unstableMetadataVersionsEnabled) {
-              System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.")
-            } else {
-              throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.")
-            }
-          }
           val metaProperties = new MetaProperties.Builder().
             setVersion(MetaPropertiesVersion.V1).
             setClusterId(clusterId).
             setNodeId(config.get.nodeId).
             build()
           val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
+          val specifiedFeatures: util.List[String] = namespace.getList("feature")
+          if (namespace.getString("release_version") != null && specifiedFeatures != null) {
+            throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.")
+          }
+          val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+          val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap,
+            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+          validateMetadataVersion(metadataVersion, config)
+          // Get all other features, validate, and create records for them
+          generateFeatureRecords(
+            metadataRecords,
+            metadataVersion,
+            featureNamesAndLevelsMap,
+            Features.PRODUCTION_FEATURES.asScala.toList,
+            !Option(namespace.getString("release_version")).isEmpty

Review Comment:
   We only use the passed-in metadata version for defaults if --version-default
   is specified. If version default is specified, we don't use the replication
   configs.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


junrao commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619332926


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
         case "format" =>
           val directories = configToLogDirectories(config.get)
           val clusterId = namespace.getString("cluster_id")
-          val metadataVersion = getMetadataVersion(namespace,
-            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-          if (!metadataVersion.isKRaftSupported) {
-            throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-          }
-          if (!metadataVersion.isProduction) {
-            if (config.get.unstableMetadataVersionsEnabled) {
-              System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.")
-            } else {
-              throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.")
-            }
-          }
           val metaProperties = new MetaProperties.Builder().
             setVersion(MetaPropertiesVersion.V1).
             setClusterId(clusterId).
             setNodeId(config.get.nodeId).
             build()
           val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
+          val specifiedFeatures: util.List[String] = namespace.getList("feature")
+          if (namespace.getString("release_version") != null && specifiedFeatures != null) {
+            throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.")
+          }
+          val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+          val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap,
+            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+          validateMetadataVersion(metadataVersion, config)
+          // Get all other features, validate, and create records for them
+          generateFeatureRecords(
+            metadataRecords,
+            metadataVersion,
+            featureNamesAndLevelsMap,
+            Features.PRODUCTION_FEATURES.asScala.toList,
+            !Option(namespace.getString("release_version")).isEmpty

Review Comment:
   Thanks for the reply. Got it now. We pass in INTER_BROKER_PROTOCOL_VERSION_CONFIG
   as the default when calling `getMetadataVersion`. But that config shouldn't impact
   the MV used for selecting other features.

   ```
   val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap,
     Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
   ```






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619263353


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
         case "format" =>
           val directories = configToLogDirectories(config.get)
           val clusterId = namespace.getString("cluster_id")
-          val metadataVersion = getMetadataVersion(namespace,
-            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-          if (!metadataVersion.isKRaftSupported) {
-            throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-          }
-          if (!metadataVersion.isProduction) {
-            if (config.get.unstableMetadataVersionsEnabled) {
-              System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.")
-            } else {
-              throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.")
-            }
-          }
           val metaProperties = new MetaProperties.Builder().
             setVersion(MetaPropertiesVersion.V1).
             setClusterId(clusterId).
             setNodeId(config.get.nodeId).
             build()
           val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
+          val specifiedFeatures: util.List[String] = namespace.getList("feature")
+          if (namespace.getString("release_version") != null && specifiedFeatures != null) {
+            throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.")
+          }
+          val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+          val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap,
+            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+          validateMetadataVersion(metadataVersion, config)
+          // Get all other features, validate, and create records for them
+          generateFeatureRecords(
+            metadataRecords,
+            metadataVersion,
+            featureNamesAndLevelsMap,
+            Features.PRODUCTION_FEATURES.asScala.toList,
+            !Option(namespace.getString("release_version")).isEmpty

Review Comment:
   I've updated this to make it clearer, but I think the original code is 
correct.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619260318


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
         case "format" =>
           val directories = configToLogDirectories(config.get)
           val clusterId = namespace.getString("cluster_id")
-          val metadataVersion = getMetadataVersion(namespace,
-            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-          if (!metadataVersion.isKRaftSupported) {
-            throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-          }
-          if (!metadataVersion.isProduction) {
-            if (config.get.unstableMetadataVersionsEnabled) {
-              System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.")
-            } else {
-              throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.")
-            }
-          }
           val metaProperties = new MetaProperties.Builder().
             setVersion(MetaPropertiesVersion.V1).
             setClusterId(clusterId).
             setNodeId(config.get.nodeId).
             build()
           val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
+          val specifiedFeatures: util.List[String] = namespace.getList("feature")
+          if (namespace.getString("release_version") != null && specifiedFeatures != null) {
+            throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.")
+          }
+          val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+          val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap,
+            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+          validateMetadataVersion(metadataVersion, config)
+          // Get all other features, validate, and create records for them
+          generateFeatureRecords(
+            metadataRecords,
+            metadataVersion,
+            featureNamesAndLevelsMap,
+            Features.PRODUCTION_FEATURES.asScala.toList,
+            !Option(namespace.getString("release_version")).isEmpty

Review Comment:
   Sorry, I thought about this more. I don't think this is correct.
   If we don't specify --release-version, we will use the latest production
   version. Where do you see that we use the replication configs?






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619249547


##
server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java:
##
@@ -14,37 +14,101 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.server.common;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
-import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
-class FeaturesTest {
-    @Test
-    public void testKRaftModeFeatures() {
-        Features features = new Features(MINIMUM_KRAFT_VERSION,
-            Collections.singletonMap("foo", (short) 2), 123, true);
-        assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(),
-            features.finalizedFeatures().get(FEATURE_NAME));
-        assertEquals((short) 2,
-            features.finalizedFeatures().get("foo"));
-        assertEquals(2, features.finalizedFeatures().size());
+public class FeaturesTest {
+
+    @ParameterizedTest
+    @EnumSource(Features.class)
+    public void testFromFeatureLevelAllFeatures(Features feature) {
+        FeatureVersion[] featureImplementations = feature.featureVersions();
+        int numFeatures = featureImplementations.length;
+        for (short i = 1; i < numFeatures; i++) {
+            assertEquals(featureImplementations[i - 1], feature.fromFeatureLevel(i));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(Features.class)
+    public void testValidateVersionAllFeatures(Features feature) {
+        for (FeatureVersion featureImpl : feature.featureVersions()) {
+            // Ensure the minimum bootstrap metadata version is included if no metadata version dependency.
+            Map<String, Short> deps = new HashMap<>();
+            deps.putAll(featureImpl.dependencies());
+            if (!deps.containsKey(MetadataVersion.FEATURE_NAME)) {

Review Comment:
   I added this to the test so it has reasonable features passed in. But when 
this is called, we are passing in ALL features including metadata version.
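
The test setup described here can be sketched as follows. This is a hedged illustration, not the PR's implementation: `DependencyCheckSketch`, `validate`, `withMetadataVersion`, the feature name `demo.feature`, and the numeric levels are hypothetical. Only the `metadata.version` feature name is taken from Kafka. The sketch shows a dependency check against a map of all feature levels, plus a test-side helper that adds a minimum metadata version when the feature declares no metadata version dependency.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class DependencyCheckSketch {

    static final String METADATA_VERSION_FEATURE = "metadata.version";

    // Throws if any dependency is missing from, or higher than, the levels in allFeatureLevels.
    static void validate(String featureName,
                         Map<String, Short> dependencies,
                         Map<String, Short> allFeatureLevels) {
        for (Map.Entry<String, Short> dep : dependencies.entrySet()) {
            Short actual = allFeatureLevels.get(dep.getKey());
            if (actual == null || actual < dep.getValue()) {
                throw new IllegalArgumentException(featureName + " requires " + dep.getKey()
                    + " at level " + dep.getValue() + " but found " + actual);
            }
        }
    }

    // Test-side helper: copy the map and add a minimum metadata version only if none is present.
    static Map<String, Short> withMetadataVersion(Map<String, Short> levels, short minimumLevel) {
        Map<String, Short> copy = new HashMap<>(levels);
        copy.putIfAbsent(METADATA_VERSION_FEATURE, minimumLevel);
        return copy;
    }

    public static void main(String[] args) {
        // A feature with no metadata version dependency still gets a sensible map to validate against.
        Map<String, Short> noDeps = Collections.emptyMap();
        Map<String, Short> levels = withMetadataVersion(noDeps, (short) 1);
        validate("demo.feature", noDeps, levels);
        System.out.println(levels.containsKey(METADATA_VERSION_FEATURE));
    }
}
```

Outside of tests, the caller would pass every feature level, including the metadata version, so the helper is only needed when the map is built by hand.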






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619248609


##
server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java:
##
@@ -14,37 +14,101 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.server.common;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
-import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
-class FeaturesTest {
-    @Test
-    public void testKRaftModeFeatures() {
-        Features features = new Features(MINIMUM_KRAFT_VERSION,
-            Collections.singletonMap("foo", (short) 2), 123, true);
-        assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(),
-            features.finalizedFeatures().get(FEATURE_NAME));
-        assertEquals((short) 2,
-            features.finalizedFeatures().get("foo"));
-        assertEquals(2, features.finalizedFeatures().size());
+public class FeaturesTest {
+
+    @ParameterizedTest
+    @EnumSource(Features.class)
+    public void testFromFeatureLevelAllFeatures(Features feature) {
+        FeatureVersion[] featureImplementations = feature.featureVersions();
+        int numFeatures = featureImplementations.length;
+        for (short i = 1; i < numFeatures; i++) {
+            assertEquals(featureImplementations[i - 1], feature.fromFeatureLevel(i));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(Features.class)
+    public void testValidateVersionAllFeatures(Features feature) {
+        for (FeatureVersion featureImpl : feature.featureVersions()) {
+            // Ensure the minimum bootstrap metadata version is included if no metadata version dependency.
+            Map<String, Short> deps = new HashMap<>();
+            deps.putAll(featureImpl.dependencies());
+            if (!deps.containsKey(MetadataVersion.FEATURE_NAME)) {

Review Comment:
   No. Features should not require a dependency on MV. I don't think we need to 
add this logic to all places validation is called.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619246972


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +114,52 @@ object StorageTool extends Logging {
     }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = {
+    if (!metadataVersion.isKRaftSupported) {
+      throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
+    }
+    if (!metadataVersion.isProduction) {
+      if (config.get.unstableMetadataVersionsEnabled) {
+        System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.")
+      } else {
+        throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.")
+      }
+    }
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion],
+                                            metadataVersion: MetadataVersion,
+                                            specifiedFeatures: Map[String, java.lang.Short],
+                                            allFeatures: List[Features],
+                                            usesVersionDefault: Boolean): Unit = {

Review Comment:
   I have to change this because of the other comment you mentioned anyway.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619246506


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
         case "format" =>
           val directories = configToLogDirectories(config.get)
           val clusterId = namespace.getString("cluster_id")
-          val metadataVersion = getMetadataVersion(namespace,
-            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-          if (!metadataVersion.isKRaftSupported) {
-            throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-          }
-          if (!metadataVersion.isProduction) {
-            if (config.get.unstableMetadataVersionsEnabled) {
-              System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.")
-            } else {
-              throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.")
-            }
-          }
           val metaProperties = new MetaProperties.Builder().
             setVersion(MetaPropertiesVersion.V1).
             setClusterId(clusterId).
             setNodeId(config.get.nodeId).
             build()
           val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
+          val specifiedFeatures: util.List[String] = namespace.getList("feature")
+          if (namespace.getString("release_version") != null && specifiedFeatures != null) {
+            throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.")
+          }
+          val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+          val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap,
+            Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+          validateMetadataVersion(metadataVersion, config)
+          // Get all other features, validate, and create records for them
+          generateFeatureRecords(
+            metadataRecords,
+            metadataVersion,
+            featureNamesAndLevelsMap,
+            Features.PRODUCTION_FEATURES.asScala.toList,
+            !Option(namespace.getString("release_version")).isEmpty

Review Comment:
   I wouldn't particularly split hairs here since I think both are reasonable. 
However, I can make it consistent.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


junrao commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619224649


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +114,52 @@ object StorageTool extends Logging {
     }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = {
+    if (!metadataVersion.isKRaftSupported) {
+      throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
+    }
+    if (!metadataVersion.isProduction) {
+      if (config.get.unstableMetadataVersionsEnabled) {
+        System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.")
+      } else {
+        throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.")
+      }
+    }
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion],
+                                            metadataVersion: MetadataVersion,
+                                            specifiedFeatures: Map[String, java.lang.Short],
+                                            allFeatures: List[Features],
+                                            usesVersionDefault: Boolean): Unit = {

Review Comment:
   Rename `usesVersionDefault` to `releaseVersionSpecified`?



##
server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java:
##
@@ -14,37 +14,101 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.server.common;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
-import static 
org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
-class FeaturesTest {
-@Test
-public void testKRaftModeFeatures() {
-Features features = new Features(MINIMUM_KRAFT_VERSION,
-Collections.singletonMap("foo", (short) 2), 123, true);
-assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(),
-features.finalizedFeatures().get(FEATURE_NAME));
-assertEquals((short) 2,
-features.finalizedFeatures().get("foo"));
-assertEquals(2, features.finalizedFeatures().size());
+public class FeaturesTest {
+
+@ParameterizedTest
+@EnumSource(Features.class)
+public void testFromFeatureLevelAllFeatures(Features feature) {
+FeatureVersion[] featureImplementations = feature.featureVersions();
+int numFeatures = featureImplementations.length;
+for (short i = 1; i < numFeatures; i++) {
+assertEquals(featureImplementations[i - 1], 
feature.fromFeatureLevel(i));
+}
+}
+
+@ParameterizedTest
+@EnumSource(Features.class)
+public void testValidateVersionAllFeatures(Features feature) {
+for (FeatureVersion featureImpl : feature.featureVersions()) {
+// Ensure the minimum bootstrap metadata version is included if no 
metadata version dependency.
+Map deps = new HashMap<>();
+deps.putAll(featureImpl.dependencies());
+if (!deps.containsKey(MetadataVersion.FEATURE_NAME)) {

Review Comment:
   Should we require each feature to include a dependency on MV? Otherwise, we 
need to add this logic in all places where `Features.validateVersion` is called.
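
   One way to read the alternative raised above is to centralise the fallback in a single helper rather than repeating it at each call site. The following is only a sketch under assumptions: the class, the method, and the `metadata.version` constant are hypothetical stand-ins (the constant stands in for `MetadataVersion.FEATURE_NAME`), and none of this is taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the PR: guarantees that a dependency map
// carries an entry for the metadata version before it is validated.
final class DependencyDefaults {
    // Assumed feature name; stand-in for MetadataVersion.FEATURE_NAME.
    static final String MV_FEATURE_NAME = "metadata.version";

    // Returns a copy of deps that always has a metadata version entry.
    // An existing entry is kept; fallbackLevel is used only when it is missing.
    static Map<String, Short> withMetadataVersion(Map<String, Short> deps, short fallbackLevel) {
        Map<String, Short> result = new HashMap<>(deps);
        result.putIfAbsent(MV_FEATURE_NAME, fallbackLevel);
        return result;
    }
}
```

   If every feature declared its own MV dependency, a helper like this would be unnecessary, which is the trade-off the comment points at.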



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
 case "format" =>
   val directories = configToLogDirectories(config.get)
   val clusterId = namespace.getString("cluster_id")
-  val metadataVersion = getMetadataVersion(namespace,
-    Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-  if (!metadataVersion.isKRaftSupported) {
-    throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-  }
-  if (!metadataVersion.isProduction) {
-    if (config.get.unstableMetadataVersionsEnabled) {
-      System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.")
-    } else {
-      throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.")
-    }
-  }
   val metaProperties = new MetaProperties.Builder().
 

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619106457


##
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##
@@ -16,72 +16,135 @@
  */
 package org.apache.kafka.server.common;
 
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Objects;
+import java.util.stream.Collectors;
 
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various 
features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be 
added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * 
+ * Having a unified enum for the features that will use a shared type in the 
API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum Features {
+
+/**
+ * Features defined. If a feature is included in this list, and marked to 
be used in production they will also be specified when
+ * formatting a cluster via the StorageTool. MetadataVersion is handled 
separately, so it is not included here.
+ *
+ * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersion} when implementing a new feature.
+ */
+TEST_VERSION("test.feature.version", TestFeatureVersion.values());
+
+public static final Features[] FEATURES;
+public static final List PRODUCTION_FEATURES;
 
-public final class Features {
-private final MetadataVersion version;
-private final Map finalizedFeatures;
-private final long finalizedFeaturesEpoch;
+public static final List PRODUCTION_FEATURE_NAMES;
+private final String name;
+private final FeatureVersion[] featureVersions;
 
-public static Features fromKRaftVersion(MetadataVersion version) {
-return new Features(version, Collections.emptyMap(), -1, true);
+Features(String name,
+ FeatureVersion[] featureVersions) {
+this.name = name;
+this.featureVersions = featureVersions;
 }
 
-public Features(
-MetadataVersion version,
-Map finalizedFeatures,
-long finalizedFeaturesEpoch,
-boolean kraftMode
-) {
-this.version = version;
-this.finalizedFeatures = new HashMap<>(finalizedFeatures);
-this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
-// In KRaft mode, we always include the metadata version in the 
features map.
-// In ZK mode, we never include it.
-if (kraftMode) {
-this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel());
-} else {
-this.finalizedFeatures.remove(FEATURE_NAME);
-}
+static {
+Features[] enumValues = Features.values();
+FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+feature.name != TEST_VERSION.featureName()).collect(Collectors.toList());

Review Comment:
   fixed






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-29 Thread via GitHub


dajac commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1618450649


##
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##
@@ -16,72 +16,135 @@
  */
 package org.apache.kafka.server.common;
 
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Objects;
+import java.util.stream.Collectors;
 
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various 
features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be 
added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * 
+ * Having a unified enum for the features that will use a shared type in the 
API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum Features {
+
+/**
+ * Features defined. If a feature is included in this list, and marked to 
be used in production they will also be specified when
+ * formatting a cluster via the StorageTool. MetadataVersion is handled 
separately, so it is not included here.
+ *
+ * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersion} when implementing a new feature.
+ */
+TEST_VERSION("test.feature.version", TestFeatureVersion.values());
+
+public static final Features[] FEATURES;
+public static final List PRODUCTION_FEATURES;
 
-public final class Features {
-private final MetadataVersion version;
-private final Map finalizedFeatures;
-private final long finalizedFeaturesEpoch;
+public static final List PRODUCTION_FEATURE_NAMES;
+private final String name;
+private final FeatureVersion[] featureVersions;
 
-public static Features fromKRaftVersion(MetadataVersion version) {
-return new Features(version, Collections.emptyMap(), -1, true);
+Features(String name,
+ FeatureVersion[] featureVersions) {
+this.name = name;
+this.featureVersions = featureVersions;
 }
 
-public Features(
-MetadataVersion version,
-Map finalizedFeatures,
-long finalizedFeaturesEpoch,
-boolean kraftMode
-) {
-this.version = version;
-this.finalizedFeatures = new HashMap<>(finalizedFeatures);
-this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
-// In KRaft mode, we always include the metadata version in the 
features map.
-// In ZK mode, we never include it.
-if (kraftMode) {
-this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel());
-} else {
-this.finalizedFeatures.remove(FEATURE_NAME);
-}
+static {
+Features[] enumValues = Features.values();
+FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+feature.name != TEST_VERSION.featureName()).collect(Collectors.toList());

Review Comment:
   There is a small bug here: `!=` compares String references rather than contents. We should use `equals`, I think.
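
   To illustrate the point about `equals`, here is a minimal standalone sketch. The class and method names are hypothetical and only mirror the shape of the filter in the diff. In the enum itself both sides may well be the same String object, so the reference comparison can appear to work, but it is fragile.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration, not the PR's code.
final class FeatureNameFilter {
    // Stand-in for TEST_VERSION.featureName().
    static final String TEST_FEATURE_NAME = "test.feature.version";

    // Reference comparison, as in the diff: only excludes the very same String object.
    static List<String> filterByReference(List<String> names) {
        return names.stream().filter(n -> n != TEST_FEATURE_NAME).collect(Collectors.toList());
    }

    // Content comparison: excludes any String with the same characters.
    static List<String> filterByEquals(List<String> names) {
        return names.stream().filter(n -> !n.equals(TEST_FEATURE_NAME)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // new String(...) forces a distinct object with identical contents.
        List<String> names = Arrays.asList(new String("test.feature.version"), "other.feature");
        System.out.println(filterByReference(names)); // prints [test.feature.version, other.feature]
        System.out.println(filterByEquals(names));    // prints [other.feature]
    }
}
```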






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-28 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1618018029


##
server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java:
##
@@ -14,37 +14,86 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.server.common;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.Collections;
 
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
-import static 
org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
-class FeaturesTest {
-@Test
-public void testKRaftModeFeatures() {
-Features features = new Features(MINIMUM_KRAFT_VERSION,
-Collections.singletonMap("foo", (short) 2), 123, true);
-assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(),
-features.finalizedFeatures().get(FEATURE_NAME));
-assertEquals((short) 2,
-features.finalizedFeatures().get("foo"));
-assertEquals(2, features.finalizedFeatures().size());
+public class FeaturesTest {
+
+@ParameterizedTest
+@EnumSource(Features.class)
+public void testFromFeatureLevelAllFeatures(Features feature) {
+FeatureVersion[] featureImplementations = feature.featureVersions();
+int numFeatures = featureImplementations.length;
+for (short i = 1; i < numFeatures; i++) {
+assertEquals(featureImplementations[i - 1], 
feature.fromFeatureLevel(i));
+}
+}
+
+@ParameterizedTest
+@EnumSource(Features.class)
+public void testValidateVersionAllFeatures(Features feature) {
+for (FeatureVersion featureImpl : feature.featureVersions()) {
+// Ensure that the feature is valid given the typical 
metadataVersionMapping and the dependencies.
+// Note: Other metadata versions are valid, but this one should 
always be valid.
+Features.validateVersion(featureImpl, 
featureImpl.bootstrapMetadataVersion(), featureImpl.dependencies());
+}
 }
 
 @Test
-public void testZkModeFeatures() {
-Features features = new Features(MINIMUM_KRAFT_VERSION,
-Collections.singletonMap("foo", (short) 2), 123, false);
-assertNull(features.finalizedFeatures().get(FEATURE_NAME));
-assertEquals((short) 2,
-features.finalizedFeatures().get("foo"));
-assertEquals(1, features.finalizedFeatures().size());
+public void testInvalidValidateVersion() {
+// Using too low of a MetadataVersion is invalid
+assertThrows(IllegalArgumentException.class,
+() -> Features.validateVersion(
+TestFeatureVersion.TEST_1,
+MetadataVersion.IBP_2_8_IV0,
+Collections.emptyMap()
+)
+);
+
+// Using a version that is lower than the dependency will fail.
+assertThrows(IllegalArgumentException.class,
+ () -> Features.validateVersion(
+ TestFeatureVersion.TEST_2,
+ MetadataVersion.MINIMUM_BOOTSTRAP_VERSION,

Review Comment:
   I thought it would be easier, but I see there is room for mistakes






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-28 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1618017128


##
server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java:
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+
+public final class FinalizedFeatures {
+private final MetadataVersion metadataVersion;
+private final Map finalizedFeatures;
+private final long finalizedFeaturesEpoch;
+
+public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
+return new FinalizedFeatures(version, Collections.emptyMap(), -1, 
true);
+}
+
+public FinalizedFeatures(
+MetadataVersion metadataVersion,
+Map finalizedFeatures,
+long finalizedFeaturesEpoch,
+boolean kraftMode
+) {
+this.metadataVersion = metadataVersion;
+this.finalizedFeatures = new HashMap<>(finalizedFeatures);
+this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
+// In KRaft mode, we always include the metadata version in the 
features map.
+// In ZK mode, we never include it.
+if (kraftMode) {
+this.finalizedFeatures.put(FEATURE_NAME, 
metadataVersion.featureLevel());

Review Comment:
   sure






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-28 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1618017282


##
metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java:
##
@@ -61,6 +62,12 @@ public static Map 
defaultFeatureMap(boolean enableUnstable
 enableUnstable ?
 MetadataVersion.latestTesting().featureLevel() :
 MetadataVersion.latestProduction().featureLevel()));
+for (Features feature : Features.PRODUCTION_FEATURES) {
+features.put(feature.featureName(), VersionRange.of(
+0,
+feature.latestProduction()

Review Comment:
   This is in the followup. :) It changes a ton of files so I would prefer to 
do it separately.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-28 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1618016824


##
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##
@@ -16,72 +16,134 @@
  */
 package org.apache.kafka.server.common;
 
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Objects;
+import java.util.stream.Collectors;
 
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various 
features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be 
added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * 
+ * Having a unified enum for the features that will use a shared type in the 
API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum Features {
+
+/**
+ * Features defined. If a feature is included in this list, and marked to 
be used in production they will also be specified when
+ * formatting a cluster via the StorageTool. MetadataVersion is handled 
separately, so it is not included here.
+ *
+ * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersion} when implementing a new feature.
+ */
+TEST_VERSION("test.feature.version", TestFeatureVersion.values());
+
+public static final Features[] FEATURES;
+public static final List PRODUCTION_FEATURES;
 
-public final class Features {
-private final MetadataVersion version;
-private final Map finalizedFeatures;
-private final long finalizedFeaturesEpoch;
+public static final List PRODUCTION_FEATURE_NAMES;
+private final String name;
+private final FeatureVersion[] featureVersions;
 
-public static Features fromKRaftVersion(MetadataVersion version) {
-return new Features(version, Collections.emptyMap(), -1, true);
+Features(String name,
+ FeatureVersion[] featureVersions) {
+this.name = name;
+this.featureVersions = featureVersions;
 }
 
-public Features(
-MetadataVersion version,
-Map finalizedFeatures,
-long finalizedFeaturesEpoch,
-boolean kraftMode
-) {
-this.version = version;
-this.finalizedFeatures = new HashMap<>(finalizedFeatures);
-this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
-// In KRaft mode, we always include the metadata version in the 
features map.
-// In ZK mode, we never include it.
-if (kraftMode) {
-this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel());
-} else {
-this.finalizedFeatures.remove(FEATURE_NAME);
-}
+static {
+Features[] enumValues = Features.values();
+FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+feature.name != TEST_VERSION.featureName()).collect(Collectors.toList());
+PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature ->
+feature.name).collect(Collectors.toList());
 }
 
-public MetadataVersion metadataVersion() {
-return version;
+public String featureName() {
+return name;
 }
 
-public Map finalizedFeatures() {
-return finalizedFeatures;
+public FeatureVersion[] featureVersions() {
+return featureVersions;
 }
 
-public long finalizedFeaturesEpoch() {
-return finalizedFeaturesEpoch;
+public short latestProduction() {
+return defaultValue(MetadataVersion.LATEST_PRODUCTION);
 }
 
-@Override
-public boolean equals(Object o) {
-if (o == null || !(o.getClass().equals(Features.class))) return false;
-Features other = (Features) o;
-return version == other.version &&
-finalizedFeatures.equals(other.finalizedFeatures) &&
-finalizedFeaturesEpoch == other.finalizedFeaturesEpoch;
+/**
+ * Creates a FeatureVersion from a level.
+ *
+ * @param level   the level of the feature
+ * @return   the FeatureVersionUtils.FeatureVersion for the feature 
the enum is based on.
+ * @throwsIllegalArgumentException if the feature is not known.
+ */
+public FeatureVersion fromFeatureLevel(short level) {
+return Arrays.stream(featureVersions).filter(featureVersion ->
+featureVersion.featureLevel() == level).findFirst().orElseThrow(
+() -> new IllegalArgumentException("No feature:" + 
featureName() + " with feature level " + level));
 }
 
-@Override
-public int 

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-28 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1618016483


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
 case "format" =>
   val directories = configToLogDirectories(config.get)
   val clusterId = namespace.getString("cluster_id")
-  val metadataVersion = getMetadataVersion(namespace,
-    Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-  if (!metadataVersion.isKRaftSupported) {
-    throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-  }
-  if (!metadataVersion.isProduction) {
-    if (config.get.unstableMetadataVersionsEnabled) {
-      System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.")
-    } else {
-      throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.")
-    }
-  }
   val metaProperties = new MetaProperties.Builder().
     setVersion(MetaPropertiesVersion.V1).
     setClusterId(clusterId).
     setNodeId(config.get.nodeId).
     build()
   val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
+  val specifiedFeatures: util.List[String] = namespace.getList("feature")
+  if (namespace.getString("release_version") != null && specifiedFeatures != null) {
+    throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.")
+  }
+  val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+  val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap,
+    Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+  validateMetadataVersion(metadataVersion, config)
+  // Get all other features, validate, and create records for them
+  generateFeatureRecords(
+    metadataRecords,
+    metadataVersion,
+    featureNamesAndLevelsMap,
+    Features.PRODUCTION_FEATURES.asScala.toList,
+    !Option(namespace.getString("release_version")).isEmpty

Review Comment:
   If MV is specified through `--feature` flags, we should not derive the other defaults from that MV; instead, we use the latest default for any feature that was not specified.
   
   The difference is whether MV is specified using `--version-default` or `--feature`.
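
   The distinction described in this comment can be sketched as a small decision function. This is an illustration under assumptions, not the PR's implementation: the class, the method, and all four parameters are hypothetical, and the two candidate default levels are passed in rather than computed.

```java
// Hypothetical sketch of the defaulting rule described in the comment above.
final class FeatureLevelDefaults {
    /**
     * Picks the level to use for one feature.
     *
     * @param explicitLevel            level given for this feature via --feature, or null if not given
     * @param releaseVersionSpecified  whether the version was chosen via the release/version-default flag
     * @param levelForMetadataVersion  assumed default level tied to the chosen metadata version
     * @param latestProductionLevel    assumed latest production level of the feature
     */
    static short resolveLevel(Short explicitLevel,
                              boolean releaseVersionSpecified,
                              short levelForMetadataVersion,
                              short latestProductionLevel) {
        if (explicitLevel != null) {
            return explicitLevel; // an explicitly specified feature always wins
        }
        // Only a release-style flag ties the remaining features to the metadata version;
        // otherwise unspecified features fall back to their latest default.
        return releaseVersionSpecified ? levelForMetadataVersion : latestProductionLevel;
    }
}
```

   Under this reading, the boolean passed to `generateFeatureRecords` is what selects between the two fallbacks.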






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-28 Thread via GitHub


junrao commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1617946240


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
 case "format" =>
   val directories = configToLogDirectories(config.get)
   val clusterId = namespace.getString("cluster_id")
-  val metadataVersion = getMetadataVersion(namespace,
-    Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-  if (!metadataVersion.isKRaftSupported) {
-    throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-  }
-  if (!metadataVersion.isProduction) {
-    if (config.get.unstableMetadataVersionsEnabled) {
-      System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.")
-    } else {
-      throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.")
-    }
-  }
   val metaProperties = new MetaProperties.Builder().
     setVersion(MetaPropertiesVersion.V1).
     setClusterId(clusterId).
     setNodeId(config.get.nodeId).
     build()
   val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
+  val specifiedFeatures: util.List[String] = namespace.getList("feature")
+  if (namespace.getString("release_version") != null && specifiedFeatures != null) {
+    throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.")
+  }
+  val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+  val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap,
+    Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+  validateMetadataVersion(metadataVersion, config)
+  // Get all other features, validate, and create records for them
+  generateFeatureRecords(
+    metadataRecords,
+    metadataVersion,
+    featureNamesAndLevelsMap,
+    Features.PRODUCTION_FEATURES.asScala.toList,
+    !Option(namespace.getString("release_version")).isEmpty

Review Comment:
   I am wondering why we need to pass in `usesVersionDefault`? Earlier in 
`getMetadataVersion`, we already resolve `metadataVersion` to 
`LATEST_PRODUCTION` if it's not explicitly specified.



##
server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java:
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+
+public final class FinalizedFeatures {
+private final MetadataVersion metadataVersion;
+private final Map finalizedFeatures;
+private final long finalizedFeaturesEpoch;
+
+public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
+return new FinalizedFeatures(version, Collections.emptyMap(), -1, 
true);
+}
+
+public FinalizedFeatures(
+MetadataVersion metadataVersion,
+Map finalizedFeatures,
+long finalizedFeaturesEpoch,
+boolean kraftMode
+) {
+this.metadataVersion = metadataVersion;
+this.finalizedFeatures = new HashMap<>(finalizedFeatures);
+this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
+// In KRaft mode, we always include the metadata version in the 
features map.
+// In ZK mode, we never include it.
+if (kraftMode) {
+this.finalizedFeatures.put(FEATURE_NAME, 
metadataVersion.featureLevel());
+} else {
+this.finalizedFeatures.remove(FEATURE_NAME);
+}
+}
+
+public MetadataVersion 

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-28 Thread via GitHub


jolshan commented on PR #15685:
URL: https://github.com/apache/kafka/pull/15685#issuecomment-2135851154

   Yes @dajac! The code for unstable versions is mostly ready but I need to 
refactor based on some changes here.





Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-24 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1613992588


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
 case "format" =>
   val directories = configToLogDirectories(config.get)
   val clusterId = namespace.getString("cluster_id")
-  val metadataVersion = getMetadataVersion(namespace,
-    Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-  if (!metadataVersion.isKRaftSupported) {
-    throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-  }
-  if (!metadataVersion.isProduction) {
-    if (config.get.unstableMetadataVersionsEnabled) {
-      System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.")
-    } else {
-      throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.")
-    }
-  }
   val metaProperties = new MetaProperties.Builder().
     setVersion(MetaPropertiesVersion.V1).
     setClusterId(clusterId).
     setNodeId(config.get.nodeId).
     build()
   val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
+  val specifiedFeatures: util.List[String] = namespace.getList("feature")
+  if (namespace.getString("release_version") != null && specifiedFeatures != null) {
+    throw new TerseFailure("Both --release_version and --feature were set. Only one of the two flags can be set.")
+  }
+  val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+  val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap,
+    Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+  validateMetadataVersion(metadataVersion, config)
+  // Get all other features, validate, and create records for them
+  generateFeatureRecords(
+    metadataRecords,
+    metadataVersion,
+    featureNamesAndLevelsMap,
+    Features.PRODUCTION_FEATURES.asScala.toList,
+    Option(namespace.getString("release_version")).isEmpty

Review Comment:
   I should probably add a test for this 🤦‍♀️






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-24 Thread via GitHub


artemlivshits commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1613922117


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
 case "format" =>
   val directories = configToLogDirectories(config.get)
   val clusterId = namespace.getString("cluster_id")
-  val metadataVersion = getMetadataVersion(namespace,
-
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-  if (!metadataVersion.isKRaftSupported) {
-throw new TerseFailure(s"Must specify a valid KRaft 
metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-  }
-  if (!metadataVersion.isProduction) {
-if (config.get.unstableMetadataVersionsEnabled) {
-  System.out.println(s"WARNING: using pre-production 
metadata.version $metadataVersion.")
-} else {
-  throw new TerseFailure(s"The metadata.version $metadataVersion 
is not ready for production use yet.")
-}
-  }
   val metaProperties = new MetaProperties.Builder().
 setVersion(MetaPropertiesVersion.V1).
 setClusterId(clusterId).
 setNodeId(config.get.nodeId).
 build()
   val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
+  val specifiedFeatures: util.List[String] = 
namespace.getList("feature")
+  if (namespace.getString("release_version") != null && 
specifiedFeatures != null) {
+throw new TerseFailure("Both --release_version and --feature were 
set. Only one of the two flags can be set.")
+  }
+  val featureNamesAndLevelsMap = 
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+  val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
+
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+  validateMetadataVersion(metadataVersion, config)
+  // Get all other features, validate, and create records for them
+  generateFeatureRecords(
+metadataRecords,
+metadataVersion,
+featureNamesAndLevelsMap,
+Features.PRODUCTION_FEATURES.asScala.toList,
+Option(namespace.getString("release_version")).isEmpty

Review Comment:
   Should it be `!isEmpty`?
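
   To make the question concrete, the sketch below puts the two candidate expressions side by side. It is an illustration only: `UsesVersionDefaultSketch` and its methods are hypothetical and not part of StorageTool, and it assumes, going by the parameter name `usesVersionDefault`, that the flag is meant to be true only when `release_version` was supplied.

```java
import java.util.Optional;

// Hypothetical helper, not part of StorageTool: compares the two candidate expressions.
final class UsesVersionDefaultSketch {
    private UsesVersionDefaultSketch() {
    }

    // Mirrors Option(namespace.getString("release_version")).isEmpty from the diff:
    // true when the release version was NOT supplied.
    static boolean asWritten(String releaseVersion) {
        return !Optional.ofNullable(releaseVersion).isPresent();
    }

    // Mirrors the suggested negation: true only when the release version was supplied.
    static boolean asSuggested(String releaseVersion) {
        return Optional.ofNullable(releaseVersion).isPresent();
    }
}
```

   With `release_version` unset, `asWritten` returns true and `asSuggested` returns false; once it is set, the two results swap.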






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-24 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1613761681


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +111,52 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features],
+usesVersionDefault: Boolean): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.
+val metadataVersionForDefault = if (usesVersionDefault) 
Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()

Review Comment:
   Yup. I will push the code soon, but tried to write a clear comment about 
this.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-24 Thread via GitHub


junrao commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1613728690


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +111,52 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features],
+usesVersionDefault: Boolean): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.
+val metadataVersionForDefault = if (usesVersionDefault) 
Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()

Review Comment:
   > Originally this did not need to be a production ready MV even if the 
feature is production, but I think we are now flipping this around and saying 
the feature is production ready iff the MV is production ready.
   
   If there is a 1-to-1 mapping from feature to MV, reasoning about production 
readiness in one place seems simpler?






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-24 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1613656338


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -21,16 +21,17 @@
 
 public enum TestFeatureVersion implements FeatureVersion {
 
-TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()),
+// TEST_1 released right before MV 3.7-IVO was released, and it has no 
dependencies
 TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()),
+// TEST_2 released right before MV 3.8-IVO was released, and it depends on 
this metadata version
 TEST_2(2, MetadataVersion.IBP_3_8_IV0, 
Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_8_IV0.featureLevel()));
 
-private short featureLevel;
-private MetadataVersion metadataVersionMapping;
-private Map dependencies;
+private final short featureLevel;
+private final MetadataVersion metadataVersionMapping;
+private final Map dependencies;
 
 public static final String FEATURE_NAME = "test.feature.version";
-public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1;
+public static final TestFeatureVersion LATEST_PRODUCTION = TEST_1;

Review Comment:
   🤦‍♀️ I realized I fixed this in the next PR. I will fix this here.









Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-24 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1613639487


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +111,52 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features],
+usesVersionDefault: Boolean): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.
+val metadataVersionForDefault = if (usesVersionDefault) 
Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()

Review Comment:
   Ok -- so if I understand correctly, the request is to remove latest 
production per feature and to simply mark as production ready if the MV that 
corresponds to it is production ready?
   
   The only case where this is tricky is when we use the --feature flag and we 
need to find the latest production version. To do that, we will need to get the 
latest production Metadata and map that to a feature. It's doable though, so I 
can proceed with that.
   
   As for
   >If we follow the protocol of creating a new MV for each new feature and 
making them production ready at the same time then the answer to your question 
is yes.
   
   Originally this did not need to be a production ready MV even if the feature 
is production, but I think we are now flipping this around and saying the 
feature is production ready iff the MV is production ready.
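
   A rough sketch of the lookup described above, assuming a feature level carries no production marker of its own. `LookupMv` and `LookupFeature` are hypothetical stand-ins for `MetadataVersion` and a `FeatureVersion` enum, and declaration order is assumed to match release order.

```java
import java.util.Optional;

// Hypothetical stand-in for MetadataVersion; declaration order is assumed to be release order.
enum LookupMv {
    MV_1(true),
    MV_2(true),
    MV_3(false); // not yet production ready

    private final boolean production;

    LookupMv(boolean production) {
        this.production = production;
    }

    boolean isProduction() {
        return production;
    }

    // Newest metadata version marked production ready, or null if there is none.
    static LookupMv latestProduction() {
        LookupMv latest = null;
        for (LookupMv mv : values()) {
            if (mv.isProduction()) {
                latest = mv;
            }
        }
        return latest;
    }
}

// Hypothetical feature enum: readiness is derived from the mapped MV, never stored here.
enum LookupFeature {
    F_1(LookupMv.MV_2),
    F_2(LookupMv.MV_3);

    private final LookupMv mapping;

    LookupFeature(LookupMv mapping) {
        this.mapping = mapping;
    }

    // A level is production ready exactly when its mapped metadata version is.
    boolean isProduction() {
        return mapping.isProduction();
    }

    // Newest level whose mapped MV is at or below the latest production MV.
    static Optional<LookupFeature> latestProduction() {
        LookupMv ceiling = LookupMv.latestProduction();
        LookupFeature latest = null;
        for (LookupFeature feature : values()) {
            if (ceiling != null && feature.mapping.compareTo(ceiling) <= 0) {
                latest = feature;
            }
        }
        return Optional.ofNullable(latest);
    }
}
```

   Here `F_2` maps to the pre-production `MV_3`, so the lookup settles on `F_1`.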






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-24 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1613634445


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -21,16 +21,17 @@
 
 public enum TestFeatureVersion implements FeatureVersion {
 
-TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()),
+// TEST_1 released right before MV 3.7-IVO was released, and it has no 
dependencies
 TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()),
+// TEST_2 released right before MV 3.8-IVO was released, and it depends on 
this metadata version
 TEST_2(2, MetadataVersion.IBP_3_8_IV0, 
Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_8_IV0.featureLevel()));
 
-private short featureLevel;
-private MetadataVersion metadataVersionMapping;
-private Map dependencies;
+private final short featureLevel;
+private final MetadataVersion metadataVersionMapping;
+private final Map dependencies;
 
 public static final String FEATURE_NAME = "test.feature.version";
-public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1;
+public static final TestFeatureVersion LATEST_PRODUCTION = TEST_1;

Review Comment:
   It does not, since I don't define a production version for it in Features. I 
set this for testing purposes, but Features.PRODUCTION_VERSIONS does not contain 
this feature.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-24 Thread via GitHub


junrao commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612454534


##
server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java:
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+
+public final class FinalizedFeatures {
+private final MetadataVersion metadataVersion;
+private final Map finalizedFeatures;
+private final long finalizedFeaturesEpoch;
+
+public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
+return new FinalizedFeatures(version, Collections.emptyMap(), -1, 
true);
+}
+
+public FinalizedFeatures(
+MetadataVersion metadataVersion,
+Map finalizedFeatures,
+long finalizedFeaturesEpoch,
+boolean kraftMode
+) {
+this.metadataVersion = metadataVersion;
+this.finalizedFeatures = new HashMap<>(finalizedFeatures);
+this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
+// In KRaft mode, we always include the metadata version in the 
features map.
+// In ZK mode, we never include it.
+if (kraftMode) {
+this.finalizedFeatures.put(FEATURE_NAME, 
metadataVersion.featureLevel());
+} else {
+this.finalizedFeatures.remove(FEATURE_NAME);
+}
+}
+
+public MetadataVersion metadataVersion() {
+return metadataVersion;
+}
+
+public Map finalizedFeatures() {
+return finalizedFeatures;
+}
+
+public long finalizedFeaturesEpoch() {
+return finalizedFeaturesEpoch;
+}
+
+@Override
+public boolean equals(Object o) {
+if (o == null || !(o.getClass().equals(FinalizedFeatures.class))) 
return false;
+FinalizedFeatures other = (FinalizedFeatures) o;
+return metadataVersion == other.metadataVersion &&
+finalizedFeatures.equals(other.finalizedFeatures) &&
+finalizedFeaturesEpoch == other.finalizedFeaturesEpoch;
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(metadataVersion, finalizedFeatures, 
finalizedFeaturesEpoch);
+}
+
+@Override
+public String toString() {
+return "Features" +
+"(version=" + metadataVersion +

Review Comment:
   version => metadataVersion ?



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +111,52 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features],
+usesVersionDefault: Boolean): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.
+val metadataVersionForDefault = if (usesVersionDefault) 
Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()

Review Comment:
   > If we follow the protocol of creating a new MV for each new feature and 
making them production ready at the same time then the answer to your question 
is yes.
   
   I thought that's the model being 

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-24 Thread via GitHub


artemlivshits commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612863802


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -21,16 +21,17 @@
 
 public enum TestFeatureVersion implements FeatureVersion {
 
-TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()),
+// TEST_1 released right before MV 3.7-IVO was released, and it has no 
dependencies
 TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()),
+// TEST_2 released right before MV 3.8-IVO was released, and it depends on 
this metadata version
 TEST_2(2, MetadataVersion.IBP_3_8_IV0, 
Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_8_IV0.featureLevel()));
 
-private short featureLevel;
-private MetadataVersion metadataVersionMapping;
-private Map dependencies;
+private final short featureLevel;
+private final MetadataVersion metadataVersionMapping;
+private final Map dependencies;
 
 public static final String FEATURE_NAME = "test.feature.version";
-public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1;
+public static final TestFeatureVersion LATEST_PRODUCTION = TEST_1;

Review Comment:
   Does it mean that the broker will show the test feature to the clients?



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +111,52 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features],
+usesVersionDefault: Boolean): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.
+val metadataVersionForDefault = if (usesVersionDefault) 
Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()

Review Comment:
   I think it would be confusing if a user specified the latest MV and got a 
result different from when nothing is specified (the implied assumption being 
that specifying nothing is a shortcut for the latest MV known to the tool). It 
would also be confusing if, by default (nothing specified), not all features 
were set to the latest versions known to the tool. We can provide guidance to 
new feature developers in the comments (and in the test feature example) and 
add a unit test that enforces the equivalence.
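
   The equivalence could be enforced with a check along these lines. This is only a sketch: `EqMv`, `EqFeature`, and `DefaultEquivalenceCheck` are hypothetical stand-ins rather than the PR's classes, the hand-declared `LATEST_PRODUCTION` constants are assumptions modeled on `TestFeatureVersion.LATEST_PRODUCTION`, and declaration order is assumed to match release order.

```java
import java.util.Optional;

// Hypothetical stand-in for MetadataVersion; declaration order is assumed to be release order.
enum EqMv {
    MV_1, MV_2, MV_3;

    // Assumed for the sketch: MV_2 is the newest production-ready metadata version.
    static final EqMv LATEST_PRODUCTION = MV_2;
}

// Hypothetical feature enum with a hand-declared latest production level.
enum EqFeature {
    F_1(EqMv.MV_1),
    F_2(EqMv.MV_2),
    F_3(EqMv.MV_3);

    // Declared by hand, in the style of TestFeatureVersion.LATEST_PRODUCTION.
    static final EqFeature LATEST_PRODUCTION = F_2;

    private final EqMv mapping;

    EqFeature(EqMv mapping) {
        this.mapping = mapping;
    }

    // Default when a metadata version is given: newest level mapped at or below it.
    static Optional<EqFeature> defaultFor(EqMv mv) {
        EqFeature latest = null;
        for (EqFeature feature : values()) {
            if (feature.mapping.compareTo(mv) <= 0) {
                latest = feature;
            }
        }
        return Optional.ofNullable(latest);
    }
}

final class DefaultEquivalenceCheck {
    private DefaultEquivalenceCheck() {
    }

    // True when "nothing specified" and "latest production MV specified" pick the same level.
    static boolean holds() {
        Optional<EqFeature> fromMv = EqFeature.defaultFor(EqMv.LATEST_PRODUCTION);
        return fromMv.isPresent() && fromMv.get() == EqFeature.LATEST_PRODUCTION;
    }
}
```

   If someone later marked `F_3` as the latest production level without also moving the production MV forward, `holds()` would return false and the test would flag the mismatch.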






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612404062


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +111,52 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features],
+usesVersionDefault: Boolean): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.
+val metadataVersionForDefault = if (usesVersionDefault) 
Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()

Review Comment:
   Ok, sorry to be a little all over the place here. I think we should have 
semantics where each new feature version released to production comes with an 
MV released to production. 
   
   In that case, specifying the latest production MV (as the code did before) 
and using an empty Optional to request the latest production feature SHOULD be 
equivalent. The only case where they are not is when someone sets the 
metadataMapping incorrectly. There isn't a great way to enforce that (I can do 
so via a test), so the question is whether we prefer the empty approach, which 
ensures the latest features are provided, OR the latest production metadata 
approach, which is simpler but risks not picking up features if folks 
implement the method incorrectly or don't update the MV.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612321070


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +111,52 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features],
+usesVersionDefault: Boolean): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.
+val metadataVersionForDefault = if (usesVersionDefault) 
Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()

Review Comment:
   If we want to change it back, I will also update the comments in the 
metadataVersionMapping method as it will not be correct.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612319940


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +111,52 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features],
+usesVersionDefault: Boolean): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.
+val metadataVersionForDefault = if (usesVersionDefault) 
Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()

Review Comment:
   We do not. This was changed yesterday and the code was as you say in the 
comment. 
   
   > Do we get the same result by passing in that metadataVersion to 
feature.defaultValue()
   
   If we follow the protocol of creating a new MV for each new feature and 
making them production ready at the same time then the answer to your question 
is yes. If we want to codify this and require a new MV (used only for mapping a 
default) for every new feature to be created when we mark the feature as 
production ready, I can switch it back to how it was yesterday. 
   
   I originally changed it in the case where we want to piggyback on the next 
MV and we may mark the feature as production ready but not the MV.
   






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612319940


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +111,52 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features],
+usesVersionDefault: Boolean): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.
+val metadataVersionForDefault = if (usesVersionDefault) 
Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()

Review Comment:
   We do not. This was changed yesterday and the code was as you say in the 
comment. Here is the behavior as it stands now:
   
   > Do we get the same result by passing in that metadataVersion to 
feature.defaultValue()
   
   If we follow the protocol of creating a new MV for each new feature and 
making them production ready at the same time then the answer to your question 
is yes. If we want to codify this and require a new MV (used only for mapping a 
default) for every new feature to be created when we mark the feature as 
production ready, I can switch it back to how it was yesterday. 
   
   
   









Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612314478


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -141,6 +189,9 @@ object StorageTool extends Logging {
 formatParser.addArgument("--release-version", "-r").
   action(store()).
   help(s"A KRaft release version to use for the initial metadata.version. 
The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is 
${MetadataVersion.LATEST_PRODUCTION}")
+formatParser.addArgument("--feature").
+  help("A feature upgrade we should perform, in feature=level format. For 
example: `metadata.version=5`.").

Review Comment:
   We can do so.
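
As an illustration of the `feature=level` format named in the help text, here is a minimal Java sketch of a parser. It is not the StorageTool implementation: `FeatureFlagSketch` and `parse` are hypothetical names, and rejecting a feature that appears twice is a choice made for this sketch rather than something stated in the thread.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: turns ["metadata.version=5", ...] into a name -> level map.
final class FeatureFlagSketch {
    static Map<String, Short> parse(List<String> args) {
        Map<String, Short> result = new LinkedHashMap<>();
        for (String arg : args) {
            int idx = arg.indexOf('=');
            // Require a non-empty name before '=' and a non-empty level after it.
            if (idx <= 0 || idx == arg.length() - 1) {
                throw new IllegalArgumentException("Expected feature=level, got: " + arg);
            }
            String name = arg.substring(0, idx).trim();
            if (name.isEmpty()) {
                throw new IllegalArgumentException("Expected feature=level, got: " + arg);
            }
            short level;
            try {
                level = Short.parseShort(arg.substring(idx + 1).trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Feature level must be a short, got: " + arg, e);
            }
            // Assumption of this sketch: naming the same feature twice is an error.
            if (result.putIfAbsent(name, level) != null) {
                throw new IllegalArgumentException("Feature " + name + " was specified more than once");
            }
        }
        return result;
    }
}
```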






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612314269


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +105,51 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features]): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.

Review Comment:
   This was a typo. I will fix.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612314113


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -156,16 +200,27 @@ object StorageTool extends Logging {
 
   def getMetadataVersion(
 namespace: Namespace,
+featureNamesAndLevelsMap: Map[String, java.lang.Short],
 defaultVersionString: Option[String]
   ): MetadataVersion = {
 val defaultValue = defaultVersionString match {
   case Some(versionString) => 
MetadataVersion.fromVersionString(versionString)
   case None => MetadataVersion.LATEST_PRODUCTION
 }
 
-Option(namespace.getString("release_version"))
-  .map(ver => MetadataVersion.fromVersionString(ver))
-  .getOrElse(defaultValue)
+val releaseVersionTag = Option(namespace.getString("release_version"))
+val featureTag = featureNamesAndLevelsMap.get(MetadataVersion.FEATURE_NAME)
+
+(releaseVersionTag, featureTag) match {
+  case (Some(_), Some(_)) =>
+throw new IllegalArgumentException("Both --release_version and 
--feature were set. Only one of the two flags can be set.")

Review Comment:
   Right. I can update this check.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612313415


##
core/src/main/scala/kafka/server/BrokerFeatures.scala:
##
@@ -75,16 +76,19 @@ object BrokerFeatures extends Logging {
   }
 
   def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): 
Features[SupportedVersionRange] = {
-Features.supportedFeatures(
-  java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
+val features = new util.HashMap[String, SupportedVersionRange]()
+  features.put(MetadataVersion.FEATURE_NAME,
 new SupportedVersionRange(
   MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
   if (unstableMetadataVersionsEnabled) {
 MetadataVersion.latestTesting.featureLevel
   } else {
 MetadataVersion.latestProduction.featureLevel
-  }
-)))
+  }))
+  FeatureVersion.PRODUCTION_FEATURES.forEach { feature =>

Review Comment:
   This will be in a followup. I'm working on it in the background. :)






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


junrao commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1602305075


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum TestFeatureVersion implements 
FeatureVersionUtils.FeatureVersionImpl {
+
+TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()),
+TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()),
+TEST_2(2, MetadataVersion.IBP_3_8_IV0, Collections.emptyMap());
+
+private short featureLevel;
+private MetadataVersion metadataVersionMapping;
+private Map dependencies;

Review Comment:
   Should we make those instance vals final?
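
For illustration, a minimal sketch of what final fields could look like on an enum of this shape. To compile on its own it uses a `String` stand-in for `MetadataVersion`; `SketchTestFeatureVersion`, its accessors and the version strings are hypothetical, not the PR's code.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical mirror of the enum under review, with every field final.
enum SketchTestFeatureVersion {
    TEST_0(0, "3.3-IV0", Collections.emptyMap()),
    TEST_1(1, "3.7-IV0", Collections.emptyMap()),
    TEST_2(2, "3.8-IV0", Collections.emptyMap());

    private final short featureLevel;
    private final String bootstrapMetadataVersion; // stand-in for MetadataVersion
    private final Map<String, Short> dependencies;

    SketchTestFeatureVersion(int featureLevel,
                             String bootstrapMetadataVersion,
                             Map<String, Short> dependencies) {
        this.featureLevel = (short) featureLevel;
        this.bootstrapMetadataVersion = bootstrapMetadataVersion;
        // Wrap the map so callers cannot mutate state shared by the enum constant.
        this.dependencies = Collections.unmodifiableMap(dependencies);
    }

    public short featureLevel() {
        return featureLevel;
    }

    public String bootstrapMetadataVersion() {
        return bootstrapMetadataVersion;
    }

    public Map<String, Short> dependencies() {
        return dependencies;
    }
}
```

Enum constants are shared singletons, so final fields assigned once in the constructor keep them safe to read from any thread without further synchronization.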



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -156,16 +200,27 @@ object StorageTool extends Logging {
 
   def getMetadataVersion(
 namespace: Namespace,
+featureNamesAndLevelsMap: Map[String, java.lang.Short],
 defaultVersionString: Option[String]
   ): MetadataVersion = {
 val defaultValue = defaultVersionString match {
   case Some(versionString) => 
MetadataVersion.fromVersionString(versionString)
   case None => MetadataVersion.LATEST_PRODUCTION
 }
 
-Option(namespace.getString("release_version"))
-  .map(ver => MetadataVersion.fromVersionString(ver))
-  .getOrElse(defaultValue)
+val releaseVersionTag = Option(namespace.getString("release_version"))
+val featureTag = featureNamesAndLevelsMap.get(MetadataVersion.FEATURE_NAME)
+
+(releaseVersionTag, featureTag) match {
+  case (Some(_), Some(_)) =>
+throw new IllegalArgumentException("Both --release_version and 
--feature were set. Only one of the two flags can be set.")

Review Comment:
   We should also disallow the case where both --release_version and --feature are 
used but --feature doesn't include `metadata.version`, right?
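
One reading of this request is that the two flags become mutually exclusive regardless of which features `--feature` names. A minimal Java sketch of that stricter check follows; it is not the StorageTool code. `VersionFlagSketch` and `resolve` are hypothetical, an empty map stands for "`--feature` was not passed", and the method returns a description string instead of a real `MetadataVersion` so that it compiles on its own.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: decides where the initial metadata.version comes from.
final class VersionFlagSketch {
    static final String METADATA_VERSION = "metadata.version";

    static String resolve(Optional<String> releaseVersion,
                          Map<String, Short> specifiedFeatures,
                          String defaultVersion) {
        // Stricter rule: reject the combination whatever --feature contains.
        if (releaseVersion.isPresent() && !specifiedFeatures.isEmpty()) {
            throw new IllegalArgumentException(
                "Both --release-version and --feature were set. Only one of the two flags can be set.");
        }
        if (releaseVersion.isPresent()) {
            return "release version " + releaseVersion.get();
        }
        Short level = specifiedFeatures.get(METADATA_VERSION);
        if (level != null) {
            return "feature level " + level;
        }
        // Neither flag chose a metadata.version, so fall back to the default.
        return "default " + defaultVersion;
    }
}
```

Checking the flags themselves, rather than only the `metadata.version` entry, means a call such as `--release-version 3.7-IV0 --feature test.feature.version=1` fails fast instead of silently mixing two sources of defaults. The feature name in that example is illustrative.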



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -141,6 +189,9 @@ object StorageTool extends Logging {
 formatParser.addArgument("--release-version", "-r").
   action(store()).
   help(s"A KRaft release version to use for the initial metadata.version. 
The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is 
${MetadataVersion.LATEST_PRODUCTION}")
+formatParser.addArgument("--feature").
+  help("A feature upgrade we should perform, in feature=level format. For 
example: `metadata.version=5`.").

Review Comment:
   Should we add a shorthand for --feature like other options?



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +105,51 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features]): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.

Review Comment:
   Hmm, does the storage tool support the --version-default option?



##
server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java:
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. 

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1611981645


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.Map;
+
+public interface FeatureVersion {
+
+/**
+ * The level of the feature. 0 means the feature is disabled.
+ */
+short featureLevel();
+
+/**
+ * The name of the feature.
+ */
+String featureName();
+
+/**
+ * The minimum metadata version which sets this feature version as 
default. When bootstrapping using only
+ * a metadata version, a reasonable default for all other features is 
chosen based on this value.
+ * This should be defined as the next metadata version to be released when 
the feature version becomes production ready.
+ * (Ie, if the current production MV is 17 when a feature version is 
released, its mapping should be to MV 18)

Review Comment:
   Sorry, I think I'm not explaining this clearly. Once the feature version is released, this 
value shouldn't change. I wish KIP-1014 were completed so this could be clearer.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1611979308


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum TestFeatureVersion implements FeatureVersion {
+
+TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()),

Review Comment:
   I've made this change






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


dajac commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1611842086


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.Map;
+
+public interface FeatureVersion {
+
+/**
+ * The level of the feature. 0 means the feature is disabled.
+ */
+short featureLevel();
+
+/**
+ * The name of the feature.
+ */
+String featureName();
+
+/**
+ * The minimum metadata version which sets this feature version as 
default. When bootstrapping using only
+ * a metadata version, a reasonable default for all other features is 
chosen based on this value.
+ * This should be defined as the next metadata version to be released when 
the feature version becomes production ready.
+ * (Ie, if the current production MV is 17 when a feature version is 
released, its mapping should be to MV 18)

Review Comment:
   Understood. The downside of this is that you have to ensure that you change 
it when the next unstable one is released and the new feature is not ready to 
be promoted.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1611825497


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.Map;
+
+public interface FeatureVersion {
+
+/**
+ * The level of the feature. 0 means the feature is disabled.
+ */
+short featureLevel();
+
+/**
+ * The name of the feature.
+ */
+String featureName();
+
+/**
+ * The minimum metadata version which sets this feature version as 
default. When bootstrapping using only
+ * a metadata version, a reasonable default for all other features is 
chosen based on this value.
+ * This should be defined as the next metadata version to be released when 
the feature version becomes production ready.
+ * (Ie, if the current production MV is 17 when a feature version is 
released, its mapping should be to MV 18)

Review Comment:
   My thinking is that we will just attach it to the next unstable MV. 
If there is no such MV, we create it, and if there is still no 
feature attached to it, we will just release it with the release. I probably 
need to write up a formal doc on how this will work.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


dajac commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1611615148


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.Map;
+
+public interface FeatureVersion {
+
+/**
+ * The level of the feature. 0 means the feature is disabled.
+ */
+short featureLevel();
+
+/**
+ * The name of the feature.
+ */
+String featureName();
+
+/**
+ * The minimum metadata version which sets this feature version as 
default. When bootstrapping using only
+ * a metadata version, a reasonable default for all other features is 
chosen based on this value.
+ * This should be defined as the next metadata version to be released when 
the feature version becomes production ready.
+ * (Ie, if the current production MV is 17 when a feature version is 
released, its mapping should be to MV 18)

Review Comment:
   > How do we know the release version when we create the feature.
   
   My understanding is that the new version of the feature will start as an 
unstable one. When we promote it to production ready, we can attach it to the 
correct MV (the latest one available in the release, I suppose).



##
core/src/main/scala/kafka/server/BrokerFeatures.scala:
##
@@ -75,16 +75,19 @@ object BrokerFeatures extends Logging {
   }
 
   def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): 
Features[SupportedVersionRange] = {
-Features.supportedFeatures(
-  java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
+val features = new util.HashMap[String, SupportedVersionRange]()
+  features.put(MetadataVersion.FEATURE_NAME,
 new SupportedVersionRange(
   MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
   if (unstableMetadataVersionsEnabled) {
 MetadataVersion.latestTesting.featureLevel
   } else {
 MetadataVersion.latestProduction.featureLevel
-  }
-)))
+  }))
+org.apache.kafka.server.common.Features.PRODUCTION_FEATURES.forEach { 
feature =>

Review Comment:
   Could we import 
`org.apache.kafka.server.common.Features.PRODUCTION_FEATURES` as we only use 
`PRODUCTION_FEATURES` in the end?



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +111,52 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features],
+usesVersionDefault: Boolean): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.
+val metadataVersionForDefault = if (usesVersionDefault) 
Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()
+
+val allNonZeroFeaturesAndLevels: ArrayBuffer[FeatureVersion]  = 
mutable.ArrayBuffer[FeatureVersion]()

Review Comment:
   nit: There is an extra space.





Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-22 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1610507371


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum TestFeatureVersion implements FeatureVersion {
+
+TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()),

Review Comment:
   I tried this, but the logic for defaults breaks when we don't have a 0 
version. 
   ```
   val allFeaturesAndLevels: List[FeatureVersion] = allFeatures.map { feature =>
     val level: java.lang.Short = specifiedFeatures.getOrElse(feature.featureName, feature.defaultValue(metadataVersionForDefault))
     feature.fromFeatureLevel(level)
   }
   ```
   ```
   Here, I suppose we could do some work to filter out 0s and assume in the 
rest of the code that a missing value in the map means 0. If that is preferred, I 
can do that instead, but I also don't think the 0 version necessarily hurts 
anything.
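
The alternative mentioned here, dropping level 0 and treating a missing entry as 0, could look roughly like the following Java sketch. It is not the PR's code: `NonZeroLevelsSketch`, `nonZeroLevels` and `levelOf` are hypothetical names, and plain maps of feature name to level stand in for the `Features` and `FeatureVersion` types.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: keeps only enabled features and reads absent ones as level 0.
final class NonZeroLevelsSketch {
    static Map<String, Short> nonZeroLevels(List<String> allFeatureNames,
                                            Map<String, Short> specified,
                                            Map<String, Short> defaults) {
        Map<String, Short> result = new LinkedHashMap<>();
        for (String name : allFeatureNames) {
            // An explicitly specified level wins; otherwise use the default, else 0.
            short level = specified.getOrDefault(name, defaults.getOrDefault(name, (short) 0));
            if (level != 0) {
                result.put(name, level);
            }
        }
        return result;
    }

    // Every reader must go through this so that "absent" and "0" mean the same thing.
    static short levelOf(Map<String, Short> levels, String name) {
        return levels.getOrDefault(name, (short) 0);
    }
}
```

The cost of this approach is that every consumer has to remember the "missing means 0" convention, whereas keeping an explicit 0 version makes the disabled state visible in the data.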






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-22 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1610472137


##
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##
@@ -64,14 +63,16 @@ public enum Features {
 
 PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
 feature.usedInProduction).collect(Collectors.toList());
+PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature ->
+feature.name).collect(Collectors.toList());
 }
 
 public String featureName() {
 return name;
 }
 
 public FeatureVersion[] features() {

Review Comment:
   sure






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-22 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1610471028


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.Map;
+
+public interface FeatureVersion {
+
+/**
+ * The level of the feature. 0 means the feature is disabled.
+ */
+short featureLevel();
+
+/**
+ * The name of the feature.
+ */
+String featureName();
+
+/**
+ * The minimum metadata version which sets this feature version as default. When bootstrapping using only
+ * a metadata version, a reasonable default for all other features is chosen based on this value.
+ * This should be defined as the next metadata version to be released when the feature version becomes production ready.
+ * (Ie, if the current production MV is 17 when a feature version is released, its mapping should be to MV 18)

Review Comment:
   How do we know the release version when we create the feature? The release version can change, so should we change it as each new MV is added? I suppose you mean the latest MV when the feature is released.
   
   This is also what I did, but adding 1 to every MV, for the reasons I explained [here](https://github.com/apache/kafka/pull/15685#discussion_r1610248735).



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-22 Thread via GitHub


artemlivshits commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1610437612


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum TestFeatureVersion implements FeatureVersion {
+
+TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()),

Review Comment:
   Version 0 == "feature doesn't exist". Does it need to be explicitly codified in the enum?



##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -470,8 +471,8 @@ BrokerFeature processRegistrationFeature(
 // A feature is not found in the finalizedFeature map if it is unknown to the controller or set to 0 (feature not enabled).
 // As more features roll out, it may be common to leave a feature disabled, so this log is debug level in the case

Review Comment:
   Comment seems to be outdated w.r.t. latest logic?



##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.Map;
+
+public interface FeatureVersion {
+
+/**
+ * The level of the feature. 0 means the feature is disabled.
+ */
+short featureLevel();
+
+/**
+ * The name of the feature.
+ */
+String featureName();
+
+/**
+ * The minimum metadata version which sets this feature version as default. When bootstrapping using only
+ * a metadata version, a reasonable default for all other features is chosen based on this value.
+ * This should be defined as the next metadata version to be released when the feature version becomes production ready.
+ * (Ie, if the current production MV is 17 when a feature version is released, its mapping should be to MV 18)

Review Comment:
   Can we make it map to the release version? If not, can we add a comment explaining the logic?
   
   Intuitively, if we have a feature like
   - FOO(1, 42) // released version 1 when MV got bumped to 42
   - FOO(2, 77) // released version 2 when MV got bumped to 77
   - FOO(3, 77) // released version 3 at the same time as 2
   
   we should be able to discover that if we got MV 45, then FOO=1, because 1 was available at 42 and 2 and 3 are not available yet. If we got 78, then it should be 3, because that was the max available at 77.
   
   If we got MV 30, then we should get 0 (implicit), because the feature wasn't available yet.
   
   Using the same table, we can determine that if feature version 1 was selected with MV 41, then it's an invalid combination; similarly, if feature version 2 or 3 was selected with MV 45, it's an invalid combination. And we can select 1, 2 or 3 when MV is 77.
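
   The lookup described above can be sketched as follows. This is only an illustration of the proposal in this comment, not the PR's implementation: `FooVersionTable`, `Row`, `defaultLevel`, and `isValid` are hypothetical names, the MV numbers are the ones from the example table, and treating level 0 as always valid is an assumption.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the table lookup; FOO and its numbers come from the example above.
final class FooVersionTable {
    // One row of the table: a feature level and the MV at which it became available.
    static final class Row {
        final short level;
        final int minMetadataVersion;

        Row(int level, int minMetadataVersion) {
            this.level = (short) level;
            this.minMetadataVersion = minMetadataVersion;
        }
    }

    static final List<Row> FOO = Arrays.asList(
        new Row(1, 42),
        new Row(2, 77),
        new Row(3, 77));

    // Highest level whose MV is <= the given MV, or 0 if none is available yet.
    static short defaultLevel(int metadataVersion) {
        short best = 0;
        for (Row row : FOO) {
            if (row.minMetadataVersion <= metadataVersion && row.level > best) {
                best = row.level;
            }
        }
        return best;
    }

    // A (level, MV) pair is valid if the level is 0 (assumed) or was available at that MV.
    static boolean isValid(short level, int metadataVersion) {
        if (level == 0) {
            return true;
        }
        for (Row row : FOO) {
            if (row.level == level) {
                return row.minMetadataVersion <= metadataVersion;
            }
        }
        return false; // unknown level
    }

    public static void main(String[] args) {
        System.out.println(defaultLevel(45));         // prints 1
        System.out.println(defaultLevel(78));         // prints 3
        System.out.println(defaultLevel(30));         // prints 0
        System.out.println(isValid((short) 1, 41));   // prints false
        System.out.println(isValid((short) 2, 45));   // prints false
        System.out.println(isValid((short) 3, 77));   // prints true
    }
}
```

   With a single table, both the default for a given MV and the validity of a (feature version, MV) pair fall out of the same data.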



##
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##
@@ -64,14 +63,16 @@ public enum Features {
 
 PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
 feature.usedInProduction).collect(Collectors.toList());
+PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature ->
+feature.name).collect(Collectors.toList());
 }
 
 public String featureName() {
 return name;
 }

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-22 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1610251718


##
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##
@@ -16,72 +16,135 @@
  */
 package org.apache.kafka.server.common;
 
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Objects;
+import java.util.stream.Collectors;
 
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * 
+ * Having a unified enum for the features that will use a shared type in the API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum Features {
+
+/**
+ * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when
+ * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here.
+ *
+ * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature.
+ */
+TEST_VERSION("test.feature.version", TestFeatureVersion.values(), TestFeatureVersion::fromFeatureLevel, false);
 
-public final class Features {
-private final MetadataVersion version;
-private final Map finalizedFeatures;
-private final long finalizedFeaturesEpoch;
+public static final Features[] FEATURES;
+public static final List PRODUCTION_FEATURES;
+private final String name;
+private final FeatureVersion[] features;
+private final CreateMethod createFeatureVersionMethod;
+private final boolean usedInProduction;
 
-public static Features fromKRaftVersion(MetadataVersion version) {
-return new Features(version, Collections.emptyMap(), -1, true);
+Features(String name,
+ FeatureVersion[] features,
+ CreateMethod createMethod,
+ boolean usedInProduction) {
+this.name = name;
+this.features = features;
+this.createFeatureVersionMethod = createMethod;
+this.usedInProduction = usedInProduction;
 }
 
-public Features(
-MetadataVersion version,
-Map finalizedFeatures,
-long finalizedFeaturesEpoch,
-boolean kraftMode
-) {
-this.version = version;
-this.finalizedFeatures = new HashMap<>(finalizedFeatures);
-this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
-// In KRaft mode, we always include the metadata version in the 
features map.
-// In ZK mode, we never include it.
-if (kraftMode) {
-this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel());
-} else {
-this.finalizedFeatures.remove(FEATURE_NAME);
-}
+static {
+Features[] enumValues = Features.values();
+FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+feature.usedInProduction).collect(Collectors.toList());
 }
 
-public MetadataVersion metadataVersion() {
-return version;
+public String featureName() {
+return name;
 }
 
-public Map finalizedFeatures() {
-return finalizedFeatures;
+public FeatureVersion[] features() {
+return features;
 }
 
-public long finalizedFeaturesEpoch() {
-return finalizedFeaturesEpoch;
+/**
+ * Creates a FeatureVersion from a given name and level with the correct feature object underneath.
+ *
+ * @param level   the level of the feature
+ * @returns   the FeatureVersionUtils.FeatureVersion for the feature the enum is based on.
+ * @throws IllegalArgumentException if the feature name is not valid (not implemented for this method)
+ */
+public FeatureVersion fromFeatureLevel(short level) {
+return createFeatureVersionMethod.fromFeatureLevel(level);

Review Comment:
   Iterate through all the features and all the levels of each feature? We could, I suppose, but this could get slow if we have a lot of different features and levels.



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-22 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1610250283


##
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##
@@ -16,72 +16,135 @@
  */
 package org.apache.kafka.server.common;
 
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Objects;
+import java.util.stream.Collectors;
 
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * 
+ * Having a unified enum for the features that will use a shared type in the API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum Features {
+
+/**
+ * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when
+ * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here.
+ *
+ * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature.
+ */
+TEST_VERSION("test.feature.version", TestFeatureVersion.values(), TestFeatureVersion::fromFeatureLevel, false);
 
-public final class Features {
-private final MetadataVersion version;
-private final Map finalizedFeatures;
-private final long finalizedFeaturesEpoch;
+public static final Features[] FEATURES;
+public static final List PRODUCTION_FEATURES;
+private final String name;
+private final FeatureVersion[] features;
+private final CreateMethod createFeatureVersionMethod;
+private final boolean usedInProduction;
 
-public static Features fromKRaftVersion(MetadataVersion version) {
-return new Features(version, Collections.emptyMap(), -1, true);
+Features(String name,
+ FeatureVersion[] features,
+ CreateMethod createMethod,
+ boolean usedInProduction) {
+this.name = name;
+this.features = features;
+this.createFeatureVersionMethod = createMethod;
+this.usedInProduction = usedInProduction;
 }
 
-public Features(
-MetadataVersion version,
-Map finalizedFeatures,
-long finalizedFeaturesEpoch,
-boolean kraftMode
-) {
-this.version = version;
-this.finalizedFeatures = new HashMap<>(finalizedFeatures);
-this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
-// In KRaft mode, we always include the metadata version in the 
features map.
-// In ZK mode, we never include it.
-if (kraftMode) {
-this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel());
-} else {
-this.finalizedFeatures.remove(FEATURE_NAME);
-}
+static {
+Features[] enumValues = Features.values();
+FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+feature.usedInProduction).collect(Collectors.toList());
 }
 
-public MetadataVersion metadataVersion() {
-return version;
+public String featureName() {
+return name;
 }
 
-public Map finalizedFeatures() {
-return finalizedFeatures;
+public FeatureVersion[] features() {
+return features;
 }
 
-public long finalizedFeaturesEpoch() {
-return finalizedFeaturesEpoch;
+/**
+ * Creates a FeatureVersion from a given name and level with the correct feature object underneath.
+ *
+ * @param level   the level of the feature
+ * @returns   the FeatureVersionUtils.FeatureVersion for the feature the enum is based on.
+ * @throws IllegalArgumentException if the feature name is not valid (not implemented for this method)
+ */
+public FeatureVersion fromFeatureLevel(short level) {
+return createFeatureVersionMethod.fromFeatureLevel(level);
 }
 
-@Override
-public boolean equals(Object o) {
-if (o == null || !(o.getClass().equals(Features.class))) return false;
-Features other = (Features) o;
-return version == other.version &&
-finalizedFeatures.equals(other.finalizedFeatures) &&
-finalizedFeaturesEpoch == other.finalizedFeaturesEpoch;
+/**
+ * A method to validate the feature can be set. If a given feature relies 
on another feature, the dependencies should be
+ * captured in {@link 

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-22 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1610249426


##
core/src/main/scala/kafka/server/BrokerFeatures.scala:
##
@@ -75,16 +75,19 @@ object BrokerFeatures extends Logging {
   }
 
   def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = {
-Features.supportedFeatures(
-  java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
+val features = new util.HashMap[String, SupportedVersionRange]()
+  features.put(MetadataVersion.FEATURE_NAME,
 new SupportedVersionRange(
   MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
   if (unstableMetadataVersionsEnabled) {
 MetadataVersion.latestTesting.featureLevel
   } else {
 MetadataVersion.latestProduction.featureLevel
-  }
-)))
+  }))
+org.apache.kafka.server.common.Features.PRODUCTION_FEATURES.forEach { feature =>

Review Comment:
   There are two imports for Features in this file from different modules. Java 
doesn't allow aliasing. 
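
   As a generic Java illustration of that constraint: when two classes share a simple name, only one can be imported and the other has to be written fully qualified. The sketch below uses the standard-library classes `java.util.Date` and `java.sql.Date` as stand-ins; it is not the code from this PR, and `NameClashExample` and `toSqlDate` are hypothetical names.

```java
import java.util.Date;

// Illustration only: java.util.Date and java.sql.Date share the simple name "Date".
final class NameClashExample {
    // java.util.Date is imported; java.sql.Date must be spelled out in full,
    // because Java has no import alias syntax.
    static java.sql.Date toSqlDate(Date utilDate) {
        return new java.sql.Date(utilDate.getTime());
    }

    public static void main(String[] args) {
        Date epoch = new Date(0L);
        java.sql.Date sql = toSqlDate(epoch);
        System.out.println(sql.getTime()); // prints 0
    }
}
```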



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-22 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1610248735


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersion {
+
+/**
+ * The level of the feature. 0 means the feature is disabled.
+ */
+short featureLevel();
+
+/**
+ * The name of the feature.
+ */
+String featureName();
+
+/**
+ * The next metadata version to be released when the feature became production ready.
+ * (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18)
+ */
+MetadataVersion metadataVersionMapping();

Review Comment:
   Hmm, there are a lot of components that are all getting mixed around here.
   
   1. All features require IBP-3.3.0-IV0, because this is the minimum MV needed to bootstrap and write a feature record :) This is encoded in Features#validateVersion and doesn't need to be specified for each feature, since it is required by all of them.
   2. Some features may require a specific version of another feature in order to be set. None of the features we proposed (transaction/group coordinator) need this, but it was requested in the KIP, so I have implemented a framework to do so.
   3. We need to set a reasonable default feature version for when folks bootstrap using only a metadata version. There are a few options here. One is to set all features to 0. We ended up deciding to take the latest features as the default. There is a small wrinkle here: for a given MV, we may introduce a feature version after the code that releases the MV. If folks are running off trunk, they will have different features for the same MV if we choose the current MV, and that's why I suggest the next one.
   
   This bootstrap method is only used for 3. Points 1 and 2 are covered by the validateVersion method and by enumerating non-3.3 dependencies when defining the featureVersion.
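
   Points 1 and 2 can be pictured with a small sketch. This is not the PR's `Features#validateVersion`; `FeatureValidationSketch`, its `validate` signature, the constant `MIN_BOOTSTRAP_MV_LEVEL` (a placeholder value standing in for the level of IBP-3.3.0-IV0), and the feature name `other.feature` are all assumptions made for illustration.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch of points 1 and 2; names and numbers are illustrative, not Kafka's actual code.
final class FeatureValidationSketch {
    // Placeholder standing in for the feature level of the minimum bootstrap MV.
    static final short MIN_BOOTSTRAP_MV_LEVEL = 4;

    // Throws if the MV is too old (point 1) or a required dependency is not met (point 2).
    static void validate(short metadataVersionLevel,
                         Map<String, Short> requiredDependencies,
                         Map<String, Short> finalizedLevels) {
        if (metadataVersionLevel < MIN_BOOTSTRAP_MV_LEVEL) {
            throw new IllegalArgumentException("metadata.version is too old to set features");
        }
        for (Map.Entry<String, Short> dep : requiredDependencies.entrySet()) {
            // An absent feature is treated as level 0 (disabled).
            short actual = finalizedLevels.getOrDefault(dep.getKey(), (short) 0);
            if (actual < dep.getValue()) {
                throw new IllegalArgumentException(
                    dep.getKey() + " must be at least " + dep.getValue() + " but is " + actual);
            }
        }
    }

    public static void main(String[] args) {
        // No extra dependencies: only the minimum-MV check applies.
        validate((short) 5, Collections.emptyMap(), Collections.emptyMap());

        // A made-up dependency that is not satisfied.
        try {
            validate((short) 5,
                Collections.singletonMap("other.feature", (short) 2),
                Collections.singletonMap("other.feature", (short) 1));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints: other.feature must be at least 2 but is 1
        }
    }
}
```

   Because the minimum-MV check is shared, a feature with no extra dependencies only needs to pass an empty dependency map.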



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-22 Thread via GitHub


dajac commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1609917313


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersion {
+
+/**
+ * The level of the feature. 0 means the feature is disabled.
+ */
+short featureLevel();
+
+/**
+ * The name of the feature.
+ */
+String featureName();
+
+/**
+ * The next metadata version to be released when the feature became production ready.
+ * (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18)
+ */
+MetadataVersion metadataVersionMapping();

Review Comment:
   > I would like the mapping to remain consistent for MV. If we do the current mapping, some versions/images with MV X will have feature version Y and some will not (since it wasn't created when MV X was first released)
   
   Isn't it something that will happen anyway? For instance, I will add `group.version=1` soon and it will require an old MV, likely the oldest one supported by KRaft.
   
   I am not too opinionated on this one, though. If you believe that this is the right approach, I trust you.



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-21 Thread via GitHub


artemlivshits commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1609158284


##
core/src/main/scala/kafka/server/BrokerFeatures.scala:
##
@@ -75,16 +75,19 @@ object BrokerFeatures extends Logging {
   }
 
   def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = {
-Features.supportedFeatures(
-  java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
+val features = new util.HashMap[String, SupportedVersionRange]()
+  features.put(MetadataVersion.FEATURE_NAME,
 new SupportedVersionRange(
   MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
   if (unstableMetadataVersionsEnabled) {
 MetadataVersion.latestTesting.featureLevel
   } else {
 MetadataVersion.latestProduction.featureLevel
-  }
-)))
+  }))
+org.apache.kafka.server.common.Features.PRODUCTION_FEATURES.forEach { feature =>

Review Comment:
   Is there any reason this is a fully qualified class name rather than a regular import, as is usually done?



##
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##
@@ -16,72 +16,135 @@
  */
 package org.apache.kafka.server.common;
 
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Objects;
+import java.util.stream.Collectors;
 
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * 
+ * Having a unified enum for the features that will use a shared type in the API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum Features {
+
+/**
+ * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when
+ * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here.
+ *
+ * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature.
+ */
+TEST_VERSION("test.feature.version", TestFeatureVersion.values(), TestFeatureVersion::fromFeatureLevel, false);
 
-public final class Features {
-private final MetadataVersion version;
-private final Map finalizedFeatures;
-private final long finalizedFeaturesEpoch;
+public static final Features[] FEATURES;
+public static final List PRODUCTION_FEATURES;
+private final String name;
+private final FeatureVersion[] features;
+private final CreateMethod createFeatureVersionMethod;
+private final boolean usedInProduction;
 
-public static Features fromKRaftVersion(MetadataVersion version) {
-return new Features(version, Collections.emptyMap(), -1, true);
+Features(String name,
+ FeatureVersion[] features,
+ CreateMethod createMethod,
+ boolean usedInProduction) {
+this.name = name;
+this.features = features;
+this.createFeatureVersionMethod = createMethod;
+this.usedInProduction = usedInProduction;
 }
 
-public Features(
-MetadataVersion version,
-Map finalizedFeatures,
-long finalizedFeaturesEpoch,
-boolean kraftMode
-) {
-this.version = version;
-this.finalizedFeatures = new HashMap<>(finalizedFeatures);
-this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
-// In KRaft mode, we always include the metadata version in the 
features map.
-// In ZK mode, we never include it.
-if (kraftMode) {
-this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel());
-} else {
-this.finalizedFeatures.remove(FEATURE_NAME);
-}
+static {
+Features[] enumValues = Features.values();
+FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+feature.usedInProduction).collect(Collectors.toList());
 }
 
-public MetadataVersion metadataVersion() {
-return version;
+public String featureName() {
+return name;
 }
 
-public Map finalizedFeatures() {
-return finalizedFeatures;
+public FeatureVersion[] features() {
+return features;
 }
 
-public long finalizedFeaturesEpoch() {
-return finalizedFeaturesEpoch;
+/**
+ * Creates a FeatureVersion 

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-21 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1608567147


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersion {
+
+/**
+ * The level of the feature. 0 means the feature is disabled.
+ */
+short featureLevel();
+
+/**
+ * The name of the feature.
+ */
+String featureName();
+
+/**
+ * The next metadata version to be released when the feature became 
production ready.
+ * (Ie, if the current production MV is 17 when a feature is released, its 
mapping should be to MV 18)
+ */
+MetadataVersion metadataVersionMapping();

Review Comment:
   I would like the mapping to remain consistent for MV. If we map to the 
current MV, some versions/images with MV X will have feature version Y and some 
will not (since the feature version wasn't created when MV X was first released).
   
   Sorry, I forgot to change the name. I will do that today.
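
To make the "next MV" argument concrete, here is a minimal sketch of how a default feature level could be chosen at bootstrap. The class `BootstrapDefaults`, its `Level` type, and the selection rule (highest level whose mapped MV is at or below the MV being formatted) are assumptions for illustration, not code from this PR. Metadata versions are modelled as plain shorts.

```java
import java.util.List;

// Hypothetical illustration only; these are not the PR's classes.
final class BootstrapDefaults {
    private BootstrapDefaults() { }

    // One level of a feature, plus the metadata version it is mapped to for bootstrapping.
    static final class Level {
        final short featureLevel;
        final short bootstrapMetadataVersion;

        Level(int featureLevel, int bootstrapMetadataVersion) {
            this.featureLevel = (short) featureLevel;
            this.bootstrapMetadataVersion = (short) bootstrapMetadataVersion;
        }
    }

    // Assumed rule: pick the highest level whose mapped MV is at or below the given MV.
    static short defaultLevel(List<Level> levels, short metadataVersion) {
        short result = 0; // 0 means the feature is disabled
        for (Level level : levels) {
            if (level.bootstrapMetadataVersion <= metadataVersion && level.featureLevel > result) {
                result = level.featureLevel;
            }
        }
        return result;
    }
}
```

Under this rule, a feature released while the production MV is 17 and mapped to MV 18 stays at level 0 for every image formatted with MV 17, whether that image was created before or after the feature existed.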






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-21 Thread via GitHub


dajac commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1608208322


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersion {
+
+/**
+ * The level of the feature. 0 means the feature is disabled.
+ */
+short featureLevel();
+
+/**
+ * The name of the feature.
+ */
+String featureName();
+
+/**
+ * The next metadata version to be released when the feature became 
production ready.
+ * (Ie, if the current production MV is 17 when a feature is released, its 
mapping should be to MV 18)
+ */
+MetadataVersion metadataVersionMapping();

Review Comment:
   I am still confused by this one. Based on our offline discussion, my 
understanding is that this is only used during bootstrapping. We should try to 
make this clear in the name and in the javadoc.
   
   > (Ie, if the current production MV is 17 when a feature is released, its 
mapping should be to MV 18)
   
   For my understanding, why do we require it to be the next one? Requiring the 
current one seems more natural, but I may be missing something.



##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersion {
+
+/**
+ * The level of the feature. 0 means the feature is disabled.
+ */
+short featureLevel();
+
+/**
+ * The name of the feature.
+ */
+String featureName();
+
+/**
+ * The next metadata version to be released when the feature became 
production ready.
+ * (Ie, if the current production MV is 17 when a feature is released, its 
mapping should be to MV 18)
+ */
+MetadataVersion metadataVersionMapping();
+
+/**
+ * A mapping from feature to level for all features that this feature 
depends on. If this feature doesn't
+ * depend on any others, return an empty map.
+ * For example, say feature X level x relies on feature Y level y:
+ * feature (X level x).dependencies() will return (Y -> y)
+ */
+Map dependencies();
+
+/**
+ * Utility method to map a list of FeatureVersion to a map of feature name 
to feature level
+ */
+static Map featureImplsToMap(List features) 
{
+return 
features.stream().collect(Collectors.toMap(FeatureVersion::featureName, 
FeatureVersion::featureLevel));
+}

Review Comment:
   This one feels a bit weird in this interface. Should we move it to 
`Features`?
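
For reference, a sketch of what the helper could look like as a standalone static utility with explicit generics. The `Map<String, Short>` result type is inferred from the `featureName()` and `featureLevel()` signatures in the diff. `SketchFeatureVersion` and `FeatureMaps` are hypothetical stand-ins, not names from the PR.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Minimal stand-in for the PR's FeatureVersion interface (only the two accessors used here).
interface SketchFeatureVersion {
    short featureLevel();

    String featureName();
}

// Hypothetical home for the helper; the review suggests `Features` instead.
final class FeatureMaps {
    private FeatureMaps() { }

    // Collapse a list of feature versions into a map of feature name -> feature level.
    // Collectors.toMap throws IllegalStateException if two entries share a feature name.
    static Map<String, Short> featureImplsToMap(List<SketchFeatureVersion> features) {
        return features.stream()
            .collect(Collectors.toMap(SketchFeatureVersion::featureName, SketchFeatureVersion::featureLevel));
    }
}
```

Nothing in the method depends on instance state, so it can live on whichever type callers already import.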



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +105,49 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1604173884


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersionUtils {

Review Comment:
   Hmmm. This was partially due to the evolution of the API. Originally all the 
interfaces here were methods that were used as parameters for the 
FeatureVersion enums. I can try to reorganize this, but I may need some time to 
think about it.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1604168230


##
core/src/main/scala/kafka/server/BrokerFeatures.scala:
##
@@ -75,16 +76,19 @@ object BrokerFeatures extends Logging {
   }
 
   def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): 
Features[SupportedVersionRange] = {

Review Comment:
   I can look into adding something. The problem is I don't add any new 
production features in this change. I would add one in the next PR.









Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1603893834


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersionUtils {
+
+interface FeatureVersionImpl {
+short featureLevel();

Review Comment:
   I had a javadoc that said "the level of the feature" but didn't know if that 
was silly. I can add something.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1603875839


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -469,4 +522,28 @@ object StorageTool extends Logging {
 }
 0
   }
+
+  private def parseNameAndLevel(input: String): Array[String] = {
+val equalsIndex = input.indexOf("=")
+if (equalsIndex < 0)
+  throw new RuntimeException("Can't parse feature=level string " + input + 
": equals sign not found.")
+val name = input.substring(0, equalsIndex).trim
+val levelString = input.substring(equalsIndex + 1).trim
+try {
+  levelString.toShort
+} catch {
+  case _: Throwable =>
+throw new RuntimeException("Can't parse feature=level string " + input 
+ ": " + "unable to parse " + levelString + " as a short.")
+}
+Array[String](name, levelString)

Review Comment:
   I pulled this method from FeatureCommand (originally I wanted them to use a 
shared method from a helper but that was a headache so I duplicated the 
method.) 
   
   I can revise this one though.
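
One possible shape for the revision, sketched in Java rather than Scala. It returns a typed name/level pair instead of an `Array[String]` and catches only `NumberFormatException` instead of `Throwable`. The class name `FeatureLevelParser` and the switch to `IllegalArgumentException` are assumptions for illustration, not what the PR ended up doing.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper; not part of StorageTool or FeatureCommand.
final class FeatureLevelParser {
    private FeatureLevelParser() { }

    // Parse "name=level" into a (name, level) pair, trimming whitespace around both parts.
    static Map.Entry<String, Short> parseNameAndLevel(String input) {
        int equalsIndex = input.indexOf('=');
        if (equalsIndex < 0) {
            throw new IllegalArgumentException(
                "Can't parse feature=level string " + input + ": equals sign not found.");
        }
        String name = input.substring(0, equalsIndex).trim();
        String levelString = input.substring(equalsIndex + 1).trim();
        try {
            // Short.valueOf rejects non-numeric input and values outside the short range.
            return new SimpleImmutableEntry<>(name, Short.valueOf(levelString));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Can't parse feature=level string " + input + ": unable to parse "
                    + levelString + " as a short.", e);
        }
    }
}
```

Returning the parsed short avoids converting the level a second time at the call site.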






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1603437357


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various 
features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be 
added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * 
+ * Having a unified enum for the features that will use a shared type in the 
API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum FeatureVersion {
+
+/**
+ * Features defined. If a feature is included in this list, and marked to 
be used in production they will also be specified when
+ * formatting a cluster via the StorageTool. MetadataVersion is handled 
separately, so it is not included here.
+ *
+ * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersionUtils.FeatureVersionImpl} when implementing a new feature.
+ */
+TEST_VERSION("test.feature.version", TestFeatureVersion.values(), 
TestFeatureVersion::fromFeatureLevel, false);
+
+public static final FeatureVersion[] FEATURES;
+public static final List PRODUCTION_FEATURES;
+private final String name;
+private final FeatureVersionUtils.FeatureVersionImpl[] features;
+private final FeatureVersionUtils.CreateMethod createFeatureVersionMethod;
+private final boolean usedInProduction;
+
+FeatureVersion(String name,
+   FeatureVersionUtils.FeatureVersionImpl[] features,
+   FeatureVersionUtils.CreateMethod createMethod,
+   boolean usedInProduction) {
+this.name = name;
+this.features = features;
+this.createFeatureVersionMethod = createMethod;
+this.usedInProduction = usedInProduction;
+}
+
+static {
+FeatureVersion[] enumValues = FeatureVersion.values();
+FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+feature.usedInProduction).collect(Collectors.toList());
+}
+
+public String featureName() {
+return name;
+}
+
+/**
+ * Creates a FeatureVersion from a given name and level with the correct 
feature object underneath.
+ *
+ * @param level   the level of the feature
+ * @returns   the FeatureVersionUtils.FeatureVersionImpl for the 
feature the enum is based on.
+ * @throws IllegalArgumentException if the feature name is not 
valid (not implemented for this method)
+ */
+public FeatureVersionUtils.FeatureVersionImpl fromFeatureLevel(short 
level) {
+return createFeatureVersionMethod.fromFeatureLevel(level);
+}
+
+/**
+ * A method to validate the feature can be set. If a given feature relies 
on another feature, the dependencies should be
+ * captured in {@link 
FeatureVersionUtils.FeatureVersionImpl#dependencies()}
+ * 
+ * For example, say feature X level x relies on feature Y level y:
+ * if feature X >= x then throw an error if feature Y < y.
+ *
+ * All feature levels above 0 require metadata.version=4 (IBP_3_3_IV0) in 
order to write the feature records to the cluster.
+ *
+ * @param feature   the feature we are validating
+ * @param metadataVersion   the metadata version we have (or want 
to set)
+ * @param features  the feature versions (besides 
MetadataVersion) we have (or want to set)
+ * @throws IllegalArgumentException if the feature is not valid
+ */
+public static void 

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1603398767


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -459,18 +459,20 @@ BrokerFeature processRegistrationFeature(
 FinalizedControllerFeatures finalizedFeatures,
 BrokerRegistrationRequestData.Feature feature
 ) {
-Optional finalized = finalizedFeatures.get(feature.name());
-if (finalized.isPresent()) {
-if (!VersionRange.of(feature.minSupportedVersion(), 
feature.maxSupportedVersion()).contains(finalized.get())) {
-throw new UnsupportedVersionException("Unable to register 
because the broker " +
-"does not support version " + finalized.get() + " of " + 
feature.name() +
-". It wants a version between " + 
feature.minSupportedVersion() + " and " +
-feature.maxSupportedVersion() + ", inclusive.");
-}
-} else {
-log.warn("Broker {} registered with feature {} that is unknown to 
the controller",
+int defaultVersion = 
feature.name().equals(MetadataVersion.FEATURE_NAME) ? 1 : 0; // The default 
value for MetadataVersion is 1 not 0.
+short finalized = finalizedFeatures.getOrDefault(feature.name(), 
(short) defaultVersion);
+if (!VersionRange.of(feature.minSupportedVersion(), 
feature.maxSupportedVersion()).contains(finalized)) {
+throw new UnsupportedVersionException("Unable to register because 
the broker " +
+"does not support version " + finalized + " of " + 
feature.name() +
+". It wants a version between " + 
feature.minSupportedVersion() + " and " +
+feature.maxSupportedVersion() + ", inclusive.");
+}
+// A feature is not found in the finalizedFeature map if it is unknown 
to the controller or set to 0 (feature not enabled).
+// As more features roll out, it may be common to leave a feature 
disabled, so this log is debug level in the case
+// an intended feature is not being set.
+if (finalized == 0)
+log.debug("Broker {} registered with feature {} that is either 
unknown or version 0 on the controller",

Review Comment:
   It's really hard to differentiate between disabled and unknown because the 
protocol when setting a feature to 0 is to remove it. You will not have a 
record when you set it to 0 because of this.
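
A small sketch of why the two cases collapse. `FinalizedLevels` is a hypothetical stand-in for the controller's finalized-feature state, not a Kafka class. It assumes only what the comment says: setting a level to 0 removes the entry.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of finalized feature levels keyed by feature name.
final class FinalizedLevels {
    private final Map<String, Short> levels = new HashMap<>();

    // Setting a level of 0 removes the entry instead of storing a 0.
    void set(String featureName, short level) {
        if (level == 0) {
            levels.remove(featureName);
        } else {
            levels.put(featureName, level);
        }
    }

    // A missing entry reads back as the supplied default, whatever the reason it is missing.
    short getOrDefault(String featureName, short defaultLevel) {
        return levels.getOrDefault(featureName, defaultLevel);
    }
}
```

After `set("x", (short) 0)`, a lookup of `"x"` and a lookup of a name that was never set both return the default, so a reader cannot tell "disabled" from "unknown".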






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1603396094


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersionUtils {
+
+interface FeatureVersionImpl {
+short featureLevel();
+
+String featureName();
+
+/**
+ * The next metadata version to be released when the feature became 
production ready.
+ * (Ie, if the current production MV is 17 when a feature is released, 
its mapping should be to MV 18)
+ */
+MetadataVersion metadataVersionMapping();
+
+/**
+ * A mapping from feature to level for all features that this feature 
depends on. If this feature doesn't
+ * depend on any others, return an empty map.
+ * For example, say feature X level x relies on feature Y level y:
+ * feature (X level x).dependencies() will return (Y -> y)
+ */
+Map dependencies();

Review Comment:
   We could do that. I was wondering if this would be the format we typically 
see in the upgrade tool or if we would have to convert everything. I think in 
the records in the metadata we store it as Map for features.
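
To show how a name-to-level map of dependencies could be consumed, here is a sketch of the check described in the javadoc ("if feature X >= x then throw an error if feature Y < y"). `DependencyCheck`, the `Map<String, Short>` types, and the treatment of a missing entry as level 0 are assumptions for illustration, not the PR's validation method.

```java
import java.util.Map;

// Hypothetical validator; not the PR's validation code.
final class DependencyCheck {
    private DependencyCheck() { }

    // Throws if any dependency's finalized level is below the level required by featureName.
    static void validate(String featureName,
                         Map<String, Short> dependencies,
                         Map<String, Short> finalizedLevels) {
        for (Map.Entry<String, Short> dependency : dependencies.entrySet()) {
            // Assumption: a feature absent from the finalized map is at level 0 (disabled).
            short actual = finalizedLevels.getOrDefault(dependency.getKey(), (short) 0);
            if (actual < dependency.getValue()) {
                throw new IllegalArgumentException(
                    featureName + " requires " + dependency.getKey() + " at level "
                        + dependency.getValue() + " or above, but it is at level " + actual + ".");
            }
        }
    }
}
```

Because both maps are keyed by feature name, they can be compared without converting between enum types.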






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1603394445


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum TestFeatureVersion implements 
FeatureVersionUtils.FeatureVersionImpl {

Review Comment:
   I was going to keep it for testing. We have a SimpleExampleMessage and I 
think a few other placeholder classes, so I thought that was in line with the 
project. I can consider removing it if we think we get sufficient coverage with 
other methods.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1603378178


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various 
features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be 
added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * 
+ * Having a unified enum for the features that will use a shared type in the 
API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum FeatureVersion {
+
+/**
+ * Features defined. If a feature is included in this list, and marked to 
be used in production they will also be specified when
+ * formatting a cluster via the StorageTool. MetadataVersion is handled 
separately, so it is not included here.
+ *
+ * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersionUtils.FeatureVersionImpl} when implementing a new feature.
+ */
+TEST_VERSION("test.feature.version", TestFeatureVersion.values(), 
TestFeatureVersion::fromFeatureLevel, false);
+
+public static final FeatureVersion[] FEATURES;
+public static final List PRODUCTION_FEATURES;
+private final String name;
+private final FeatureVersionUtils.FeatureVersionImpl[] features;
+private final FeatureVersionUtils.CreateMethod createFeatureVersionMethod;
+private final boolean usedInProduction;
+
+FeatureVersion(String name,
+   FeatureVersionUtils.FeatureVersionImpl[] features,
+   FeatureVersionUtils.CreateMethod createMethod,
+   boolean usedInProduction) {
+this.name = name;
+this.features = features;
+this.createFeatureVersionMethod = createMethod;
+this.usedInProduction = usedInProduction;
+}
+
+static {
+FeatureVersion[] enumValues = FeatureVersion.values();
+FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+feature.usedInProduction).collect(Collectors.toList());
+}
+
+public String featureName() {
+return name;
+}
+
+/**
+ * Creates a FeatureVersion from a given name and level with the correct 
feature object underneath.
+ *
+ * @param level   the level of the feature
+ * @returns   the FeatureVersionUtils.FeatureVersionImpl for the 
feature the enum is based on.
+ * @throws IllegalArgumentException if the feature name is not 
valid (not implemented for this method)
+ */
+public FeatureVersionUtils.FeatureVersionImpl fromFeatureLevel(short 
level) {
+return createFeatureVersionMethod.fromFeatureLevel(level);
+}
+
+/**
+ * A method to validate the feature can be set. If a given feature relies 
on another feature, the dependencies should be
+ * captured in {@link 
FeatureVersionUtils.FeatureVersionImpl#dependencies()}
+ * 
+ * For example, say feature X level x relies on feature Y level y:
+ * if feature X >= x then throw an error if feature Y < y.
+ *
+ * All feature levels above 0 require metadata.version=4 (IBP_3_3_IV0) in 
order to write the feature records to the cluster.
+ *
+ * @param feature   the feature we are validating
+ * @param metadataVersion   the metadata version we have (or want 
to set)
+ * @param features  the feature versions (besides 
MetadataVersion) we have (or want to set)
+ * @throws IllegalArgumentException if the feature is not valid
+ */
+public static void 

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1603376757


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various 
features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be 
added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * 
+ * Having a unified enum for the features that will use a shared type in the 
API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum FeatureVersion {
+
+/**
+ * Features defined. If a feature is included in this list, and marked to 
be used in production they will also be specified when
+ * formatting a cluster via the StorageTool. MetadataVersion is handled 
separately, so it is not included here.
+ *
+ * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersionUtils.FeatureVersionImpl} when implementing a new feature.
+ */
+TEST_VERSION("test.feature.version", TestFeatureVersion.values(), 
TestFeatureVersion::fromFeatureLevel, false);
+
+public static final FeatureVersion[] FEATURES;
+public static final List PRODUCTION_FEATURES;
+private final String name;
+private final FeatureVersionUtils.FeatureVersionImpl[] features;
+private final FeatureVersionUtils.CreateMethod createFeatureVersionMethod;
+private final boolean usedInProduction;
+
+FeatureVersion(String name,
+   FeatureVersionUtils.FeatureVersionImpl[] features,
+   FeatureVersionUtils.CreateMethod createMethod,
+   boolean usedInProduction) {
+this.name = name;
+this.features = features;
+this.createFeatureVersionMethod = createMethod;
+this.usedInProduction = usedInProduction;
+}
+
+static {
+FeatureVersion[] enumValues = FeatureVersion.values();
+FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+feature.usedInProduction).collect(Collectors.toList());
+}
+
+public String featureName() {
+return name;
+}
+
+/**
+ * Creates a FeatureVersion from a given name and level with the correct 
feature object underneath.
+ *
+ * @param level   the level of the feature
+ * @returns   the FeatureVersionUtils.FeatureVersionImpl for the 
feature the enum is based on.
+ * @throws IllegalArgumentException if the feature name is not 
valid (not implemented for this method)
+ */
+public FeatureVersionUtils.FeatureVersionImpl fromFeatureLevel(short 
level) {
+return createFeatureVersionMethod.fromFeatureLevel(level);
+}
+
+/**
+ * A method to validate the feature can be set. If a given feature relies 
on another feature, the dependencies should be
+ * captured in {@link 
FeatureVersionUtils.FeatureVersionImpl#dependencies()}
+ * 
+ * For example, say feature X level x relies on feature Y level y:
+ * if feature X >= x then throw an error if feature Y < y.
+ *
+ * All feature levels above 0 require metadata.version=4 (IBP_3_3_IV0) in 
order to write the feature records to the cluster.
+ *
+ * @param feature   the feature we are validating
+ * @param metadataVersion   the metadata version we have (or want 
to set)
+ * @param features  the feature versions (besides 
MetadataVersion) we have (or want to set)
+ * @throws IllegalArgumentException if the feature is not valid
+ */
+public static void 

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1603375064


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersionUtils {
+
+interface FeatureVersionImpl {
+short featureLevel();
+
+String featureName();
+
+/**
+ * The next metadata version to be released when the feature became 
production ready.
+ * (Ie, if the current production MV is 17 when a feature is released, 
its mapping should be to MV 18)
+ */
+MetadataVersion metadataVersionMapping();

Review Comment:
   No. This is not the minimum version. This is the first **image version** 
that supports the feature. 
   I wanted to have a mapping to image version, but that was heavily argued 
against on the mailing thread so I have to use MV as a proxy.
   
   This value is only a DEFAULT if no value is set during bootstrapping. 
   
   As long as the feature is in the image, you can set it regardless of the MV 
that is set (production ready or not). Setting the feature version is completely 
independent of the MV that is set.



##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersionUtils {
+
+interface FeatureVersionImpl {
+short featureLevel();
+
+String featureName();
+
+/**
+ * The next metadata version to be released when the feature became 
production ready.
+ * (Ie, if the current production MV is 17 when a feature is released, 
its mapping should be to MV 18)
+ */
+MetadataVersion metadataVersionMapping();

Review Comment:
   No. This is not the minimum version. This is the first **image version** 
that supports the feature. 
   I wanted to have a mapping to image version, but that was heavily argued 
against on the mailing thread so I have to use MV as a proxy.
   
   This value is only a DEFAULT if no value is set during bootstrapping but MV 
is set.
   
   As long as the feature is in the image, you can set it regardless of the MV 
that is set (production ready or not). Setting the feature version is completely 
independent of the MV that is set.
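
To make the defaulting rule above concrete, here is a minimal sketch. It is not code from the PR: `SketchMv`, `SketchFeature`, `defaultFor`, and `resolve` are hypothetical names, and the MV levels are purely illustrative. It assumes the default is the highest feature level whose mapped MV is not newer than the bootstrap MV, and that an explicitly requested level is used regardless of the MV.

```java
import java.util.Optional;

// Hypothetical stand-in for MetadataVersion; the levels are illustrative only.
enum SketchMv { MV_17, MV_18, MV_19 }

// Hypothetical feature whose level 1 first ships in the image released with MV_18.
enum SketchFeature {
    LEVEL_0(0, SketchMv.MV_17),
    LEVEL_1(1, SketchMv.MV_18);

    final short level;
    final SketchMv mappedMv;

    SketchFeature(int level, SketchMv mappedMv) {
        this.level = (short) level;
        this.mappedMv = mappedMv;
    }

    // Default: the highest level whose mapped MV is not newer than the given MV.
    static SketchFeature defaultFor(SketchMv mv) {
        SketchFeature result = LEVEL_0;
        for (SketchFeature candidate : values()) {
            if (candidate.mappedMv.compareTo(mv) <= 0) {
                result = candidate;
            }
        }
        return result;
    }

    // An explicitly requested level wins; the MV mapping is only the fallback.
    static SketchFeature resolve(Optional<SketchFeature> requested, SketchMv mv) {
        return requested.orElseGet(() -> defaultFor(mv));
    }
}
```

Under these assumptions, `resolve(Optional.empty(), SketchMv.MV_17)` falls back to `LEVEL_0`, while `resolve(Optional.of(SketchFeature.LEVEL_1), SketchMv.MV_17)` keeps the explicitly requested `LEVEL_1`, which mirrors the point that setting the feature is independent of the MV.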






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1603375064


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersionUtils {
+
+interface FeatureVersionImpl {
+short featureLevel();
+
+String featureName();
+
+/**
+ * The next metadata version to be released when the feature became 
production ready.
+ * (Ie, if the current production MV is 17 when a feature is released, 
its mapping should be to MV 18)
+ */
+MetadataVersion metadataVersionMapping();

Review Comment:
   No. This is not the minimum version. This is the first **image version** 
that supports the feature. 
   I wanted to have a mapping to image version, but that was heavily argued 
against on the mailing thread.
   
   This value is only a DEFAULT if no value is set during bootstrapping. 
   
   As long as the feature is in the image, you can set it regardless of the MV 
that is set (production ready or not). Setting the feature version is completely 
independent of the MV that is set.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1603368871


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersionUtils {
+
+interface FeatureVersionImpl {

Review Comment:
   Features is already a file in this directory :( but I will see what I can do.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-16 Thread via GitHub


dajac commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1602958576


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersionUtils {

Review Comment:
   Is there a particular reason for grouping those two interfaces in this one? 
I am asking because at first I thought that `FeatureVersionUtils.java` was a 
utils library (i.e. useful functions). I did not realize that it is actually the 
one that contains the interface that one must implement.



##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various 
features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be 
added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * 
+ * Having a unified enum for the features that will use a shared type in the 
API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum FeatureVersion {
+
+/**
+ * Features defined. If a feature is included in this list, and marked to 
be used in production they will also be specified when
+ * formatting a cluster via the StorageTool. MetadataVersion is handled 
separately, so it is not included here.
+ *
+ * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersionUtils.FeatureVersionImpl} when implementing a new feature.
+ */
+TEST_VERSION("test.feature.version", TestFeatureVersion.values(), 
TestFeatureVersion::fromFeatureLevel, false);
+
+public static final FeatureVersion[] FEATURES;
+public static final List PRODUCTION_FEATURES;
+private final String name;
+private final FeatureVersionUtils.FeatureVersionImpl[] features;
+private final FeatureVersionUtils.CreateMethod createFeatureVersionMethod;
+private final boolean usedInProduction;
+
+FeatureVersion(String name,
+   FeatureVersionUtils.FeatureVersionImpl[] features,
+   FeatureVersionUtils.CreateMethod createMethod,
+   boolean usedInProduction) {
+this.name = name;
+this.features = features;
+this.createFeatureVersionMethod = createMethod;
+this.usedInProduction = usedInProduction;
+}
+
+static {
+FeatureVersion[] enumValues = FeatureVersion.values();
+FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+feature.usedInProduction).collect(Collectors.toList());
+}
+
+public String featureName() {
+return name;
+}
+
+/**
+ * Creates a FeatureVersion from a given name and level with the 

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-10 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1597305375


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.List;
+
+public enum TestFeatureVersion implements FeatureVersion {
+
+TEST_0(0),
+TEST_1(1),
+TEST_2(2);
+
+private short featureLevel;
+
+public static final String FEATURE_NAME = "test.feature.version";
+public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1;
+
+TestFeatureVersion(int featureLevel) {
+this.featureLevel = (short) featureLevel;
+}
+
+public short featureLevel() {
+return featureLevel;
+}
+
+public String featureName() {
+return FEATURE_NAME;
+}
+
+public void validateVersion(MetadataVersion metadataVersion, 
List features) {
+// version 1 depends on metadata.version 3.3-IVO
+if (featureLevel >= 1 && 
metadataVersion.isLessThan(MetadataVersion.IBP_3_3_IV0))
+throw new IllegalArgumentException(FEATURE_NAME + " could not be 
set to " + featureLevel +
+" because it depends on metadata.version=14 (" + 
MetadataVersion.IBP_3_3_IV0 + ")");
+}
+
+public static TestFeatureVersion metadataVersionMapping(MetadataVersion 
metadataVersion) {
+if (metadataVersion.isLessThan(MetadataVersion.IBP_3_8_IV0)) {

Review Comment:
   This will be simplified when I fix the above. Potentially we can even 
include the 3.3 check in the FeatureVersion class since it will apply to all 
features and doesn't change on a per feature basis. Stay tuned for some 
cleanups.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-10 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1597304812


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.List;
+
+public enum TestFeatureVersion implements FeatureVersion {
+
+TEST_0(0),
+TEST_1(1),
+TEST_2(2);
+
+private short featureLevel;
+
+public static final String FEATURE_NAME = "test.feature.version";
+public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1;
+
+TestFeatureVersion(int featureLevel) {
+this.featureLevel = (short) featureLevel;
+}

Review Comment:
   I have a plan for this but didn't quite get to it. Maybe we want to do this 
and maybe we don't. 
   
   One idea is to have all features have a standard pattern/class for the 
fields. This would contain the metadata version mapping and a Map of the required other features. Not sure if we will need any more 
complicated validation logic than that. We could leave the opportunity to have 
a more complicated validate method if we choose. 
   
   We can also show an example implementation in the TestFeatureVersion and 
folks can choose to replicate it.
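
One way the "standard pattern" idea above could look is sketched below. This is an illustration only, not the design that was merged: `FeatureLevelSpec`, its field names, and the use of a `Map<String, Short>` from feature name to minimum level are all assumptions made for the example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the per-level fields; not a class from the PR.
final class FeatureLevelSpec {
    final String featureName;
    final short level;
    // MV level used as a proxy for the first image that supports this level.
    final short mappedMetadataVersion;
    // Other feature name -> minimum level that feature must be at.
    final Map<String, Short> requiredFeatures;

    FeatureLevelSpec(String featureName, int level, int mappedMetadataVersion,
                     Map<String, Short> requiredFeatures) {
        this.featureName = featureName;
        this.level = (short) level;
        this.mappedMetadataVersion = (short) mappedMetadataVersion;
        this.requiredFeatures = Collections.unmodifiableMap(new HashMap<>(requiredFeatures));
    }

    // Throws if a required feature is below its minimum level.
    // Features absent from the given map are treated as level 0.
    void validate(Map<String, Short> featureLevels) {
        for (Map.Entry<String, Short> dep : requiredFeatures.entrySet()) {
            short actual = featureLevels.getOrDefault(dep.getKey(), (short) 0);
            if (actual < dep.getValue()) {
                throw new IllegalArgumentException(featureName + " could not be set to " + level
                    + " because it depends on " + dep.getKey() + " level " + dep.getValue());
            }
        }
    }
}
```

With a shape like this, the generic dependency check lives in one place, and a feature that needs more complicated validation could still add its own method on top.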



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-10 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1597303887


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -258,6 +259,13 @@ public enum MetadataVersion {
 this.didMetadataChange = didMetadataChange;
 }
 
+public String featureName() {
+return FEATURE_NAME;
+}
+
+public void validateVersion(MetadataVersion metadataVersion, 
List features) {
+}

Review Comment:
   I removed this for now; we can add it back later if we want MetadataVersion 
to implement our interface. I think there are pros and cons to doing so.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-08 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1594889173


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -258,6 +259,13 @@ public enum MetadataVersion {
 this.didMetadataChange = didMetadataChange;
 }
 
+public String featureName() {
+return FEATURE_NAME;
+}
+
+public void validateVersion(MetadataVersion metadataVersion, 
List features) {
+}

Review Comment:
   Will do. I think the main thing was that KafkaConfig can't be passed 
due to the directory structure. The important config is unstable version 
enablement, so in a followup I plan to include it in this method signature and 
include logic to support unstable versioning for all versions.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-08 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1594886684


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.List;
+
+public enum TestFeatureVersion implements FeatureVersion {
+
+TEST_0(0),
+TEST_1(1),
+TEST_2(2);
+
+private short featureLevel;
+
+public static final String FEATURE_NAME = "test.feature.version";
+public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1;
+
+TestFeatureVersion(int featureLevel) {
+this.featureLevel = (short) featureLevel;
+}

Review Comment:
   Yes, assuming we specify metadata version as a numerical value and not a 
string as is done in many places in the code.
   
   As per the KIP, validation may involve other features besides metadata 
version, so we will need to consider how that will be implemented.
   
   (I didn't have any example of such interdependence, but it was requested 
that I support it.)






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-08 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1594885949


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.List;
+
+public enum TestFeatureVersion implements FeatureVersion {
+
+TEST_0(0),
+TEST_1(1),
+TEST_2(2);
+
+private short featureLevel;
+
+public static final String FEATURE_NAME = "test.feature.version";
+public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1;
+
+TestFeatureVersion(int featureLevel) {
+this.featureLevel = (short) featureLevel;
+}
+
+public short featureLevel() {
+return featureLevel;
+}
+
+public String featureName() {
+return FEATURE_NAME;
+}
+
+public void validateVersion(MetadataVersion metadataVersion, 
List features) {
+// version 1 depends on metadata.version 3.3-IVO
+if (featureLevel >= 1 && 
metadataVersion.isLessThan(MetadataVersion.IBP_3_3_IV0))
+throw new IllegalArgumentException(FEATURE_NAME + " could not be 
set to " + featureLevel +
+" because it depends on metadata.version=14 (" + 
MetadataVersion.IBP_3_3_IV0 + ")");
+}
+
+public static TestFeatureVersion metadataVersionMapping(MetadataVersion 
metadataVersion) {
+if (metadataVersion.isLessThan(MetadataVersion.IBP_3_8_IV0)) {

Review Comment:
   It's a little confusing, but the idea is that as long as the MV is less 
than 3.8, we use 0.
   So for MV 3.3 -> 3.8 we will use TEST_0.
   
   The thing about 3.3 is that we can't set any feature version (test, 
transactions, etc.) if the MV is < 3.3, since that is the version that 
introduced the feature records.
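
The two rules described above can be sketched separately, as below. This is a simplified illustration rather than the PR's code: `MvSketch` is a hypothetical, coarse stand-in for `MetadataVersion`, `TestFeatureMapping` is a made-up helper, and mapping MV 3.8 and later to level 1 is an assumption (the comment only states that anything below 3.8 maps to level 0).

```java
// Hypothetical, simplified ordering of metadata versions; not the real MetadataVersion enum.
enum MvSketch { V3_0, V3_3, V3_7, V3_8 }

final class TestFeatureMapping {
    private TestFeatureMapping() { }

    // Default test feature level for a given MV: 0 below 3.8.
    // Returning 1 from 3.8 onward is an assumption made for this sketch.
    static short defaultLevel(MvSketch mv) {
        return (short) (mv.compareTo(MvSketch.V3_8) < 0 ? 0 : 1);
    }

    // Any non-zero feature level needs MV >= 3.3, the version that introduced feature records.
    // This floor is the same for every feature, so it could live in shared code.
    static void validate(short level, MvSketch mv) {
        if (level > 0 && mv.compareTo(MvSketch.V3_3) < 0) {
            throw new IllegalArgumentException("feature level " + level
                + " requires a metadata version of at least 3.3, but got " + mv);
        }
    }
}
```

Keeping the 3.3 floor apart from the per-feature default mapping is what would allow the floor check to move into common code, since it does not change from feature to feature.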






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-08 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1594884853


##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This is an interface for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * 
+ * Having a unified interface for the features that will use a shared type in the API used to set and update them
+ * makes it easier to process these features.
+ */
+public interface FeatureVersion {
+
+/**
+ * Features currently used in production. If a feature is included in this list, it will also be specified when
+ * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here.
+ *
+ * When a feature is added here, make sure it has a mapping in {@link #defaultValue} and {@link #createFeature}.
+ * See {@link TestFeatureVersion} as an example.
+ */
+List PRODUCTION_FEATURES = Arrays.asList();

Review Comment:
   I can take a look at that approach. One thing I tried to do was get the 
   existing MetadataVersion to implement the interface. It is already defined as 
   an enum, though. If we decide it is not worthwhile to include MetadataVersion 
   in this set, we could design something from the ground up that is less clunky.






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-08 Thread via GitHub


artemlivshits commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1591704034


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -258,6 +259,13 @@ public enum MetadataVersion {
 this.didMetadataChange = didMetadataChange;
 }
 
+public String featureName() {
+return FEATURE_NAME;
+}
+
+public void validateVersion(MetadataVersion metadataVersion, List features) {
+}

Review Comment:
   Could we add a comment on why validation is not needed?



##
clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java:
##
@@ -26,7 +26,7 @@
 /**
  * Represents an immutable basic version range using 2 attributes: min and max, each of type short.
  * The min and max attributes need to satisfy 2 rules:
- *  - they are each expected to be >= 1, as we only consider positive version values to be valid.
+ *  - they are each expected to be >= 0, as we only consider positive version values to be valid.

Review Comment:
   Nit: to be precise we should change 'positive' -> 'non-negative' :-)



##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.List;
+
+public enum TestFeatureVersion implements FeatureVersion {
+
+TEST_0(0),
+TEST_1(1),
+TEST_2(2);
+
+private short featureLevel;
+
+public static final String FEATURE_NAME = "test.feature.version";
+public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1;
+
+TestFeatureVersion(int featureLevel) {
+this.featureLevel = (short) featureLevel;
+}

Review Comment:
   I wonder if instead of doing manual implementation of validation and 
mapping, we could have minMetadataVersion as an enum argument, then validation 
would be just comparison, and for mapping we'd need to enumerate in reverse 
order and pick the first we find that supports the metadata version.
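A rough sketch of what this suggestion could look like, under stated assumptions: `SketchMetadataVersion` is a hypothetical stand-in for `MetadataVersion`, and the minimum metadata version attached to each level is chosen for illustration only, not taken from the PR.

```java
class MinMetadataVersionSketch {
    public static void main(String[] args) {
        // Pick the default level for a cluster at 3.7-IV4.
        System.out.println(
            SketchTestFeatureVersion.metadataVersionMapping(SketchMetadataVersion.IBP_3_7_IV4));
    }
}

// Hypothetical stand-in for MetadataVersion; only the ordering matters here.
enum SketchMetadataVersion {
    IBP_3_0_IV1, IBP_3_3_IV0, IBP_3_7_IV4, IBP_3_8_IV0;

    boolean isLessThan(SketchMetadataVersion other) {
        return compareTo(other) < 0;
    }
}

enum SketchTestFeatureVersion {
    // The minimum metadata versions below are illustrative assumptions.
    TEST_0(0, SketchMetadataVersion.IBP_3_0_IV1),
    TEST_1(1, SketchMetadataVersion.IBP_3_3_IV0),
    TEST_2(2, SketchMetadataVersion.IBP_3_8_IV0);

    private final short featureLevel;
    private final SketchMetadataVersion minMetadataVersion;

    SketchTestFeatureVersion(int featureLevel, SketchMetadataVersion minMetadataVersion) {
        this.featureLevel = (short) featureLevel;
        this.minMetadataVersion = minMetadataVersion;
    }

    short featureLevel() {
        return featureLevel;
    }

    // Validation becomes a single comparison against the enum argument.
    void validateVersion(SketchMetadataVersion metadataVersion) {
        if (metadataVersion.isLessThan(minMetadataVersion)) {
            throw new IllegalArgumentException(
                name() + " requires metadata.version of at least " + minMetadataVersion);
        }
    }

    // Mapping walks the levels from highest to lowest and returns the first
    // one whose minimum metadata version is satisfied.
    static SketchTestFeatureVersion metadataVersionMapping(SketchMetadataVersion metadataVersion) {
        SketchTestFeatureVersion[] all = values();
        for (int i = all.length - 1; i >= 0; i--) {
            if (!metadataVersion.isLessThan(all[i].minMetadataVersion)) {
                return all[i];
            }
        }
        throw new IllegalArgumentException("No level supports " + metadataVersion);
    }
}
```

One trade-off worth noting: a single `minMetadataVersion` per level ties the default mapping to the dependency check, whereas the manual implementation in the PR uses different thresholds for the two.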



##
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This is an interface for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * 
+ * Having a unified interface for the features that will use a shared type in the API used to set and update them
+ * makes it easier to process these features.
+ */
+public interface FeatureVersion {
+
+/**
+ * Features currently used in production. If a feature is included in this list, it will also be specified when
+ * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here.
+ *
+ * When a feature is added here, make sure it has a mapping in {@link #defaultValue} and {@link #createFeature}.
+ * See {@link TestFeatureVersion} as an example.
+ */
+ 

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-04-24 Thread via GitHub


jolshan commented on PR #15685:
URL: https://github.com/apache/kafka/pull/15685#issuecomment-2076088235

   Separately I need to fix the update tool. It will always say the finalized 
version is 0 if the tool knows of the feature, even if the broker doesn't 
include it in the list of finalized features.
   
   ```
   Feature: metadata.version  SupportedMinVersion: 3.0-IV1  SupportedMaxVersion: 3.7-IV4  FinalizedVersionLevel: 3.7-IV4  Epoch: 17
   Feature: test.feature.version  SupportedMinVersion: 0  SupportedMaxVersion: 1  FinalizedVersionLevel: 0  Epoch: 17
   ```
   
   ```
   
"supportedFeatures":[{"name":"metadata.version","minVersion":1,"maxVersion":19},{"name":"test.feature.version","minVersion":0,"maxVersion":1}],"finalizedFeaturesEpoch":17,"finalizedFeatures":[{"name":"metadata.version","maxVersionLevel":19,"minVersionLevel":19}]}
   ```
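The discrepancy between the two outputs above can be illustrated with a small sketch. It is not the tool's code: the method names are hypothetical, and the map stands in for the `finalizedFeatures` list in the broker response, which contains `metadata.version` but not `test.feature.version`.

```java
import java.util.Map;
import java.util.Optional;

class FinalizedLevelSketch {

    // Behaviour described in the comment: a feature missing from the
    // finalized list is reported as level 0.
    static short levelDefaultingToZero(Map<String, Short> finalized, String featureName) {
        return finalized.getOrDefault(featureName, (short) 0);
    }

    // Alternative that keeps "not finalized" distinguishable from "finalized at 0".
    static Optional<Short> levelIfFinalized(Map<String, Short> finalized, String featureName) {
        return Optional.ofNullable(finalized.get(featureName));
    }

    public static void main(String[] args) {
        // Mirrors the response above: only metadata.version is finalized.
        Map<String, Short> finalized = Map.of("metadata.version", (short) 19);

        System.out.println(levelDefaultingToZero(finalized, "test.feature.version"));
        System.out.println(levelIfFinalized(finalized, "test.feature.version").isPresent());
    }
}
```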





Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-04-24 Thread via GitHub


jolshan commented on PR #15685:
URL: https://github.com/apache/kafka/pull/15685#issuecomment-2076086916

   I've cleaned up the code to not set the record in the storage tool when the 
version is 0. I also cleaned up the log since it is not always the case that 
the controller doesn't know the version. For now, 0 is a reasonable default. 
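As a minimal sketch of the rule described here (not the StorageTool code itself), the snippet below builds a list of bootstrap records and skips any feature whose level is 0. The record representation and method name are hypothetical; plain strings stand in for the real record type.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BootstrapRecordsSketch {

    // Builds one record for metadata.version plus one per feature with a
    // non-zero level. Level 0 is treated as "unset" and produces no record.
    static List<String> bootstrapRecords(short metadataVersionLevel, Map<String, Short> otherFeatures) {
        List<String> records = new ArrayList<>();
        records.add("metadata.version=" + metadataVersionLevel);
        for (Map.Entry<String, Short> feature : otherFeatures.entrySet()) {
            if (feature.getValue() > 0) {
                records.add(feature.getKey() + "=" + feature.getValue());
            }
        }
        return records;
    }

    public static void main(String[] args) {
        Map<String, Short> features = new LinkedHashMap<>();
        features.put("test.feature.version", (short) 0);
        System.out.println(bootstrapRecords((short) 19, features).size());

        features.put("test.feature.version", (short) 1);
        System.out.println(bootstrapRecords((short) 19, features).size());
    }
}
```

This lines up with the controller logs, which report 1 bootstrap record when the test version is 0 and 2 when it is 1.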
   
   Here are the controller logs when I set the test version to 0 and to 1.
   
   ```
   [2024-04-24 17:15:05,566] INFO [QuorumController id=1] Creating new 
QuorumController with clusterId F0eUpBlzS4yJYcsM9DNfXA. 
(org.apache.kafka.controller.QuorumController)
   [2024-04-24 17:15:05,568] INFO [QuorumController id=1] Becoming the active 
controller at epoch 1, next write offset 1. 
(org.apache.kafka.controller.QuorumController)
   [2024-04-24 17:15:05,570] WARN [QuorumController id=1] Performing controller 
activation. The metadata log appears to be empty. Appending 1 bootstrap 
record(s) in metadata transaction at metadata.version 3.7-IV4 from bootstrap 
source 'the binary bootstrap metadata file: 
/tmp/kraft-combined-logs/bootstrap.checkpoint'. Setting the ZK migration state 
to NONE since this is a de-novo KRaft cluster. 
(org.apache.kafka.controller.QuorumController)
   [2024-04-24 17:15:05,571] INFO [QuorumController id=1] Replayed 
BeginTransactionRecord(name='Bootstrap records') at offset 1. 
(org.apache.kafka.controller.OffsetControlManager)
   [2024-04-24 17:15:05,571] INFO [QuorumController id=1] Replayed a 
FeatureLevelRecord setting metadata.version to 3.7-IV4 
(org.apache.kafka.controller.FeatureControlManager)
   [2024-04-24 17:15:05,571] INFO [QuorumController id=1] Replayed 
EndTransactionRecord() at offset 4. 
(org.apache.kafka.controller.OffsetControlManager)
   [2024-04-24 17:15:05,645] INFO [QuorumController id=1] Replayed 
RegisterControllerRecord contaning ControllerRegistration(id=1, 
incarnationId=h3WYlEEtTUCG6nOjFnIQxQ, zkMigrationReady=false, 
listeners=[Endpoint(listenerName='CONTROLLER', securityProtocol=PLAINTEXT, 
host='10.200.4.27', port=9093)], supportedFeatures={metadata.version: 1-19, 
test.feature.version: 0-1}). (org.apache.kafka.controller.ClusterControlManager)
   [2024-04-24 17:15:05,686] INFO [QuorumController id=1] Replayed initial 
RegisterBrokerRecord for broker 1: RegisterBrokerRecord(brokerId=1, 
isMigratingZkBroker=false, incarnationId=Ivj6roa7QnmcRbx_P_hg0A, brokerEpoch=6, 
endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, 
securityProtocol=0)], features=[BrokerFeature(name='metadata.version', 
minSupportedVersion=1, maxSupportedVersion=19), 
BrokerFeature(name='test.feature.version', minSupportedVersion=0, 
maxSupportedVersion=1)], rack=null, fenced=true, inControlledShutdown=false, 
logDirs=[j6NFYHN2xQ8wG1rUVXf7LA]) 
(org.apache.kafka.controller.ClusterControlManager)
   [2024-04-24 17:15:05,745] INFO [QuorumController id=1] Replayed 
RegisterBrokerRecord modifying the registration for broker 1: 
RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=false, 
incarnationId=Ivj6roa7QnmcRbx_P_hg0A, brokerEpoch=7, 
endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, 
securityProtocol=0)], features=[BrokerFeature(name='metadata.version', 
minSupportedVersion=1, maxSupportedVersion=19), 
BrokerFeature(name='test.feature.version', minSupportedVersion=0, 
maxSupportedVersion=1)], rack=null, fenced=true, inControlledShutdown=false, 
logDirs=[j6NFYHN2xQ8wG1rUVXf7LA]) 
(org.apache.kafka.controller.ClusterControlManager)
   [2024-04-24 17:15:05,786] INFO [QuorumController id=1] The request from 
broker 1 to unfence has been granted because it has caught up with the offset 
of its register broker record 7. 
(org.apache.kafka.controller.BrokerHeartbeatManager)
   [2024-04-24 17:15:05,788] INFO [QuorumController id=1] Replayed 
BrokerRegistrationChangeRecord modifying the registration for broker 1: 
BrokerRegistrationChangeRecord(brokerId=1, brokerEpoch=7, fenced=-1, 
inControlledShutdown=0, logDirs=[]) 
(org.apache.kafka.controller.ClusterControlManager)
   ```
   ```
   [2024-04-24 17:16:29,048] INFO [QuorumController id=1] Creating new 
QuorumController with clusterId F0eUpBlzS4yJYcsM9DNfXA. 
(org.apache.kafka.controller.QuorumController)
   [2024-04-24 17:16:29,050] INFO [QuorumController id=1] Becoming the active 
controller at epoch 1, next write offset 1. 
(org.apache.kafka.controller.QuorumController)
   [2024-04-24 17:16:29,051] WARN [QuorumController id=1] Performing controller 
activation. The metadata log appears to be empty. Appending 2 bootstrap 
record(s) in metadata transaction at metadata.version 3.7-IV4 from bootstrap 
source 'the binary bootstrap metadata file: 
/tmp/kraft-combined-logs/bootstrap.checkpoint'. Setting the ZK migration state 
to NONE since this is a de-novo KRaft cluster. 
(org.apache.kafka.controller.QuorumController)
   [2024-04-24 17:16:29,052] INFO [QuorumController id=1] Replayed 
BeginTransactionRecord(name='Bootstrap records') at offset 1. 

Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-04-18 Thread via GitHub


jolshan commented on PR #15685:
URL: https://github.com/apache/kafka/pull/15685#issuecomment-2065435357

   I may need to figure out how to deal with this...when setting the version to 
0.
   
   ```
   Replayed a FeatureLevelRecord removing feature test.feature.version
   ```
   ```
   Broker 1 registered with feature test.feature.version that is unknown to the 
controller (org.apache.kafka.controller.ClusterControlManager)
   [2024-04-18 15:32:25,440] INFO [QuorumController id=1] Replayed 
RegisterBrokerRecord modifying the registration for broker 1: 
RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=false, 
incarnationId=pihYtx_qSSKlJ2K31eGFOw, brokerEpoch=8, 
endPoints=[BrokerEndpoint(name='PLAINTEXT', host='localhost', port=9092, 
securityProtocol=0)], features=[BrokerFeature(name='metadata.version', 
minSupportedVersion=1, maxSupportedVersion=19), 
BrokerFeature(name='test.feature.version', minSupportedVersion=0, 
maxSupportedVersion=1)], rack=null, fenced=true, inControlledShutdown=false, 
logDirs=[T1fHZ5DjOC1Dk5CviOHPbg]) 
(org.apache.kafka.controller.ClusterControlManager)
   ```
   





Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-04-17 Thread via GitHub


jolshan commented on PR #15685:
URL: https://github.com/apache/kafka/pull/15685#issuecomment-2062753494

   I noticed I need to handle Quorum and Broker features, which have basically 
the same implementation. Stay tuned.





Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-04-16 Thread via GitHub


splett2 commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1568050802


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -108,6 +103,43 @@ object StorageTool extends Logging {
 }
   }
 
+  def metadataVersionValidation(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = {

Review Comment:
   nit: `validateMetadataVersion`






[PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-04-08 Thread via GitHub


jolshan opened a new pull request, #15685:
URL: https://github.com/apache/kafka/pull/15685

   As part of KIP-1022, I have created an interface for all the new features to 
be used when parsing the command line arguments, doing validations, getting 
default versions, etc. 
   
   I've also added the `--feature` flag to the storage tool to show how it will 
be used. 
   
   Created a TestFeatureVersion to show an implementation of the interface 
(besides MetadataVersion which is unique) and added tests using this new test 
feature.
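To give a feel for the command-line handling mentioned above, here is a hedged sketch of parsing repeated `--feature` values. It assumes each value has the form `name=level`; the class and method names are hypothetical, and the real StorageTool is written in Scala and does more validation than this.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FeatureFlagParsingSketch {

    // Parses values such as "test.feature.version=1" into name -> level.
    // Assumes the name=level format; rejects malformed, negative, or duplicate entries.
    static Map<String, Short> parseFeatures(List<String> specs) {
        Map<String, Short> result = new LinkedHashMap<>();
        for (String spec : specs) {
            int eq = spec.indexOf('=');
            if (eq <= 0 || eq == spec.length() - 1) {
                throw new IllegalArgumentException("Expected name=level, got: " + spec);
            }
            String name = spec.substring(0, eq).trim();
            if (name.isEmpty()) {
                throw new IllegalArgumentException("Missing feature name in: " + spec);
            }
            short level;
            try {
                level = Short.parseShort(spec.substring(eq + 1).trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Level is not a valid short in: " + spec);
            }
            if (level < 0) {
                throw new IllegalArgumentException("Level must be non-negative in: " + spec);
            }
            if (result.putIfAbsent(name, level) != null) {
                throw new IllegalArgumentException("Feature " + name + " was specified more than once");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parseFeatures(List.of("test.feature.version=1")));
    }
}
```

The parsed map is the kind of input that per-feature validation and default lookups could then operate on.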
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

