Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-07-11 Thread via GitHub


jsancio commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1674397738


##
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java:
##
@@ -313,10 +313,12 @@ private static FinalizedFeatureKeyCollection 
createFinalizedFeatureKeys(
 for (Map.Entry feature : finalizedFeatures.entrySet()) {
 final FinalizedFeatureKey key = new FinalizedFeatureKey();
 final short versionLevel = feature.getValue();
-key.setName(feature.getKey());
-key.setMinVersionLevel(versionLevel);
-key.setMaxVersionLevel(versionLevel);
-converted.add(key);
+if (versionLevel != 0) {

Review Comment:
   The indentation seems off. I think you need to remove 4 spaces.
   
   Let's write comment explaining why we only do this when the version level is 
not 0?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-07-10 Thread via GitHub


cmccabe commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1672762239


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -3027,6 +3028,11 @@ public long logEndOffset() {
 return log.endOffset().offset();
 }
 
+@Override
+public KRaftVersion kraftVersion() {
+return 
partitionState.kraftVersionAtOffset(quorum.highWatermark().get().offset());
+}

Review Comment:
   > The offset used should be quorum.highWatermark.map(hwm -> hwm.offset() - 
1). The HWM is an exclusive offset
   
   Fixed
   
   > (comments about committed vs. uncommitted version)
   
   I'm fine with returning uncommitted version here. Since this is accessed 
from brokers there shouldn't even be any difference (well, except in the 
combined case)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-07-10 Thread via GitHub


cmccabe commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1672762239


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -3027,6 +3028,11 @@ public long logEndOffset() {
 return log.endOffset().offset();
 }
 
+@Override
+public KRaftVersion kraftVersion() {
+return 
partitionState.kraftVersionAtOffset(quorum.highWatermark().get().offset());
+}

Review Comment:
   > The offset used should be quorum.highWatermark.map(hwm -> hwm.offset() - 
1). The HWM is an exclusive offset
   
   Fixed
   
   > If we want to expose the committed kraft.version then we should change the 
signature to Optional kraftVersion().
   
   Hmm. An optional as a return value doesn't help us because we need to 
resolve it down to an actual `KRaftVersion`. This ultimately is intended for 
use by the metadata cache.
   
   I don't think this is a big issue in practice because the broker shouldn't 
be unfenced until the high watermark is known (among other things). So the 
cache shouldn't even be accessible. However, having a potential exception here 
if highWatermark is unset doesn't feel good. I will just return 
`partitionState.lastKraftVersion()` if highWatermark isn't defined.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-07-10 Thread via GitHub


cmccabe commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r167272


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -541,8 +545,12 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 
   override def features(): FinalizedFeatures = {
 val image = _currentImage
+val finalizedFeatures = new java.util.HashMap[String, java.lang.Short]
+finalizedFeatures.putAll(image.features().finalizedVersions())

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-07-10 Thread via GitHub


cmccabe commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1672754939


##
core/src/main/scala/kafka/server/MetadataCache.scala:
##
@@ -121,7 +122,10 @@ object MetadataCache {
 new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, 
zkMigrationEnabled)
   }
 
-  def kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = {
-new KRaftMetadataCache(brokerId)
+  def kRaftMetadataCache(
+brokerId: Int,
+kraftVersionSupplier: Supplier[KRaftVersion] = () => 
KRaftVersion.KRAFT_VERSION_0

Review Comment:
   Yeah, default parameters are kind of icky. I'll remove the default parameter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-07-10 Thread via GitHub


cmccabe commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1672721636


##
core/src/main/scala/kafka/server/ControllerServer.scala:
##
@@ -162,7 +162,7 @@ class ControllerServer(
   authorizer = config.createNewAuthorizer()
   authorizer.foreach(_.configure(config.originals))
 
-  metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)
+  metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => 
raftManager.client.kraftVersion())

Review Comment:
   That does not work when I tried it. I don't know why we would want to hide 
the fact that it's a lambda?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-07-10 Thread via GitHub


cmccabe commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1672721395


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -192,7 +192,7 @@ class BrokerServer(
 
   logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
 
-  metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)
+  metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => 
raftManager.client.kraftVersion())

Review Comment:
   That does not work when I tried it. I don't know why we would want to hide 
the fact that it's a lambda?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-07-10 Thread via GitHub


cmccabe commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1672720246


##
server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum KRaftVersion implements FeatureVersion {
+
+// Version 1 enables KIP-853.
+KRAFT_VERSION_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION),
+KRAFT_VERSION_1(1, MetadataVersion.IBP_3_8_IV0);
+
+public static final String FEATURE_NAME = "kraft.version";
+
+private final short featureLevel;
+private final MetadataVersion bootstrapMetadataVersion;
+
+KRaftVersion(
+int featureLevel,
+MetadataVersion bootstrapMetadataVersion
+) {
+this.featureLevel = (short) featureLevel;
+this.bootstrapMetadataVersion = bootstrapMetadataVersion;
+}
+
+@Override
+public short featureLevel() {
+return featureLevel;
+}
+
+public static KRaftVersion fromFeatureLevel(short version) {
+switch (version) {
+case 0:
+return KRAFT_VERSION_0;
+case 1:
+return KRAFT_VERSION_1;
+default:
+throw new RuntimeException("Unknown KRaft feature level: " + 
(int) version);
+}
+}
+
+@Override
+public String featureName() {
+return FEATURE_NAME;
+}
+
+@Override
+public MetadataVersion bootstrapMetadataVersion() {
+return bootstrapMetadataVersion;
+}
+
+@Override
+public Map dependencies() {
+return Collections.emptyMap();
+}
+
+public short quorumStateVersion() {
+switch (this) {
+case KRAFT_VERSION_0:
+return (short) 0;
+case KRAFT_VERSION_1:
+return (short) 1;
+}
+throw new RuntimeException("Unknown KRaft feature level: " + this);
+}

Review Comment:
   I added `KRaftVersionTest` which includes this and other things.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-07-09 Thread via GitHub


jsancio commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1670726026


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -192,7 +192,7 @@ class BrokerServer(
 
   logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
 
-  metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)
+  metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => 
raftManager.client.kraftVersion())

Review Comment:
   Did you try this instead? I think Scala should be able to infer that you 
want the lambda and not evaluate the method.
   ```java
 metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, 
raftManager.client.kraftVersion)
   ```



##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -541,8 +545,12 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
 
   override def features(): FinalizedFeatures = {
 val image = _currentImage
+val finalizedFeatures = new java.util.HashMap[String, java.lang.Short]
+finalizedFeatures.putAll(image.features().finalizedVersions())

Review Comment:
   Not a big deal given the current size of `finalizedVtersions()` but maybe 
passing it to the constructor is better going forward.
   ```java
   val finalizedFeatures = new java.util.HashMap[String, java.lang.Short](
   image.features().finalizedVersions()
   )
   ```



##
core/src/main/scala/kafka/server/ControllerServer.scala:
##
@@ -162,7 +162,7 @@ class ControllerServer(
   authorizer = config.createNewAuthorizer()
   authorizer.foreach(_.configure(config.originals))
 
-  metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)
+  metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => 
raftManager.client.kraftVersion())

Review Comment:
   Same here. Did you try using this?
   ```java
 metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, 
raftManager.client.kraftVersion)
   ```



##
core/src/main/scala/kafka/server/MetadataCache.scala:
##
@@ -121,7 +122,10 @@ object MetadataCache {
 new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, 
zkMigrationEnabled)
   }
 
-  def kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = {
-new KRaftMetadataCache(brokerId)
+  def kRaftMetadataCache(
+brokerId: Int,
+kraftVersionSupplier: Supplier[KRaftVersion] = () => 
KRaftVersion.KRAFT_VERSION_0

Review Comment:
   This default parameter value seems risky if used in `src/main`. Why do you 
want it to be a default parameter? Also, not using default parameter values 
will make it easier to refactor this code to Java when the time come to do that.



##
raft/src/main/java/org/apache/kafka/raft/RaftClient.java:
##
@@ -250,4 +251,11 @@ default void beginShutdown() {}
  * or 0 if there have not been any records written.
  */
 long logEndOffset();
+
+/**
+ * Returns the current kraft.version.
+ *
+ * @return the current kraft.version.
+ */

Review Comment:
   If you agree with my previous comments, let's include that this method 
returns the uncommitted kraft.version.



##
raft/src/test/java/org/apache/kafka/raft/FileQuorumStateStoreTest.java:
##
@@ -64,7 +66,7 @@ void testWriteReadElectedLeader(short kraftVersion) throws 
IOException {
 );
 
 final Optional expected;
-if (kraftVersion == 1) {
+if (kraftVersion.featureLevel() > 0) {

Review Comment:
   If you agree with one of my previous message about adding 
`KRaftVersion::isReconfigSupported` maybe we can use that predicate here. This 
comment applies to a few lines in `src/test`.



##
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##
@@ -258,7 +261,9 @@ private void handleBatch(Batch batch, OptionalLong 
overrideOffset) {
 
 case KRAFT_VERSION:
 synchronized (kraftVersionHistory) {
-kraftVersionHistory.addAt(currentOffset, 
((KRaftVersionRecord) record.message()).kRaftVersion());
+kraftVersionHistory.addAt(currentOffset,
+KRaftVersion.fromFeatureLevel(
+((KRaftVersionRecord) 
record.message()).kRaftVersion()));

Review Comment:
   Do you mind using this indentation in the `raft` module? Each parameter is 
on its own line:
   ```java
   kraftVersionHistory.addAt(
   currentOffset,
   KRaftVersion.fromFeatureLevel(
   ((KRaftVersionRecord) 
record.message()).kRaftVersion()
   )
   );
   ```



##
raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java:
##
@@ -237,12 +238,12 

Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-07-09 Thread via GitHub


jsancio commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1639917661


##
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java:
##
@@ -116,6 +139,34 @@ public BootstrapMetadata copyWithOnlyVersion() {
 metadataVersion, source);
 }
 
+public BootstrapMetadata withMetadataVersion(MetadataVersion 
metadataVersion) {
+List newRecords = new ArrayList<>();
+for (ApiMessageAndVersion record : records) {
+if (recordToMetadataVersion(record.message()).isPresent()) {
+newRecords.add(metadataVersionToRecord(metadataVersion));
+} else {
+newRecords.add(record);
+}
+}
+return new BootstrapMetadata(newRecords, metadataVersion, source);
+}
+
+public BootstrapMetadata withKRaftVersion(KRaftVersion version) {
+List newRecords = new ArrayList<>();
+boolean foundKRaftVersion = false;
+for (ApiMessageAndVersion record : records) {
+if (recordToKRaftVersion(record.message()).isPresent()) {
+newRecords.add(kraftVersionToRecord(version));

Review Comment:
   See my comment but we shouldn't implement kraft.version in the metadata 
layer (metadata module). It should be implemented in the kraft layer (module).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-07-08 Thread via GitHub


cmccabe commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1669277905


##
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java:
##
@@ -73,6 +80,22 @@ public static Optional 
recordToMetadataVersion(ApiMessage recor
 return Optional.empty();
 }
 
+private static ApiMessageAndVersion kraftVersionToRecord(KRaftVersion 
version) {
+return new ApiMessageAndVersion(new FeatureLevelRecord().
+setName(KRaftVersion.FEATURE_NAME).
+setFeatureLevel(version.featureLevel()), (short) 0);
+}
+
+private static Optional recordToKRaftVersion(ApiMessage 
record) {
+if (record instanceof FeatureLevelRecord) {
+FeatureLevelRecord featureLevel = (FeatureLevelRecord) record;
+if (featureLevel.name().equals(KRaftVersion.FEATURE_NAME)) {
+return 
Optional.of(KRaftVersion.fromFeatureLevel(featureLevel.featureLevel()));
+}
+}
+return Optional.empty();
+}

Review Comment:
   ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-07-08 Thread via GitHub


cmccabe commented on PR #16230:
URL: https://github.com/apache/kafka/pull/16230#issuecomment-2215235517

   I have revised this PR. We now get the value of `kraft.version` directly 
from the raft layer, and metadata layer does not have anything to do with it. 
This requires passing in a callback to the metadata cache on the brokers, which 
seems reasonable.
   
   I also merged in trunk and fixed a few things that people commented on.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-07-08 Thread via GitHub


cmccabe commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1669277755


##
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java:
##
@@ -116,6 +139,34 @@ public BootstrapMetadata copyWithOnlyVersion() {
 metadataVersion, source);
 }
 
+public BootstrapMetadata withMetadataVersion(MetadataVersion 
metadataVersion) {
+List newRecords = new ArrayList<>();
+for (ApiMessageAndVersion record : records) {
+if (recordToMetadataVersion(record.message()).isPresent()) {
+newRecords.add(metadataVersionToRecord(metadataVersion));
+} else {
+newRecords.add(record);
+}
+}
+return new BootstrapMetadata(newRecords, metadataVersion, source);
+}
+
+public BootstrapMetadata withKRaftVersion(KRaftVersion version) {
+List newRecords = new ArrayList<>();
+boolean foundKRaftVersion = false;
+for (ApiMessageAndVersion record : records) {
+if (recordToKRaftVersion(record.message()).isPresent()) {
+newRecords.add(kraftVersionToRecord(version));

Review Comment:
   ack



##
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java:
##
@@ -116,6 +139,34 @@ public BootstrapMetadata copyWithOnlyVersion() {
 metadataVersion, source);
 }
 
+public BootstrapMetadata withMetadataVersion(MetadataVersion 
metadataVersion) {
+List newRecords = new ArrayList<>();
+for (ApiMessageAndVersion record : records) {
+if (recordToMetadataVersion(record.message()).isPresent()) {
+newRecords.add(metadataVersionToRecord(metadataVersion));
+} else {
+newRecords.add(record);
+}
+}
+return new BootstrapMetadata(newRecords, metadataVersion, source);
+}
+
+public BootstrapMetadata withKRaftVersion(KRaftVersion version) {
+List newRecords = new ArrayList<>();
+boolean foundKRaftVersion = false;
+for (ApiMessageAndVersion record : records) {
+if (recordToKRaftVersion(record.message()).isPresent()) {
+newRecords.add(kraftVersionToRecord(version));
+} else {
+newRecords.add(record);
+}
+}
+if (!foundKRaftVersion) {
+newRecords.add(kraftVersionToRecord(version));

Review Comment:
   ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-07-08 Thread via GitHub


cmccabe commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1669239654


##
server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum KRaftVersion implements FeatureVersion {
+
+// Version 1 enables KIP-853.
+KRAFT_VERSION_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION),
+KRAFT_VERSION_1(1, MetadataVersion.IBP_3_8_IV0);
+
+public static final String FEATURE_NAME = "kraft.version";
+
+private final short featureLevel;
+private final MetadataVersion bootstrapMetadataVersion;
+
+KRaftVersion(
+int featureLevel,
+MetadataVersion bootstrapMetadataVersion
+) {
+this.featureLevel = (short) featureLevel;
+this.bootstrapMetadataVersion = bootstrapMetadataVersion;
+}
+
+@Override
+public short featureLevel() {
+return featureLevel;
+}
+
+public static KRaftVersion fromFeatureLevel(short version) {
+switch (version) {
+case 0:
+return KRAFT_VERSION_0;
+case 1:
+return KRAFT_VERSION_1;
+default:
+throw new RuntimeException("Unknown KRaft feature level: " + 
(int) version);
+}
+}
+
+@Override
+public String featureName() {
+return FEATURE_NAME;
+}
+
+@Override
+public MetadataVersion bootstrapMetadataVersion() {
+return bootstrapMetadataVersion;
+}
+
+@Override
+public Map dependencies() {
+return Collections.emptyMap();
+}
+
+public short quorumStateVersion() {
+switch (this) {
+case KRAFT_VERSION_0:
+return (short) 0;
+case KRAFT_VERSION_1:
+return (short) 1;
+}
+throw new RuntimeException("Unknown KRaft feature level: " + this);
+}

Review Comment:
   Which part do you want to test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-07-08 Thread via GitHub


cmccabe commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1669237196


##
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##
@@ -40,7 +40,8 @@ public enum Features {
  * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersion} when implementing a new feature.
  */
 TEST_VERSION("test.feature.version", TestFeatureVersion.values()),
-GROUP_VERSION("group.version", GroupVersion.values());
+GROUP_VERSION("group.version", GroupVersion.values()),
+KRAFT_VERSION("kraft.version", KRaftVersion.values());

Review Comment:
   `KRaftClusterTest.testUpdateMetadataVersion` tests this, when it checks for 
`kraft.version` in the Admin client output.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-07-08 Thread via GitHub


cmccabe commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1669234970


##
raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java:
##
@@ -146,7 +147,7 @@ final public static class Builder {
 private Time time = Time.SYSTEM;
 private int maxBatchSize = 1024;
 private MemoryPool memoryPool = MemoryPool.NONE;
-private short kraftVersion = 1;
+private KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_0; // 
TODO

Review Comment:
   I'll change it to 1 since it was that way previously.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-06-18 Thread via GitHub


jsancio commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1644766348


##
server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum KRaftVersion implements FeatureVersion {
+
+// Version 1 enables KIP-853.
+KRAFT_VERSION_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION),
+KRAFT_VERSION_1(1, MetadataVersion.IBP_3_8_IV0);

Review Comment:
   This should be IBP_3_9_IV0 now that we moved KIP-853 to 3.9.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-06-18 Thread via GitHub


jsancio commented on PR #16230:
URL: https://github.com/apache/kafka/pull/16230#issuecomment-2176402435

   > We also currently don't propagate control records to the metadata layer, 
so that is something we'd have to consider changing.
   
   We actually do, I changed this in this PR 
https://github.com/apache/kafka/commit/bfe81d622979809325c31d549943c40f6f0f7337#diff-5d07bcc6e077fc85a7f245be27d8ee2d3b7c4656f1e8a8dc3909eb70d3410e4cR150
   
   The controller just skips those control records.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-06-17 Thread via GitHub


cmccabe commented on PR #16230:
URL: https://github.com/apache/kafka/pull/16230#issuecomment-2174664455

   I was originally going to have two records: one `KRaftVersionRecord` for the 
raft layer, and one `FeatureRecord` for the metadata layer, encoding the same 
data. The advantage of doing it this way is that it will work well with the 
current code structure. For example, we have code that validates that 
FeaturesImage can dump its state and restore that state. If FeaturesImage is 
taking input from outside of the list of metadata records, that invariant is 
broken. We also currently don't propagate control records to the metadata 
layer, so that is something we'd have to consider changing.
   
   The big disadvantage of having two records rather than one is the state can 
get out of sync. Which I do think is a real risk, especially when we are 
changing things.
   
   I realize writing the record at the Raft layer seems useful to you, but in 
the metadata layer, the thought that `read(image.write) != image` does not 
spark joy. If this is going to be handled by the Raft layer and not the 
metadata layer, then maybe it should be excluded from the image altogether. 
We'll have to talk about that.
   
   In the meantime, I merged in trunk. There were import statement conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-06-14 Thread via GitHub


jsancio commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1639921798


##
raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java:
##
@@ -146,7 +147,7 @@ final public static class Builder {
 private Time time = Time.SYSTEM;
 private int maxBatchSize = 1024;
 private MemoryPool memoryPool = MemoryPool.NONE;
-private short kraftVersion = 1;
+private KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_0; // 
TODO

Review Comment:
   TODO reminder. What it is missing to make 1 the default for the builder?



##
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java:
##
@@ -73,6 +80,22 @@ public static Optional 
recordToMetadataVersion(ApiMessage recor
 return Optional.empty();
 }
 
+private static ApiMessageAndVersion kraftVersionToRecord(KRaftVersion 
version) {
+return new ApiMessageAndVersion(new FeatureLevelRecord().
+setName(KRaftVersion.FEATURE_NAME).
+setFeatureLevel(version.featureLevel()), (short) 0);
+}
+
+private static Optional recordToKRaftVersion(ApiMessage 
record) {
+if (record instanceof FeatureLevelRecord) {
+FeatureLevelRecord featureLevel = (FeatureLevelRecord) record;
+if (featureLevel.name().equals(KRaftVersion.FEATURE_NAME)) {
+return 
Optional.of(KRaftVersion.fromFeatureLevel(featureLevel.featureLevel()));
+}
+}
+return Optional.empty();
+}

Review Comment:
   I don't think we should do this. The kraft.version is written to the cluster 
metadata log using the KRaftVersion control record. These are the cases when 
KRaft will write this control record:
   
   1. The kafka-storage tool will write it to the checkpoint/snapshot if it 
being formatted to support kraft.version 1. [Storage tool changes for 
KIP-853](https://issues.apache.org/jira/browse/KAFKA-16518)
   2. `KafkaRaftClient` will write the KRaftVersion control record to the 
checkpoint/snapshot using `RecordsSnapshotWriter.Builder` if the control record 
is part of the log segments. This is already in trunk.
   3. `KafkaRaftClient` will write the `KRaftVersion` control record when 
handling `UpdateFeatures`. [UpdateFeatures for 
kraft.version](https://issues.apache.org/jira/browse/KAFKA-16538)
   4. The first KRaft leader will write the `KRaftVersion` control record if it 
is in the bootstrapping checkpoint/snapshot but it is has never been written to 
the log segments. [Support for first leader bootstrapping the voter 
set](https://issues.apache.org/jira/browse/KAFKA-16532)
   
   I think that KRaftVersion should behave different from other feature level 
in that the KRaft version should be persisted and replicated using control 
records instead of metadata records. This is needed mainly because KRaft needs 
to read and process this record as uncommitted data. The controller and the 
metadata layer only sees committed records.



##
server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum KRaftVersion implements FeatureVersion {
+
+// Version 1 enables KIP-853.
+KRAFT_VERSION_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION),
+KRAFT_VERSION_1(1, MetadataVersion.IBP_3_8_IV0);
+
+public static final String FEATURE_NAME = "kraft.version";
+
+private final short featureLevel;
+private final MetadataVersion bootstrapMetadataVersion;
+
+KRaftVersion(
+int featureLevel,
+MetadataVersion bootstrapMetadataVersion
+) {
+this.featureLevel = (short) featureLevel;
+this.bootstrapMetadataVersion = bootstrapMetadataVersion;
+}
+
+@Override
+public short featureLevel() {
+return featureLevel;
+}
+
+public static KRaftVersion fromFeatureLevel(short version) {
+switch (version) {
+case 0:
+return KRAFT_VERSION_0;
+case 1:
+return 

Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-06-13 Thread via GitHub


dengziming commented on code in PR #16230:
URL: https://github.com/apache/kafka/pull/16230#discussion_r1637473020


##
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java:
##
@@ -116,6 +139,34 @@ public BootstrapMetadata copyWithOnlyVersion() {
 metadataVersion, source);
 }
 
+public BootstrapMetadata withMetadataVersion(MetadataVersion 
metadataVersion) {
+List newRecords = new ArrayList<>();
+for (ApiMessageAndVersion record : records) {
+if (recordToMetadataVersion(record.message()).isPresent()) {
+newRecords.add(metadataVersionToRecord(metadataVersion));
+} else {
+newRecords.add(record);
+}
+}
+return new BootstrapMetadata(newRecords, metadataVersion, source);
+}
+
+public BootstrapMetadata withKRaftVersion(KRaftVersion version) {
+List newRecords = new ArrayList<>();
+boolean foundKRaftVersion = false;
+for (ApiMessageAndVersion record : records) {
+if (recordToKRaftVersion(record.message()).isPresent()) {
+newRecords.add(kraftVersionToRecord(version));
+} else {
+newRecords.add(record);
+}
+}
+if (!foundKRaftVersion) {
+newRecords.add(kraftVersionToRecord(version));

Review Comment:
   We have decided to propagate feature version for kraft.version using 
KRaftVersionRecord control record,  should we also add a `ControlRecord` to 
make it consistent with `FeatureLevelRecord`



##
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java:
##
@@ -116,6 +139,34 @@ public BootstrapMetadata copyWithOnlyVersion() {
 metadataVersion, source);
 }
 
+public BootstrapMetadata withMetadataVersion(MetadataVersion 
metadataVersion) {
+List newRecords = new ArrayList<>();
+for (ApiMessageAndVersion record : records) {
+if (recordToMetadataVersion(record.message()).isPresent()) {
+newRecords.add(metadataVersionToRecord(metadataVersion));
+} else {
+newRecords.add(record);
+}
+}
+return new BootstrapMetadata(newRecords, metadataVersion, source);
+}
+
+public BootstrapMetadata withKRaftVersion(KRaftVersion version) {
+List newRecords = new ArrayList<>();
+boolean foundKRaftVersion = false;
+for (ApiMessageAndVersion record : records) {
+if (recordToKRaftVersion(record.message()).isPresent()) {
+newRecords.add(kraftVersionToRecord(version));

Review Comment:
   We should add: `foundKRaftVersion=true;` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]

2024-06-06 Thread via GitHub


cmccabe opened a new pull request, #16230:
URL: https://github.com/apache/kafka/pull/16230

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org