junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r497813798



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import static java.util.stream.Collectors.joining;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is 
particularly useful
+ * to hold the result returned by the {@link 
Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+    private final Map<String, FinalizedVersionRange> finalizedFeatures;
+
+    private final Optional<Long> finalizedFeaturesEpoch;
+
+    private final Map<String, SupportedVersionRange> supportedFeatures;
+
+    public FeatureMetadata(final Map<String, FinalizedVersionRange> 
finalizedFeatures,

Review comment:
       I was looking at existing classes for the return value. For example, 
CreateAclsResult deliberately makes the constructor non-public.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3109,6 +3109,36 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
+    val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
+
+    def sendResponseCallback(errors: Either[ApiError, Map[String, ApiError]]): 
Unit = {
+      def createResponse(throttleTimeMs: Int): UpdateFeaturesResponse = {
+        errors match {
+          case Left(topLevelError) =>
+            UpdateFeaturesResponse.createWithErrors(
+              topLevelError,
+              new util.HashMap[String, ApiError](),

Review comment:
       Could we use Collections.emptyMap()?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1656,6 +1910,204 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+    }
+
+    val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+    if (supportedVersionRange == null) {
+      Right(new ApiError(Errors.INVALID_REQUEST,
+                         "Could not apply finalized feature update because the 
provided feature" +
+                         " is not supported."))
+    } else {
+      var newVersionRange: FinalizedVersionRange = null
+      try {
+        newVersionRange = new 
FinalizedVersionRange(supportedVersionRange.firstActiveVersion, 
update.maxVersionLevel)
+      } catch {
+        case _: IllegalArgumentException => {
+          // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+          // outside of this catch clause.
+        }
+      }
+      if (newVersionRange == null) {
+        Right(new ApiError(Errors.INVALID_REQUEST,
+          "Could not apply finalized feature update because the provided" +
+          s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+          s" first active 
version:${supportedVersionRange.firstActiveVersion}."))
+      } else {
+        val newFinalizedFeature =
+          Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+        val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+          BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
+        })
+        if (numIncompatibleBrokers == 0) {
+          Left(newVersionRange)
+        } else {
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             "Could not apply finalized feature update 
because" +
+                             " brokers were found to have incompatible 
versions for the feature."))
+        }
+      }
+    }
+  }
+
+  /**
+   * Validates a feature update on an existing FinalizedVersionRange.
+   * If the validation succeeds, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the validation fails, then returned value contains a suitable ApiError.
+   *
+   * @param update                 the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *                               FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return                       the new FinalizedVersionRange to be updated 
into ZK or error
+   *                               as described above.
+   */
+  private def validateFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+                                    existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+    def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+      newFinalizedVersionRangeOrIncompatibilityError(update)
+        .fold(versionRange => Left(Some(versionRange)), error => Right(error))
+    }
+
+    if (update.feature.isEmpty) {
+      // Check that the feature name is not empty.
+      Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+    } else {
+      // We handle deletion requests separately from non-deletion requests.
+      if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+        if (existingVersionRange.isEmpty) {
+          // Disallow deletion of a non-existing finalized feature.
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             "Can not delete non-existing finalized feature."))
+        } else {
+          Left(Option.empty)
+        }
+      } else if (update.maxVersionLevel() < 1) {
+        // Disallow deletion of a finalized feature without allowDowngrade 
flag set.
+        Right(new ApiError(Errors.INVALID_REQUEST,
+                           s"Can not provide maxVersionLevel: 
${update.maxVersionLevel} less" +
+                           s" than 1 without setting the allowDowngrade flag 
to true in the request."))
+      } else {
+        existingVersionRange.map(existing =>
+          if (update.maxVersionLevel == existing.max) {
+            // Disallow a case where target maxVersionLevel matches existing 
maxVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not ${if (update.allowDowngrade) 
"downgrade" else "upgrade"}" +
+                               s" a finalized feature from existing 
maxVersionLevel:${existing.max}" +
+                               " to the same value."))
+          } else if (update.maxVersionLevel < existing.max && 
!update.allowDowngrade) {
+            // Disallow downgrade of a finalized feature without the 
allowDowngrade flag set.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not downgrade finalized feature from 
existing" +
+                               s" maxVersionLevel:${existing.max} to provided" 
+
+                               s" maxVersionLevel:${update.maxVersionLevel} 
without setting the" +
+                               " allowDowngrade flag in the request."))
+          } else if (update.allowDowngrade && update.maxVersionLevel > 
existing.max) {
+            // Disallow a request that sets allowDowngrade flag without 
specifying a
+            // maxVersionLevel that's lower than the existing maxVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"When the allowDowngrade flag set in the 
request, the provided" +
+                               s" maxVersionLevel:${update.maxVersionLevel} 
can not be greater than" +
+                               s" existing maxVersionLevel:${existing.max}."))
+          } else if (update.maxVersionLevel < existing.min) {
+            // Disallow downgrade of a finalized feature below the existing 
finalized
+            // minVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not downgrade finalized feature to 
maxVersionLevel:${update.maxVersionLevel}" +
+                               s" because it's lower than the existing 
minVersionLevel:${existing.min}."))
+          } else {
+            newVersionRangeOrError(update)
+          }
+        ).getOrElse(newVersionRangeOrError(update))
+      }
+    }
+  }
+
+  private def processFeatureUpdates(request: UpdateFeaturesRequest,
+                                    callback: UpdateFeaturesCallback): Unit = {
+    if (isActive) {
+      processFeatureUpdatesWithActiveController(request, callback)
+    } else {
+      callback(Left(new ApiError(Errors.NOT_CONTROLLER)))
+    }
+  }
+
+  private def processFeatureUpdatesWithActiveController(request: 
UpdateFeaturesRequest,
+                                                        callback: 
UpdateFeaturesCallback): Unit = {
+    val updates = request.data.featureUpdates
+    val existingFeatures = featureCache.get
+      .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala)
+      .getOrElse(Map[String, FinalizedVersionRange]())
+    // A map with key being feature name and value being FinalizedVersionRange.
+    // This contains the target features to be eventually written to 
FeatureZNode.
+    val targetFeatures = scala.collection.mutable.Map[String, 
FinalizedVersionRange]() ++ existingFeatures
+    // A map with key being feature name and value being error encountered 
when the FeatureUpdate
+    // was applied.
+    val errors = scala.collection.mutable.Map[String, ApiError]()
+
+    // Below we process each FeatureUpdate using the following logic:
+    //  - If a FeatureUpdate is found to be valid, then:
+    //    - The corresponding entry in errors map would be updated to contain 
Errors.NONE.
+    //    - If the FeatureUpdate is an add or update request, then the 
targetFeatures map is updated
+    //      to contain the new FinalizedVersionRange for the feature.
+    //    - Otherwise if the FeatureUpdate is a delete request, then the 
feature is removed from the
+    //      targetFeatures map.
+    //  - Otherwise if a FeatureUpdate is found to be invalid, then:
+    //    - The corresponding entry in errors map would be updated with the 
appropriate ApiError.
+    //    - The entry in targetFeatures map is left untouched.
+    updates.asScala.iterator.foreach { update =>
+      validateFeatureUpdate(update, existingFeatures.get(update.feature())) 
match {
+        case Left(newVersionRangeOrNone) =>
+          newVersionRangeOrNone match {
+            case Some(newVersionRange) => targetFeatures += (update.feature() 
-> newVersionRange)
+            case None => targetFeatures -= update.feature()
+          }
+          errors += (update.feature() -> new ApiError(Errors.NONE))
+        case Right(featureUpdateFailureReason) =>
+          errors += (update.feature() -> featureUpdateFailureReason)
+      }
+    }
+
+    // If the existing and target features are the same, then, we skip the 
update to the
+    // FeatureZNode as no changes to the node are required. Otherwise, we 
replace the contents
+    // of the FeatureZNode with the new features. This may result in partial 
or full modification
+    // of the existing finalized features in ZK.
+    try {
+      if (!existingFeatures.equals(targetFeatures)) {
+        val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, 
Features.finalizedFeatures(targetFeatures.asJava))
+        val newVersion = zkClient.updateFeatureZNode(newNode)
+        featureCache.waitUntilEpochOrThrow(newVersion, 
request.data().timeoutMs().min(config.zkConnectionTimeoutMs))

Review comment:
       Hmm, why do we need to take the min? If the ZK data is propagated 
quickly, waitUntilEpochOrThrow() will just return early.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1656,6 +1910,204 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+    }
+
+    val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+    if (supportedVersionRange == null) {
+      Right(new ApiError(Errors.INVALID_REQUEST,
+                         "Could not apply finalized feature update because the 
provided feature" +
+                         " is not supported."))
+    } else {
+      var newVersionRange: FinalizedVersionRange = null
+      try {
+        newVersionRange = new 
FinalizedVersionRange(supportedVersionRange.firstActiveVersion, 
update.maxVersionLevel)
+      } catch {
+        case _: IllegalArgumentException => {
+          // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+          // outside of this catch clause.
+        }
+      }
+      if (newVersionRange == null) {
+        Right(new ApiError(Errors.INVALID_REQUEST,
+          "Could not apply finalized feature update because the provided" +
+          s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+          s" first active 
version:${supportedVersionRange.firstActiveVersion}."))
+      } else {
+        val newFinalizedFeature =
+          Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+        val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+          BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
+        })
+        if (numIncompatibleBrokers == 0) {
+          Left(newVersionRange)
+        } else {
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             "Could not apply finalized feature update 
because" +
+                             " brokers were found to have incompatible 
versions for the feature."))
+        }
+      }
+    }
+  }
+
+  /**
+   * Validates a feature update on an existing FinalizedVersionRange.
+   * If the validation succeeds, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the validation fails, then returned value contains a suitable ApiError.
+   *
+   * @param update                 the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *                               FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return                       the new FinalizedVersionRange to be updated 
into ZK or error
+   *                               as described above.
+   */
+  private def validateFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+                                    existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+    def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+      newFinalizedVersionRangeOrIncompatibilityError(update)
+        .fold(versionRange => Left(Some(versionRange)), error => Right(error))
+    }
+
+    if (update.feature.isEmpty) {
+      // Check that the feature name is not empty.
+      Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+    } else {
+      // We handle deletion requests separately from non-deletion requests.
+      if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+        if (existingVersionRange.isEmpty) {
+          // Disallow deletion of a non-existing finalized feature.
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             "Can not delete non-existing finalized feature."))
+        } else {
+          Left(Option.empty)
+        }
+      } else if (update.maxVersionLevel() < 1) {
+        // Disallow deletion of a finalized feature without allowDowngrade 
flag set.
+        Right(new ApiError(Errors.INVALID_REQUEST,
+                           s"Can not provide maxVersionLevel: 
${update.maxVersionLevel} less" +
+                           s" than 1 without setting the allowDowngrade flag 
to true in the request."))
+      } else {
+        existingVersionRange.map(existing =>
+          if (update.maxVersionLevel == existing.max) {
+            // Disallow a case where target maxVersionLevel matches existing 
maxVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not ${if (update.allowDowngrade) 
"downgrade" else "upgrade"}" +
+                               s" a finalized feature from existing 
maxVersionLevel:${existing.max}" +
+                               " to the same value."))
+          } else if (update.maxVersionLevel < existing.max && 
!update.allowDowngrade) {
+            // Disallow downgrade of a finalized feature without the 
allowDowngrade flag set.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not downgrade finalized feature from 
existing" +
+                               s" maxVersionLevel:${existing.max} to provided" 
+
+                               s" maxVersionLevel:${update.maxVersionLevel} 
without setting the" +
+                               " allowDowngrade flag in the request."))
+          } else if (update.allowDowngrade && update.maxVersionLevel > 
existing.max) {
+            // Disallow a request that sets allowDowngrade flag without 
specifying a
+            // maxVersionLevel that's lower than the existing maxVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"When the allowDowngrade flag set in the 
request, the provided" +
+                               s" maxVersionLevel:${update.maxVersionLevel} 
can not be greater than" +
+                               s" existing maxVersionLevel:${existing.max}."))
+          } else if (update.maxVersionLevel < existing.min) {
+            // Disallow downgrade of a finalized feature below the existing 
finalized
+            // minVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not downgrade finalized feature to 
maxVersionLevel:${update.maxVersionLevel}" +
+                               s" because it's lower than the existing 
minVersionLevel:${existing.min}."))
+          } else {
+            newVersionRangeOrError(update)
+          }
+        ).getOrElse(newVersionRangeOrError(update))
+      }
+    }
+  }
+
+  private def processFeatureUpdates(request: UpdateFeaturesRequest,
+                                    callback: UpdateFeaturesCallback): Unit = {
+    if (isActive) {
+      processFeatureUpdatesWithActiveController(request, callback)
+    } else {
+      callback(Left(new ApiError(Errors.NOT_CONTROLLER)))
+    }
+  }
+
+  private def processFeatureUpdatesWithActiveController(request: 
UpdateFeaturesRequest,
+                                                        callback: 
UpdateFeaturesCallback): Unit = {
+    val updates = request.data.featureUpdates
+    val existingFeatures = featureCache.get
+      .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala)
+      .getOrElse(Map[String, FinalizedVersionRange]())
+    // A map with key being feature name and value being FinalizedVersionRange.
+    // This contains the target features to be eventually written to 
FeatureZNode.
+    val targetFeatures = scala.collection.mutable.Map[String, 
FinalizedVersionRange]() ++ existingFeatures
+    // A map with key being feature name and value being error encountered 
when the FeatureUpdate
+    // was applied.
+    val errors = scala.collection.mutable.Map[String, ApiError]()
+
+    // Below we process each FeatureUpdate using the following logic:
+    //  - If a FeatureUpdate is found to be valid, then:
+    //    - The corresponding entry in errors map would be updated to contain 
Errors.NONE.
+    //    - If the FeatureUpdate is an add or update request, then the 
targetFeatures map is updated
+    //      to contain the new FinalizedVersionRange for the feature.
+    //    - Otherwise if the FeatureUpdate is a delete request, then the 
feature is removed from the
+    //      targetFeatures map.
+    //  - Otherwise if a FeatureUpdate is found to be invalid, then:
+    //    - The corresponding entry in errors map would be updated with the 
appropriate ApiError.
+    //    - The entry in targetFeatures map is left untouched.
+    updates.asScala.iterator.foreach { update =>
+      validateFeatureUpdate(update, existingFeatures.get(update.feature())) 
match {
+        case Left(newVersionRangeOrNone) =>
+          newVersionRangeOrNone match {
+            case Some(newVersionRange) => targetFeatures += (update.feature() 
-> newVersionRange)
+            case None => targetFeatures -= update.feature()
+          }
+          errors += (update.feature() -> new ApiError(Errors.NONE))
+        case Right(featureUpdateFailureReason) =>
+          errors += (update.feature() -> featureUpdateFailureReason)
+      }
+    }
+
+    // If the existing and target features are the same, then, we skip the 
update to the
+    // FeatureZNode as no changes to the node are required. Otherwise, we 
replace the contents
+    // of the FeatureZNode with the new features. This may result in partial 
or full modification
+    // of the existing finalized features in ZK.
+    try {
+      if (!existingFeatures.equals(targetFeatures)) {
+        val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, 
Features.finalizedFeatures(targetFeatures.asJava))
+        val newVersion = zkClient.updateFeatureZNode(newNode)
+        featureCache.waitUntilEpochOrThrow(newVersion, 
request.data().timeoutMs().min(config.zkConnectionTimeoutMs))
+      }
+    } catch {
+      // For all features that correspond to valid FeatureUpdate (i.e. error 
is Errors.NONE),
+      // we set the error as Errors.FEATURE_UPDATE_FAILED since the 
FeatureZNode update has failed
+      // for these. For the rest, the existing error is left untouched.
+      case e: Exception =>
+        warn(s"Processing of feature updates: $request failed due to error: 
$e")
+        errors.foreach { case (feature, apiError) =>
+          if (apiError.error() == Errors.NONE) {
+            errors(feature) = new ApiError(Errors.FEATURE_UPDATE_FAILED)

Review comment:
       It's useful to return an error message too.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -219,6 +226,8 @@ class KafkaController(val config: KafkaConfig,
    * This ensures another controller election will be triggered and there will 
always be an actively serving controller
    */
   private def onControllerFailover(): Unit = {
+    maybeSetupFeatureVersioning()

Review comment:
       This can throw an exception due to feature mismatch. Currently, this 
forces the controller to move but keeps the broker alive. Should we force the 
broker to exit in this case?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to