kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463915406



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1844,188 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns a suitable error.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+    }
+    // NOTE: Below we set the finalized min version level to be the default 
minimum version
+    // level. If the finalized feature already exists, then, this can cause 
deprecation of all
+    // version levels in the closed range:
+    // [existingVersionRange.min(), defaultMinVersionLevel - 1].
+    val defaultMinVersionLevel = 
brokerFeatures.defaultMinVersionLevel(update.feature)
+    val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, 
update.maxVersionLevel)
+    val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+      val singleFinalizedFeature =
+        Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+      BrokerFeatures.hasIncompatibleFeatures(broker.features, 
singleFinalizedFeature)
+    })
+    if (numIncompatibleBrokers == 0) {
+      Left(newVersionRange)
+    } else {
+      Right(
+        new ApiError(Errors.INVALID_REQUEST,
+                     s"Could not apply finalized feature update because 
$numIncompatibleBrokers" +
+                     " brokers were found to have incompatible features."))
+    }
+  }
+
+  /**
+   * Validate and process a finalized feature update.
+   *
+   * If the processing is successful, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the processing failed, then returned value contains a suitable 
ApiError.
+   *
+   * @param update   the feature update to be processed.
+   *
+   * @return         the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def processFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+    val existingFeatures = featureCache.get
+      .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala)
+      .getOrElse(Map[String, FinalizedVersionRange]())
+
+    def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+      newFinalizedVersionRangeOrIncompatibilityError(update)
+        .fold(versionRange => Left(Some(versionRange)), error => Right(error))
+    }
+
+    if (update.feature.isEmpty) {
+      // Check that the feature name is not empty.
+      Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+    } else {
+      val cacheEntry = existingFeatures.get(update.feature).orNull
+
+      // We handle deletion requests separately from non-deletion requests.
+      if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+        if (cacheEntry == null) {
+          // Disallow deletion of a non-existing finalized feature.
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             s"Can not delete non-existing finalized feature: 
'${update.feature}'"))
+        } else {
+          Left(Option.empty)
+        }
+      } else if (update.maxVersionLevel() < 1) {
+        // Disallow deletion of a finalized feature without allowDowngrade 
flag set.
+        Right(new ApiError(Errors.INVALID_REQUEST,
+                           s"Can not provide maxVersionLevel: 
${update.maxVersionLevel} less" +
+                           s" than 1 for feature: '${update.feature}' without 
setting the" +
+                           " allowDowngrade flag to true in the request."))
+      } else {
+        if (cacheEntry == null) {
+          newVersionRangeOrError(update)
+        } else {
+          if (update.maxVersionLevel == cacheEntry.max()) {
+            // Disallow a case where target maxVersionLevel matches existing 
maxVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not ${if (update.allowDowngrade) 
"downgrade" else "upgrade"}" +
+                               s" a finalized feature: '${update.feature}' 
from existing" +
+                               s" maxVersionLevel:${cacheEntry.max} to the 
same value."))
+          } else if (update.maxVersionLevel < cacheEntry.max && 
!update.allowDowngrade) {
+            // Disallow downgrade of a finalized feature without the 
allowDowngrade flag set.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not downgrade finalized feature: 
'${update.feature}' from" +
+                               s" existing maxVersionLevel:${cacheEntry.max} 
to provided" +
+                               s" maxVersionLevel:${update.maxVersionLevel} 
without setting the" +
+                               " allowDowngrade flag in the request."))
+          } else if (update.allowDowngrade && update.maxVersionLevel > 
cacheEntry.max) {

Review comment:
       Updated the doc. Let's keep the check: if it happens, then it's a user 
error, especially because this cannot happen if the user is using the tooling 
that we are going to provide in AK (Apache Kafka).

##########
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##########
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+      "about": "The list of updates to finalized features.", "fields": [
+      {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+        "about": "The name of the finalized feature to be updated."},
+      {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
+        "about": "The new maximum version level for the finalized feature. A 
value >= 1 is valid. A value < 1, is special, and can be used to request the 
deletion of the finalized feature."},
+      {"name": "AllowDowngrade", "type": "bool", "versions": "0+",
+        "about": "When set to true, the finalized feature version level is 
allowed to be downgraded/deleted."}

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to