Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
jsancio commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1674397738 ## clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java: ## @@ -313,10 +313,12 @@ private static FinalizedFeatureKeyCollection createFinalizedFeatureKeys( for (Map.Entry feature : finalizedFeatures.entrySet()) { final FinalizedFeatureKey key = new FinalizedFeatureKey(); final short versionLevel = feature.getValue(); -key.setName(feature.getKey()); -key.setMinVersionLevel(versionLevel); -key.setMaxVersionLevel(versionLevel); -converted.add(key); +if (versionLevel != 0) { Review Comment: The indentation seems off. I think you need to remove 4 spaces. Let's write comment explaining why we only do this when the version level is not 0? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
cmccabe commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1672762239 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -3027,6 +3028,11 @@ public long logEndOffset() { return log.endOffset().offset(); } +@Override +public KRaftVersion kraftVersion() { +return partitionState.kraftVersionAtOffset(quorum.highWatermark().get().offset()); +} Review Comment: > The offset used should be quorum.highWatermark.map(hwm -> hwm.offset() - 1). The HWM is an exclusive offset Fixed > (comments about committed vs. uncommitted version) I'm fine with returning uncommitted version here. Since this is accessed from brokers there shouldn't even be any difference (well, except in the combined case) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
cmccabe commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1672762239 ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -3027,6 +3028,11 @@ public long logEndOffset() { return log.endOffset().offset(); } +@Override +public KRaftVersion kraftVersion() { +return partitionState.kraftVersionAtOffset(quorum.highWatermark().get().offset()); +} Review Comment: > The offset used should be quorum.highWatermark.map(hwm -> hwm.offset() - 1). The HWM is an exclusive offset Fixed > If we want to expose the committed kraft.version then we should change the signature to Optional kraftVersion(). Hmm. An optional as a return value doesn't help us because we need to resolve it down to an actual `KRaftVersion`. This ultimately is intended for use by the metadata cache. I don't think this is a big issue in practice because the broker shouldn't be unfenced until the high watermark is known (among other things). So the cache shouldn't even be accessible. However, having a potential exception here if highWatermark is unset doesn't feel good. I will just return `partitionState.lastKraftVersion()` if highWatermark isn't defined. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
cmccabe commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r167272 ## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ## @@ -541,8 +545,12 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w override def features(): FinalizedFeatures = { val image = _currentImage +val finalizedFeatures = new java.util.HashMap[String, java.lang.Short] +finalizedFeatures.putAll(image.features().finalizedVersions()) Review Comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
cmccabe commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1672754939 ## core/src/main/scala/kafka/server/MetadataCache.scala: ## @@ -121,7 +122,10 @@ object MetadataCache { new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, zkMigrationEnabled) } - def kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = { -new KRaftMetadataCache(brokerId) + def kRaftMetadataCache( +brokerId: Int, +kraftVersionSupplier: Supplier[KRaftVersion] = () => KRaftVersion.KRAFT_VERSION_0 Review Comment: Yeah, default parameters are kind of icky. I'll remove the default parameter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
cmccabe commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1672721636 ## core/src/main/scala/kafka/server/ControllerServer.scala: ## @@ -162,7 +162,7 @@ class ControllerServer( authorizer = config.createNewAuthorizer() authorizer.foreach(_.configure(config.originals)) - metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId) + metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion()) Review Comment: That does not work when I tried it. I don't know why we would want to hide the fact that it's a lambda? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
cmccabe commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1672721395 ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -192,7 +192,7 @@ class BrokerServer( logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) - metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId) + metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion()) Review Comment: That does not work when I tried it. I don't know why we would want to hide the fact that it's a lambda? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
cmccabe commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1672720246 ## server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java: ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum KRaftVersion implements FeatureVersion { + +// Version 1 enables KIP-853. +KRAFT_VERSION_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION), +KRAFT_VERSION_1(1, MetadataVersion.IBP_3_8_IV0); + +public static final String FEATURE_NAME = "kraft.version"; + +private final short featureLevel; +private final MetadataVersion bootstrapMetadataVersion; + +KRaftVersion( +int featureLevel, +MetadataVersion bootstrapMetadataVersion +) { +this.featureLevel = (short) featureLevel; +this.bootstrapMetadataVersion = bootstrapMetadataVersion; +} + +@Override +public short featureLevel() { +return featureLevel; +} + +public static KRaftVersion fromFeatureLevel(short version) { +switch (version) { +case 0: +return KRAFT_VERSION_0; +case 1: +return KRAFT_VERSION_1; +default: +throw new RuntimeException("Unknown KRaft feature level: " + (int) version); +} +} + +@Override +public String featureName() { +return FEATURE_NAME; +} + +@Override +public MetadataVersion bootstrapMetadataVersion() { +return bootstrapMetadataVersion; +} + +@Override +public Map dependencies() { +return Collections.emptyMap(); +} + +public short quorumStateVersion() { +switch (this) { +case KRAFT_VERSION_0: +return (short) 0; +case KRAFT_VERSION_1: +return (short) 1; +} +throw new RuntimeException("Unknown KRaft feature level: " + this); +} Review Comment: I added `KRaftVersionTest` which includes this and other things. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
jsancio commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1670726026 ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -192,7 +192,7 @@ class BrokerServer( logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) - metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId) + metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion()) Review Comment: Did you try this instead? I think Scala should be able to infer that you want the lambda and not evaluate the method. ```java metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, raftManager.client.kraftVersion) ``` ## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ## @@ -541,8 +545,12 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w override def features(): FinalizedFeatures = { val image = _currentImage +val finalizedFeatures = new java.util.HashMap[String, java.lang.Short] +finalizedFeatures.putAll(image.features().finalizedVersions()) Review Comment: Not a big deal given the current size of `finalizedVtersions()` but maybe passing it to the constructor is better going forward. ```java val finalizedFeatures = new java.util.HashMap[String, java.lang.Short]( image.features().finalizedVersions() ) ``` ## core/src/main/scala/kafka/server/ControllerServer.scala: ## @@ -162,7 +162,7 @@ class ControllerServer( authorizer = config.createNewAuthorizer() authorizer.foreach(_.configure(config.originals)) - metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId) + metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion()) Review Comment: Same here. Did you try using this? ```java metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, raftManager.client.kraftVersion) ``` ## core/src/main/scala/kafka/server/MetadataCache.scala: ## @@ -121,7 +122,10 @@ object MetadataCache { new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, zkMigrationEnabled) } - def kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = { -new KRaftMetadataCache(brokerId) + def kRaftMetadataCache( +brokerId: Int, +kraftVersionSupplier: Supplier[KRaftVersion] = () => KRaftVersion.KRAFT_VERSION_0 Review Comment: This default parameter value seems risky if used in `src/main`. Why do you want it to be a default parameter? Also, not using default parameter values will make it easier to refactor this code to Java when the time come to do that. ## raft/src/main/java/org/apache/kafka/raft/RaftClient.java: ## @@ -250,4 +251,11 @@ default void beginShutdown() {} * or 0 if there have not been any records written. */ long logEndOffset(); + +/** + * Returns the current kraft.version. + * + * @return the current kraft.version. + */ Review Comment: If you agree with my previous comments, let's include that this method returns the uncommitted kraft.version. ## raft/src/test/java/org/apache/kafka/raft/FileQuorumStateStoreTest.java: ## @@ -64,7 +66,7 @@ void testWriteReadElectedLeader(short kraftVersion) throws IOException { ); final Optional expected; -if (kraftVersion == 1) { +if (kraftVersion.featureLevel() > 0) { Review Comment: If you agree with one of my previous message about adding `KRaftVersion::isReconfigSupported` maybe we can use that predicate here. This comment applies to a few lines in `src/test`. ## raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java: ## @@ -258,7 +261,9 @@ private void handleBatch(Batch batch, OptionalLong overrideOffset) { case KRAFT_VERSION: synchronized (kraftVersionHistory) { -kraftVersionHistory.addAt(currentOffset, ((KRaftVersionRecord) record.message()).kRaftVersion()); +kraftVersionHistory.addAt(currentOffset, +KRaftVersion.fromFeatureLevel( +((KRaftVersionRecord) record.message()).kRaftVersion())); Review Comment: Do you mind using this indentation in the `raft` module? Each parameter is on its own line: ```java kraftVersionHistory.addAt( currentOffset, KRaftVersion.fromFeatureLevel( ((KRaftVersionRecord) record.message()).kRaftVersion() ) ); ``` ## raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java: ## @@ -237,12 +238,12
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
jsancio commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1639917661 ## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java: ## @@ -116,6 +139,34 @@ public BootstrapMetadata copyWithOnlyVersion() { metadataVersion, source); } +public BootstrapMetadata withMetadataVersion(MetadataVersion metadataVersion) { +List newRecords = new ArrayList<>(); +for (ApiMessageAndVersion record : records) { +if (recordToMetadataVersion(record.message()).isPresent()) { +newRecords.add(metadataVersionToRecord(metadataVersion)); +} else { +newRecords.add(record); +} +} +return new BootstrapMetadata(newRecords, metadataVersion, source); +} + +public BootstrapMetadata withKRaftVersion(KRaftVersion version) { +List newRecords = new ArrayList<>(); +boolean foundKRaftVersion = false; +for (ApiMessageAndVersion record : records) { +if (recordToKRaftVersion(record.message()).isPresent()) { +newRecords.add(kraftVersionToRecord(version)); Review Comment: See my comment but we shouldn't implement kraft.version in the metadata layer (metadata module). It should be implemented in the kraft layer (module). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
cmccabe commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1669277905 ## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java: ## @@ -73,6 +80,22 @@ public static Optional recordToMetadataVersion(ApiMessage recor return Optional.empty(); } +private static ApiMessageAndVersion kraftVersionToRecord(KRaftVersion version) { +return new ApiMessageAndVersion(new FeatureLevelRecord(). +setName(KRaftVersion.FEATURE_NAME). +setFeatureLevel(version.featureLevel()), (short) 0); +} + +private static Optional recordToKRaftVersion(ApiMessage record) { +if (record instanceof FeatureLevelRecord) { +FeatureLevelRecord featureLevel = (FeatureLevelRecord) record; +if (featureLevel.name().equals(KRaftVersion.FEATURE_NAME)) { +return Optional.of(KRaftVersion.fromFeatureLevel(featureLevel.featureLevel())); +} +} +return Optional.empty(); +} Review Comment: ack -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
cmccabe commented on PR #16230: URL: https://github.com/apache/kafka/pull/16230#issuecomment-2215235517 I have revised this PR. We now get the value of `kraft.version` directly from the raft layer, and metadata layer does not have anything to do with it. This requires passing in a callback to the metadata cache on the brokers, which seems reasonable. I also merged in trunk and fixed a few things that people commented on. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
cmccabe commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1669277755 ## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java: ## @@ -116,6 +139,34 @@ public BootstrapMetadata copyWithOnlyVersion() { metadataVersion, source); } +public BootstrapMetadata withMetadataVersion(MetadataVersion metadataVersion) { +List newRecords = new ArrayList<>(); +for (ApiMessageAndVersion record : records) { +if (recordToMetadataVersion(record.message()).isPresent()) { +newRecords.add(metadataVersionToRecord(metadataVersion)); +} else { +newRecords.add(record); +} +} +return new BootstrapMetadata(newRecords, metadataVersion, source); +} + +public BootstrapMetadata withKRaftVersion(KRaftVersion version) { +List newRecords = new ArrayList<>(); +boolean foundKRaftVersion = false; +for (ApiMessageAndVersion record : records) { +if (recordToKRaftVersion(record.message()).isPresent()) { +newRecords.add(kraftVersionToRecord(version)); Review Comment: ack ## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java: ## @@ -116,6 +139,34 @@ public BootstrapMetadata copyWithOnlyVersion() { metadataVersion, source); } +public BootstrapMetadata withMetadataVersion(MetadataVersion metadataVersion) { +List newRecords = new ArrayList<>(); +for (ApiMessageAndVersion record : records) { +if (recordToMetadataVersion(record.message()).isPresent()) { +newRecords.add(metadataVersionToRecord(metadataVersion)); +} else { +newRecords.add(record); +} +} +return new BootstrapMetadata(newRecords, metadataVersion, source); +} + +public BootstrapMetadata withKRaftVersion(KRaftVersion version) { +List newRecords = new ArrayList<>(); +boolean foundKRaftVersion = false; +for (ApiMessageAndVersion record : records) { +if (recordToKRaftVersion(record.message()).isPresent()) { +newRecords.add(kraftVersionToRecord(version)); +} else { +newRecords.add(record); +} +} +if (!foundKRaftVersion) { +newRecords.add(kraftVersionToRecord(version)); Review Comment: ack -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
cmccabe commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1669239654 ## server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java: ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum KRaftVersion implements FeatureVersion { + +// Version 1 enables KIP-853. +KRAFT_VERSION_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION), +KRAFT_VERSION_1(1, MetadataVersion.IBP_3_8_IV0); + +public static final String FEATURE_NAME = "kraft.version"; + +private final short featureLevel; +private final MetadataVersion bootstrapMetadataVersion; + +KRaftVersion( +int featureLevel, +MetadataVersion bootstrapMetadataVersion +) { +this.featureLevel = (short) featureLevel; +this.bootstrapMetadataVersion = bootstrapMetadataVersion; +} + +@Override +public short featureLevel() { +return featureLevel; +} + +public static KRaftVersion fromFeatureLevel(short version) { +switch (version) { +case 0: +return KRAFT_VERSION_0; +case 1: +return KRAFT_VERSION_1; +default: +throw new RuntimeException("Unknown KRaft feature level: " + (int) version); +} +} + +@Override +public String featureName() { +return FEATURE_NAME; +} + +@Override +public MetadataVersion bootstrapMetadataVersion() { +return bootstrapMetadataVersion; +} + +@Override +public Map dependencies() { +return Collections.emptyMap(); +} + +public short quorumStateVersion() { +switch (this) { +case KRAFT_VERSION_0: +return (short) 0; +case KRAFT_VERSION_1: +return (short) 1; +} +throw new RuntimeException("Unknown KRaft feature level: " + this); +} Review Comment: Which part do you want to test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
cmccabe commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1669237196 ## server-common/src/main/java/org/apache/kafka/server/common/Features.java: ## @@ -40,7 +40,8 @@ public enum Features { * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature. */ TEST_VERSION("test.feature.version", TestFeatureVersion.values()), -GROUP_VERSION("group.version", GroupVersion.values()); +GROUP_VERSION("group.version", GroupVersion.values()), +KRAFT_VERSION("kraft.version", KRaftVersion.values()); Review Comment: `KRaftClusterTest.testUpdateMetadataVersion` tests this, when it checks for `kraft.version` in the Admin client output. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
cmccabe commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1669234970 ## raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java: ## @@ -146,7 +147,7 @@ final public static class Builder { private Time time = Time.SYSTEM; private int maxBatchSize = 1024; private MemoryPool memoryPool = MemoryPool.NONE; -private short kraftVersion = 1; +private KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_0; // TODO Review Comment: I'll change it to 1 since it was that way previously. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
jsancio commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1644766348 ## server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java: ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum KRaftVersion implements FeatureVersion { + +// Version 1 enables KIP-853. +KRAFT_VERSION_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION), +KRAFT_VERSION_1(1, MetadataVersion.IBP_3_8_IV0); Review Comment: This should be IBP_3_9_IV0 now that we moved KIP-853 to 3.9. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
jsancio commented on PR #16230: URL: https://github.com/apache/kafka/pull/16230#issuecomment-2176402435 > We also currently don't propagate control records to the metadata layer, so that is something we'd have to consider changing. We actually do, I changed this in this PR https://github.com/apache/kafka/commit/bfe81d622979809325c31d549943c40f6f0f7337#diff-5d07bcc6e077fc85a7f245be27d8ee2d3b7c4656f1e8a8dc3909eb70d3410e4cR150 The controller just skips those control records. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
cmccabe commented on PR #16230: URL: https://github.com/apache/kafka/pull/16230#issuecomment-2174664455 I was originally going to have two records: one `KRaftVersionRecord` for the raft layer, and one `FeatureRecord` for the metadata layer, encoding the same data. The advantage of doing it this way is that it will work well with the current code structure. For example, we have code that validates that FeaturesImage can dump its state and restore that state. If FeaturesImage is taking input from outside of the list of metadata records, that invariant is broken. We also currently don't propagate control records to the metadata layer, so that is something we'd have to consider changing. The big disadvantage of having two records rather than one is the state can get out of sync. Which I do think is a real risk, especially when we are changing things. I realize writing the record at the Raft layer seems useful to you, but in the metadata layer, the thought that `read(image.write) != image` does not spark joy. If this is going to be handled by the Raft layer and not the metadata layer, then maybe it should be excluded from the image altogether. We'll have to talk about that. In the meantime, I merged in trunk. There were import statement conflicts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
jsancio commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1639921798 ## raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java: ## @@ -146,7 +147,7 @@ final public static class Builder { private Time time = Time.SYSTEM; private int maxBatchSize = 1024; private MemoryPool memoryPool = MemoryPool.NONE; -private short kraftVersion = 1; +private KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_0; // TODO Review Comment: TODO reminder. What it is missing to make 1 the default for the builder? ## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java: ## @@ -73,6 +80,22 @@ public static Optional recordToMetadataVersion(ApiMessage recor return Optional.empty(); } +private static ApiMessageAndVersion kraftVersionToRecord(KRaftVersion version) { +return new ApiMessageAndVersion(new FeatureLevelRecord(). +setName(KRaftVersion.FEATURE_NAME). +setFeatureLevel(version.featureLevel()), (short) 0); +} + +private static Optional recordToKRaftVersion(ApiMessage record) { +if (record instanceof FeatureLevelRecord) { +FeatureLevelRecord featureLevel = (FeatureLevelRecord) record; +if (featureLevel.name().equals(KRaftVersion.FEATURE_NAME)) { +return Optional.of(KRaftVersion.fromFeatureLevel(featureLevel.featureLevel())); +} +} +return Optional.empty(); +} Review Comment: I don't think we should do this. The kraft.version is written to the cluster metadata log using the KRaftVersion control record. These are the cases when KRaft will write this control record: 1. The kafka-storage tool will write it to the checkpoint/snapshot if it being formatted to support kraft.version 1. [Storage tool changes for KIP-853](https://issues.apache.org/jira/browse/KAFKA-16518) 2. `KafkaRaftClient` will write the KRaftVersion control record to the checkpoint/snapshot using `RecordsSnapshotWriter.Builder` if the control record is part of the log segments. This is already in trunk. 3. `KafkaRaftClient` will write the `KRaftVersion` control record when handling `UpdateFeatures`. [UpdateFeatures for kraft.version](https://issues.apache.org/jira/browse/KAFKA-16538) 4. The first KRaft leader will write the `KRaftVersion` control record if it is in the bootstrapping checkpoint/snapshot but it is has never been written to the log segments. [Support for first leader bootstrapping the voter set](https://issues.apache.org/jira/browse/KAFKA-16532) I think that KRaftVersion should behave different from other feature level in that the KRaft version should be persisted and replicated using control records instead of metadata records. This is needed mainly because KRaft needs to read and process this record as uncommitted data. The controller and the metadata layer only sees committed records. ## server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java: ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum KRaftVersion implements FeatureVersion { + +// Version 1 enables KIP-853. +KRAFT_VERSION_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION), +KRAFT_VERSION_1(1, MetadataVersion.IBP_3_8_IV0); + +public static final String FEATURE_NAME = "kraft.version"; + +private final short featureLevel; +private final MetadataVersion bootstrapMetadataVersion; + +KRaftVersion( +int featureLevel, +MetadataVersion bootstrapMetadataVersion +) { +this.featureLevel = (short) featureLevel; +this.bootstrapMetadataVersion = bootstrapMetadataVersion; +} + +@Override +public short featureLevel() { +return featureLevel; +} + +public static KRaftVersion fromFeatureLevel(short version) { +switch (version) { +case 0: +return KRAFT_VERSION_0; +case 1: +return
Re: [PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
dengziming commented on code in PR #16230: URL: https://github.com/apache/kafka/pull/16230#discussion_r1637473020 ## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java: ## @@ -116,6 +139,34 @@ public BootstrapMetadata copyWithOnlyVersion() { metadataVersion, source); } +public BootstrapMetadata withMetadataVersion(MetadataVersion metadataVersion) { +List newRecords = new ArrayList<>(); +for (ApiMessageAndVersion record : records) { +if (recordToMetadataVersion(record.message()).isPresent()) { +newRecords.add(metadataVersionToRecord(metadataVersion)); +} else { +newRecords.add(record); +} +} +return new BootstrapMetadata(newRecords, metadataVersion, source); +} + +public BootstrapMetadata withKRaftVersion(KRaftVersion version) { +List newRecords = new ArrayList<>(); +boolean foundKRaftVersion = false; +for (ApiMessageAndVersion record : records) { +if (recordToKRaftVersion(record.message()).isPresent()) { +newRecords.add(kraftVersionToRecord(version)); +} else { +newRecords.add(record); +} +} +if (!foundKRaftVersion) { +newRecords.add(kraftVersionToRecord(version)); Review Comment: We have decided to propagate feature version for kraft.version using KRaftVersionRecord control record, should we also add a `ControlRecord` to make it consistent with `FeatureLevelRecord` ## metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java: ## @@ -116,6 +139,34 @@ public BootstrapMetadata copyWithOnlyVersion() { metadataVersion, source); } +public BootstrapMetadata withMetadataVersion(MetadataVersion metadataVersion) { +List newRecords = new ArrayList<>(); +for (ApiMessageAndVersion record : records) { +if (recordToMetadataVersion(record.message()).isPresent()) { +newRecords.add(metadataVersionToRecord(metadataVersion)); +} else { +newRecords.add(record); +} +} +return new BootstrapMetadata(newRecords, metadataVersion, source); +} + +public BootstrapMetadata withKRaftVersion(KRaftVersion version) { +List newRecords = new ArrayList<>(); +boolean foundKRaftVersion = false; +for (ApiMessageAndVersion record : records) { +if (recordToKRaftVersion(record.message()).isPresent()) { +newRecords.add(kraftVersionToRecord(version)); Review Comment: We should add: `foundKRaftVersion=true;` here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16772: Introduce kraft.version to support KIP-853 [kafka]
cmccabe opened a new pull request, #16230: URL: https://github.com/apache/kafka/pull/16230 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org