kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463935718



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1844,185 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Computes the new FinalizedVersionRange for the feature named in the provided (non-deletion)
+   * feature update, and checks it against the features of every broker in
+   * controllerContext.liveOrShuttingDownBrokers.
+   *
+   * The new range is [brokerFeatures.defaultMinVersionLevel(update.feature), update.maxVersionLevel].
+   * A broker is counted as incompatible when BrokerFeatures.hasIncompatibleFeatures reports an
+   * incompatibility between the broker's features and a finalized-features object that contains
+   * only this one feature with the new range.
+   *
+   * NOTE: in the returned Either, Left carries the success value and Right carries the error,
+   * which is the reverse of the usual Scala convention.
+   *
+   * @param update   the feature update to be processed (this can not be meant to delete the feature)
+   *
+   * @return         Left(new FinalizedVersionRange) if no broker was found to be incompatible,
+   *                 otherwise Right(ApiError) with Errors.INVALID_REQUEST and a message containing
+   *                 the number of incompatible brokers.
+   *
+   * @throws IllegalArgumentException if the provided update is a deletion request, i.e.
+   *                                  UpdateFeaturesRequest.isDeleteRequest(update) is true.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+    }
+    // NOTE: Below we set the finalized min version level to be the default 
minimum version
+    // level. If the finalized feature already exists, then, this can cause 
deprecation of all
+    // version levels in the closed range:
+    // [existingVersionRange.min(), defaultMinVersionLevel - 1].
+    val defaultMinVersionLevel = 
brokerFeatures.defaultMinVersionLevel(update.feature)
+    val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, 
update.maxVersionLevel)
+    val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+      val singleFinalizedFeature =
+        Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+      BrokerFeatures.hasIncompatibleFeatures(broker.features, 
singleFinalizedFeature)
+    })
+    if (numIncompatibleBrokers == 0) {
+      Left(newVersionRange)
+    } else {
+      Right(
+        new ApiError(Errors.INVALID_REQUEST,
+                     s"Could not apply finalized feature update because 
$numIncompatibleBrokers" +
+                     " brokers were found to have incompatible features."))
+    }
+  }
+
+  /**
+   * Validate and process a finalized feature update on an existing 
FinalizedVersionRange for the
+   * feature.
+   *
+   * If the processing is successful, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the processing failed, then returned value contains a suitable 
ApiError.
+   *
+   * @param update                 the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *                               FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return                       the new FinalizedVersionRange or error, as 
described above.
+   */
+  private def processFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+                                   existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+    def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+      newFinalizedVersionRangeOrIncompatibilityError(update)
+        .fold(versionRange => Left(Some(versionRange)), error => Right(error))
+    }
+
+    if (update.feature.isEmpty) {
+      // Check that the feature name is not empty.
+      Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+    } else {
+      // We handle deletion requests separately from non-deletion requests.
+      if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+        if (existingVersionRange.isEmpty) {
+          // Disallow deletion of a non-existing finalized feature.
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             s"Can not delete non-existing finalized feature: 
'${update.feature}'"))
+        } else {
+          Left(Option.empty)
+        }
+      } else if (update.maxVersionLevel() < 1) {

Review comment:
       This case is handled explicitly here so that we can return the special 
error message: `...less than 1 for feature...`.
   A value < 1 indicates a deletion request (not purely a downgrade 
request).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to