junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498959506



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -272,6 +281,147 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises support for.
+   * Each broker advertises the version ranges of its own supported features 
in its own
+   * BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *    setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *    the possible supported features finalized immediately. Assuming this 
is the case, the
+   *    controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *    it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *    supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there was an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *    broker binary has now been upgraded to a newer version that supports 
the feature versioning
+   *    system (KIP-584). But the IBP config is still set to lower than 
KAFKA_2_7_IV0, and may be
+   *    set to a higher value later. In this case, we want to start with no 
finalized features and
+   *    allow the user to finalize them whenever they are ready i.e. in the 
future whenever the
+   *    user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, 
then the user could start
+   *    finalizing the features. This process ensures we do not enable all the 
possible features
+   *    immediately after an upgrade, which could be harmful to Kafka.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent.
+   *        - If the node is absent, it will react by creating a FeatureZNode 
with disabled status
+   *          and empty finalized features.
+   *        - Otherwise, if a node already exists in enabled status then the 
controller will just
+   *          flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *        and whether it is disabled.
+   *         - If the node is in disabled status, the controller won’t upgrade 
all features immediately.
+   *           Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *           user finalize the features later.
+   *         - Otherwise, if a node already exists in enabled status then the 
controller will leave
+   *           the node umodified.
+   *
+   * 3. Broker binary upgraded, with existing cluster IBP config >= 
KAFKA_2_7_IV0:
+   *    Imagine there was an existing Kafka cluster with IBP config >= 
KAFKA_2_7_IV0, and the broker
+   *    binary has just been upgraded to a newer version (that supports IBP 
config KAFKA_2_7_IV0 and
+   *    higher). The controller will start up and find that a FeatureZNode is 
already present with
+   *    enabled status and existing finalized features. In such a case, the 
controller leaves the node
+   *    unmodified.
+   *
+   * 4. Broker downgrade:
+   *    Imagine that a Kafka cluster exists already and the IBP config is 
greater than or equal to
+   *    KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by 
setting IBP config to a
+   *    value less than KAFKA_2_7_IV0. This means the user is also disabling 
the feature versioning
+   *    system (KIP-584). In this case, when the controller starts up with the 
lower IBP config, it
+   *    will switch the FeatureZNode status to disabled with empty features.
+   */
+  private def enableFeatureVersioning(): Unit = {
+    val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      val newVersion = createFeatureZNode(new 
FeatureZNode(FeatureZNodeStatus.Enabled,
+                                          
brokerFeatures.defaultFinalizedFeatures))
+      featureCache.waitUntilEpochOrThrow(newVersion, 
config.zkConnectionTimeoutMs)
+    } else {
+      val existingFeatureZNode = 
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+      if (!existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {
+        val newVersion = updateFeatureZNode(new 
FeatureZNode(FeatureZNodeStatus.Enabled,

Review comment:
       This this case, existingFeatureZNode.features is expected to be empty? 
Could we log a warn if this is not the case and always set finalized to empty?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##########
@@ -17,9 +17,16 @@
 package org.apache.kafka.common.feature;
 
 import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.utils.Utils;
 
 /**
- * An extended {@link BaseVersionRange} representing the min/max versions for 
supported features.
+ * An extended {@link BaseVersionRange} representing the min, max and first 
active versions for a
+ * supported feature:
+ *  - minVersion: This is the minimum supported version for the feature.
+ *  - maxVersion: This the maximum supported version for the feature.
+ *  - firstActiveVersion: This is the first active version for the feature. 
Versions in the range

Review comment:
       "we do not know whether all brokers in the cluster support a particular 
minVersion when the controller finalizes the minVersionLevel at a particular 
value." The controller knows the minSupportedVersion for all brokers, right? 
What if we do the following? When finalizing a feature, the controllers uses 
the highest minSupportedVersion across all brokers as finalizedMinVersion, as 
long as it's <= finalizedMaxVersion. On broker restart, we also advance 
finalizedMinVersion if the new broker's minSupportedVersion has advanced 
(assuming still <= finalizedMaxVersion). 

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -272,6 +281,147 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises support for.
+   * Each broker advertises the version ranges of its own supported features 
in its own
+   * BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *    setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *    the possible supported features finalized immediately. Assuming this 
is the case, the
+   *    controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *    it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *    supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there was an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *    broker binary has now been upgraded to a newer version that supports 
the feature versioning
+   *    system (KIP-584). But the IBP config is still set to lower than 
KAFKA_2_7_IV0, and may be
+   *    set to a higher value later. In this case, we want to start with no 
finalized features and
+   *    allow the user to finalize them whenever they are ready i.e. in the 
future whenever the
+   *    user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, 
then the user could start
+   *    finalizing the features. This process ensures we do not enable all the 
possible features
+   *    immediately after an upgrade, which could be harmful to Kafka.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent.
+   *        - If the node is absent, it will react by creating a FeatureZNode 
with disabled status
+   *          and empty finalized features.
+   *        - Otherwise, if a node already exists in enabled status then the 
controller will just
+   *          flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *        and whether it is disabled.
+   *         - If the node is in disabled status, the controller won’t upgrade 
all features immediately.
+   *           Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *           user finalize the features later.
+   *         - Otherwise, if a node already exists in enabled status then the 
controller will leave
+   *           the node umodified.
+   *
+   * 3. Broker binary upgraded, with existing cluster IBP config >= 
KAFKA_2_7_IV0:
+   *    Imagine there was an existing Kafka cluster with IBP config >= 
KAFKA_2_7_IV0, and the broker
+   *    binary has just been upgraded to a newer version (that supports IBP 
config KAFKA_2_7_IV0 and
+   *    higher). The controller will start up and find that a FeatureZNode is 
already present with
+   *    enabled status and existing finalized features. In such a case, the 
controller leaves the node
+   *    unmodified.
+   *
+   * 4. Broker downgrade:
+   *    Imagine that a Kafka cluster exists already and the IBP config is 
greater than or equal to
+   *    KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by 
setting IBP config to a
+   *    value less than KAFKA_2_7_IV0. This means the user is also disabling 
the feature versioning
+   *    system (KIP-584). In this case, when the controller starts up with the 
lower IBP config, it
+   *    will switch the FeatureZNode status to disabled with empty features.
+   */
+  private def enableFeatureVersioning(): Unit = {
+    val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      val newVersion = createFeatureZNode(new 
FeatureZNode(FeatureZNodeStatus.Enabled,
+                                          
brokerFeatures.defaultFinalizedFeatures))
+      featureCache.waitUntilEpochOrThrow(newVersion, 
config.zkConnectionTimeoutMs)
+    } else {
+      val existingFeatureZNode = 
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+      if (!existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {

Review comment:
       This test is not enough. The issue is that when a controller fails over, 
it's possible that new brokers have joined the cluster during the failover. So, 
if existingFeatureZNode is enabled, it may not be reflecting the state in those 
newly joined brokers. So, it seems that we need to do the validation for every 
broker during controller failover in that case.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1656,6 +1840,204 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+    }
+
+    val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+    if (supportedVersionRange == null) {
+      Right(new ApiError(Errors.INVALID_REQUEST,
+                         "Could not apply finalized feature update because the 
provided feature" +
+                         " is not supported."))
+    } else {
+      var newVersionRange: FinalizedVersionRange = null
+      try {
+        newVersionRange = new FinalizedVersionRange(supportedVersionRange.min, 
update.maxVersionLevel)
+      } catch {
+        case _: IllegalArgumentException => {
+          // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+          // outside of this catch clause.
+        }
+      }
+      if (newVersionRange == null) {
+        Right(new ApiError(Errors.INVALID_REQUEST,
+          "Could not apply finalized feature update because the provided" +
+          s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+          s" supported minVersion:${supportedVersionRange.min}."))
+      } else {
+        val newFinalizedFeature =
+          Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+        val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+          BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
+        })
+        if (numIncompatibleBrokers == 0) {
+          Left(newVersionRange)
+        } else {
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             "Could not apply finalized feature update 
because" +
+                             " brokers were found to have incompatible 
versions for the feature."))
+        }
+      }
+    }
+  }
+
+  /**
+   * Validates a feature update on an existing FinalizedVersionRange.
+   * If the validation succeeds, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the validation fails, then returned value contains a suitable ApiError.
+   *
+   * @param update                 the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *                               FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return                       the new FinalizedVersionRange to be updated 
into ZK or error
+   *                               as described above.
+   */
+  private def validateFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+                                    existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+    def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+      newFinalizedVersionRangeOrIncompatibilityError(update)
+        .fold(versionRange => Left(Some(versionRange)), error => Right(error))
+    }
+
+    if (update.feature.isEmpty) {
+      // Check that the feature name is not empty.
+      Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+    } else {
+      // We handle deletion requests separately from non-deletion requests.
+      if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+        if (existingVersionRange.isEmpty) {
+          // Disallow deletion of a non-existing finalized feature.
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             "Can not delete non-existing finalized feature."))
+        } else {
+          Left(Option.empty)
+        }
+      } else if (update.maxVersionLevel() < 1) {
+        // Disallow deletion of a finalized feature without allowDowngrade 
flag set.
+        Right(new ApiError(Errors.INVALID_REQUEST,
+                           s"Can not provide maxVersionLevel: 
${update.maxVersionLevel} less" +
+                           s" than 1 without setting the allowDowngrade flag 
to true in the request."))
+      } else {
+        existingVersionRange.map(existing =>
+          if (update.maxVersionLevel == existing.max) {
+            // Disallow a case where target maxVersionLevel matches existing 
maxVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not ${if (update.allowDowngrade) 
"downgrade" else "upgrade"}" +
+                               s" a finalized feature from existing 
maxVersionLevel:${existing.max}" +
+                               " to the same value."))
+          } else if (update.maxVersionLevel < existing.max && 
!update.allowDowngrade) {
+            // Disallow downgrade of a finalized feature without the 
allowDowngrade flag set.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not downgrade finalized feature from 
existing" +
+                               s" maxVersionLevel:${existing.max} to provided" 
+
+                               s" maxVersionLevel:${update.maxVersionLevel} 
without setting the" +
+                               " allowDowngrade flag in the request."))
+          } else if (update.allowDowngrade && update.maxVersionLevel > 
existing.max) {
+            // Disallow a request that sets allowDowngrade flag without 
specifying a
+            // maxVersionLevel that's lower than the existing maxVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"When the allowDowngrade flag set in the 
request, the provided" +
+                               s" maxVersionLevel:${update.maxVersionLevel} 
can not be greater than" +
+                               s" existing maxVersionLevel:${existing.max}."))
+          } else if (update.maxVersionLevel < existing.min) {
+            // Disallow downgrade of a finalized feature below the existing 
finalized
+            // minVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not downgrade finalized feature to 
maxVersionLevel:${update.maxVersionLevel}" +
+                               s" because it's lower than the existing 
minVersionLevel:${existing.min}."))
+          } else {
+            newVersionRangeOrError(update)
+          }
+        ).getOrElse(newVersionRangeOrError(update))
+      }
+    }
+  }
+
+  private def processFeatureUpdates(request: UpdateFeaturesRequest,
+                                    callback: UpdateFeaturesCallback): Unit = {
+    if (isActive) {
+      processFeatureUpdatesWithActiveController(request, callback)
+    } else {
+      callback(Left(new ApiError(Errors.NOT_CONTROLLER)))
+    }
+  }
+
+  private def processFeatureUpdatesWithActiveController(request: 
UpdateFeaturesRequest,
+                                                        callback: 
UpdateFeaturesCallback): Unit = {
+    val updates = request.data.featureUpdates
+    val existingFeatures = featureCache.get
+      .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala)
+      .getOrElse(Map[String, FinalizedVersionRange]())
+    // A map with key being feature name and value being FinalizedVersionRange.
+    // This contains the target features to be eventually written to 
FeatureZNode.
+    val targetFeatures = scala.collection.mutable.Map[String, 
FinalizedVersionRange]() ++ existingFeatures
+    // A map with key being feature name and value being error encountered 
when the FeatureUpdate
+    // was applied.
+    val errors = scala.collection.mutable.Map[String, ApiError]()
+
+    // Below we process each FeatureUpdate using the following logic:
+    //  - If a FeatureUpdate is found to be valid, then:
+    //    - The corresponding entry in errors map would be updated to contain 
Errors.NONE.
+    //    - If the FeatureUpdate is an add or update request, then the 
targetFeatures map is updated
+    //      to contain the new FinalizedVersionRange for the feature.
+    //    - Otherwise if the FeatureUpdate is a delete request, then the 
feature is removed from the
+    //      targetFeatures map.
+    //  - Otherwise if a FeatureUpdate is found to be invalid, then:
+    //    - The corresponding entry in errors map would be updated with the 
appropriate ApiError.
+    //    - The entry in targetFeatures map is left untouched.
+    updates.asScala.iterator.foreach { update =>
+      validateFeatureUpdate(update, existingFeatures.get(update.feature())) 
match {
+        case Left(newVersionRangeOrNone) =>
+          newVersionRangeOrNone match {
+            case Some(newVersionRange) => targetFeatures += (update.feature() 
-> newVersionRange)
+            case None => targetFeatures -= update.feature()
+          }
+          errors += (update.feature() -> new ApiError(Errors.NONE))
+        case Right(featureUpdateFailureReason) =>
+          errors += (update.feature() -> featureUpdateFailureReason)
+      }
+    }
+
+    // If the existing and target features are the same, then, we skip the 
update to the
+    // FeatureZNode as no changes to the node are required. Otherwise, we 
replace the contents
+    // of the FeatureZNode with the new features. This may result in partial 
or full modification
+    // of the existing finalized features in ZK.
+    try {
+      if (!existingFeatures.equals(targetFeatures)) {
+        val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, 
Features.finalizedFeatures(targetFeatures.asJava))
+        val newVersion = zkClient.updateFeatureZNode(newNode)

Review comment:
       Should we call updateFeatureZNode() so that we can get the logging?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to