[GitHub] [kafka] abbccdda commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
abbccdda commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r496901588 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.Objects; + +/** + * Represents a range of version levels supported by every broker in a cluster for some feature. + */ +public class FinalizedVersionRange { Review comment: Do we want to have a different name from `org.apache.kafka.common.feature.FinalizedVersionRange`, such as `FinalizedVersionLevels`? Same case for `SupportedVersionRange`, personally I feel the same class name makes the navigation harder. ## File path: core/src/main/scala/kafka/server/BrokerFeatures.scala ## @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A class that encapsulates the latest features supported by the Broker and also provides APIs to + * check for incompatibilities between the features supported by the Broker and finalized features. + * The class also enables feature version level deprecation, as explained below. This class is + * immutable in production. It provides few APIs to mutate state only for the purpose of testing. + * + * Feature version level deprecation: + * == + * + * Deprecation of certain version levels of a feature is a process to stop supporting the + * functionality offered by the feature at those version levels, across the entire Kafka cluster. + * Feature version deprecation is a simple 2-step process explained below. In each step below, an + * example is provided to help understand the process better: + * + * STEP 1: + * === + * + * In the first step, a major Kafka release is made with a Broker code change (explained later + * below) that establishes the intent to deprecate certain versions of one or more features + * cluster-wide. When this new Kafka release is deployed to the cluster, deprecated finalized + * feature versions are no longer advertised to the client, but they can still be used by existing + * connections. The way it works is that the feature versioning system (via the controller) will + * automatically persist the new minVersionLevel for the feature in ZK to propagate the deprecation + * of certain versions. After this happens, any external client that queries the Broker to learn the + * feature versions will at some point start to see the new value for the finalized minVersionLevel + * for the feature. The external clients are expected to stop using the deprecated versions at least + * by the time that they learn about it. + * + * Here is how the above code change needs to be done: + * In order to deprecate feature version levels, in the supportedFeatures map you need to supply a + * specific firstActiveVersion value that's higher than the minVersion for the feature. The + * value for firstActiveVersion should be 1 beyond the highest version that you intend to deprecate + * for that feature. Whenever the controller is elected or the features are finalized via the
[GitHub] [kafka] abbccdda commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
abbccdda commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r496321312 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1215,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +/** + * Describes finalized as well as supported features. By default, the request is issued to any + * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions + * parameter. This is particularly useful if the user requires strongly consistent reads of + * finalized features. + * + * The following exceptions can be anticipated when calling {@code get()} on the future from the + * returned {@link DescribeFeaturesResult}: + * + * {@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the describe operation could finish. + * + * + * @param options the options to use + * + * @return the {@link DescribeFeaturesResult} containing the result + */ +DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); Review comment: Yea, you are right, I think this comment belongs to updateFeatures This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
abbccdda commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r462453975 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1215,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +/** + * Describes finalized as well as supported features. By default, the request is issued to any + * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions + * parameter. This is particularly useful if the user requires strongly consistent reads of + * finalized features. + * + * The following exceptions can be anticipated when calling {@code get()} on the future from the + * returned {@link DescribeFeaturesResult}: + * + * {@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the describe operation could finish. + * + * + * @param options the options to use + * + * @return the {@link DescribeFeaturesResult} containing the result + */ +DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); Review comment: Note in the post-KIP-500 world, this feature could still work, but the request must be redirected to the controller inherently on the broker side, instead of sending it directly. So in the comment, we may try to phrase it to convey the principal is that `the request must be handled by the controller` instead of `the admin client must send this request to the controller`. ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1215,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +/** + * Describes finalized as well as supported features. By default, the request is issued to any + * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions + * parameter. This is particularly useful if the user requires strongly consistent reads of + * finalized features. + * + * The following exceptions can be anticipated when calling {@code get()} on the future from the + * returned {@link DescribeFeaturesResult}: + * + * {@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the describe operation could finish. + * + * + * @param options the options to use + * + * @return the {@link DescribeFeaturesResult} containing the result + */ +DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); + +/** + * Applies specified updates to finalized features. This operation is not transactional so it + * may succeed for some features while fail for others. + * + * The API takes in a map of finalized feature name to {@link FeatureUpdate} that need to be Review comment: nit: s/name/names ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -983,8 +1144,25 @@ class KafkaController(val config: KafkaConfig, */ private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition]): Unit = { try { + val filteredBrokers = scala.collection.mutable.Set[Int]() ++ brokers + if (config.isFeatureVersioningEnabled) { +def hasIncompatibleFeatures(broker: Broker): Boolean = { + val latestFinalizedFeatures = featureCache.get + if (latestFinalizedFeatures.isDefined) { +BrokerFeatures.hasIncompatibleFeatures(broker.features, latestFinalizedFeatures.get.features) + } else { +false + } +} +controllerContext.liveOrShuttingDownBrokers.foreach(broker => { + if (filteredBrokers.contains(broker.id) && hasIncompatibleFeatures(broker)) { Review comment: I see, what would happen to a currently live broker if it couldn't get any metadata update for a while, will it shut down itself? ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either exp
[GitHub] [kafka] abbccdda commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
abbccdda commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r453842949 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1214,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +/** + * Describes finalized as well as supported features. By default, the request is issued to any + * broker, but it can be optionally directed only to the controller via DescribeFeaturesOptions + * parameter. + * + * The following exceptions can be anticipated when calling {@code get()} on the future from the + * returned {@link DescribeFeaturesResult}: + * + * {@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the describe operation could finish. + * + * + * @param options the options to use + * + * @return the DescribeFeaturesResult containing the result + */ +DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); + +/** + * Applies specified updates to finalized features. The API is atomic, meaning that if a single + * feature update in the request can't succeed on the controller, then none of the feature + * updates are carried out. This request is issued only to the controller since the API is + * only served by the controller. + * + * The API takes as input a set of FinalizedFeatureUpdate that need to be applied. Each such + * update specifies the finalized feature to be added or updated or deleted, along with the new + * max feature version level value. + * + * Downgrade of feature version level is not a regular operation/intent. It is only allowed + * in the controller if the feature update has the allowDowngrade flag set - setting this flag + * conveys user intent to attempt downgrade of a feature max version level. Note that despite + * the allowDowngrade flag being set, certain downgrades may be rejected by the controller if it + * is deemed impossible. + * Deletion of a finalized feature version is not a regular operation/intent. It is allowed + * only if the allowDowngrade flag is set in the feature update, and, if the max version level + * is set to a value less than 1. + * + * + * The following exceptions can be anticipated when calling {@code get()} on the futures + * obtained from the returned {@link UpdateFinalizedFeaturesResult}: + * + * {@link org.apache.kafka.common.errors.ClusterAuthorizationException} + * If the authenticated user didn't have alter access to the cluster. + * {@link org.apache.kafka.common.errors.InvalidRequestException} + * If the request details are invalid. e.g., a non-existing finalized feature is attempted + * to be deleted or downgraded. + * {@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the updates could finish. It cannot be guaranteed whether + * the updates succeeded or not. + * {@link org.apache.kafka.common.errors.FinalizedFeatureUpdateFailedException} + * If the updates could not be applied on the controller, despite the request being valid. + * This may be a temporary problem. + * + * + * This operation is supported by brokers with version 2.7.0 or higher. + + * @param featureUpdates the set of finalized feature updates + * @param options the options to use + * + * @return the UpdateFinalizedFeaturesResult containing the result Review comment: nit: get a ` {@link UpdateFinalizedFeaturesResult}` as well ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1214,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +/** + * Describes finalized as well as supported features. By default, the request is issued to any + * broker, but it can be optionally directed only to the controller via DescribeFeaturesOptions + * parameter. + * + * The following exceptions can be anticipated when calling {@code get()} on the future from the + * returned {@link DescribeFeaturesResult}: + * + * {@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the describe operation could finish. + * + * + * @param options the options to use + * + * @return the DescribeFeaturesResult containing the result + */ +DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); + +/** + * Applies specified updates to finalized features. The API is atomic, meaning that if a single + * feature update in the request can't succeed on the control