[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r467279790



##
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+  "about": "The list of updates to finalized features.", "fields": [
+  {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+"about": "The name of the finalized feature to be updated."},
+  {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
+"about": "The new maximum version level for the finalized feature. A 
value >= 1 is valid. A value < 1, is special, and can be used to request the 
deletion of the finalized feature."},
+  {"name": "AllowDowngrade", "type": "bool", "versions": "0+",

Review comment:
   The KIP wiki has AllowDowngrade at the topic level. Could we update that?

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -266,6 +275,199 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of 
versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular 
broker advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of 
version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning 
system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the enti

[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-17 Thread GitBox


junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r471627837



##
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+  "about": "The list of updates to finalized features.", "fields": [
+  {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+"about": "The name of the finalized feature to be updated."},
+  {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
+"about": "The new maximum version level for the finalized feature. A 
value >= 1 is valid. A value < 1, is special, and can be used to request the 
deletion of the finalized feature."},
+  {"name": "AllowDowngrade", "type": "bool", "versions": "0+",

Review comment:
   OK. There are a couple of places where this PR is inconsistent with the 
KIP.
   
   1. The KIP has 2 levels of arrays: []FeatureUpdateKey and []FeatureKey. This 
PR only has one array.
   2. The KIP has a timeoutMs field and this PR doesn't.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-17 Thread GitBox


junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r471806378



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -977,14 +1179,30 @@ class KafkaController(val config: KafkaConfig,
 
   /**
* Send the leader information for selected partitions to selected brokers 
so that they can correctly respond to
-   * metadata requests
+   * metadata requests. Particularly, when feature versioning is enabled, we 
filter out brokers with incompatible
+   * features from receiving the metadata requests. This is because we do not 
want to activate incompatible brokers,
+   * as these may have harmful consequences to the cluster.

Review comment:
   My understanding of the race condition is that the controller finalizes 
a feature while there is a pending broker registration in the controller event 
queue. When the controller starts to process the new broker registration, it 
will realize that its supported feature is not compatible. Here, it seems 
that we will still process this new broker registration and only avoid sending 
UpdateMetadataRequest to it. I am not sure if this helps since we already 
acted on this incompatible broker registration and some damage may already be 
done. The same UpdateMetadataRequest will still be sent to other brokers and 
the incompatible broker's metadata will be available to the clients.
   
   An alternative way is to just skip the handling of new broker registration 
if it's detected as incompatible.
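
A minimal sketch of the alternative suggested above, written in Java for illustration only. The types `Range` and `Registration`, the method names, the feature name `feature_x`, and the simplification that only a finalized max version level is compared against the advertised range are all assumptions made for this example; none of them come from the PR or the Kafka code base.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in types for illustration; these are not Kafka classes.
final class BrokerRegistrationSketch {

    /** The [min, max] version range a broker advertises for one feature. */
    static final class Range {
        final int min;
        final int max;

        Range(int min, int max) {
            this.min = min;
            this.max = max;
        }
    }

    /** A pending broker registration waiting in the controller event queue. */
    static final class Registration {
        final int brokerId;
        final Map<String, Range> supported;

        Registration(int brokerId, Map<String, Range> supported) {
            this.brokerId = brokerId;
            this.supported = supported;
        }
    }

    /**
     * Assumed rule: a broker is incompatible when some finalized feature's max
     * version level is absent from, or outside, the range the broker advertises.
     */
    static boolean isIncompatible(Registration reg, Map<String, Integer> finalizedMaxLevels) {
        for (Map.Entry<String, Integer> entry : finalizedMaxLevels.entrySet()) {
            Range range = reg.supported.get(entry.getKey());
            if (range == null || entry.getValue() < range.min || entry.getValue() > range.max) {
                return true;
            }
        }
        return false;
    }

    /** Returns the ids of brokers whose registration is acted on. */
    static List<Integer> process(List<Registration> pending, Map<String, Integer> finalizedMaxLevels) {
        List<Integer> accepted = new ArrayList<>();
        for (Registration reg : pending) {
            if (isIncompatible(reg, finalizedMaxLevels)) {
                // Skip entirely: no state change and no metadata propagated for this broker.
                continue;
            }
            accepted.add(reg.brokerId);
        }
        return accepted;
    }

    public static void main(String[] args) {
        Map<String, Integer> finalized = Map.of("feature_x", 3);
        List<Registration> pending = List.of(
            new Registration(1, Map.of("feature_x", new Range(1, 4))),
            new Registration(2, Map.of("feature_x", new Range(1, 2))));
        System.out.println(process(pending, finalized)); // prints [1]
    }
}
```

The point of the sketch is the `continue`: the incompatible registration is dropped before any controller state is touched, rather than being processed and then only excluded from one request type.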









[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-28 Thread GitBox


junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496093216



##
File path: clients/src/main/resources/common/message/ApiVersionsResponse.json
##
@@ -55,8 +55,8 @@
   "about": "The maximum supported version for the feature." }
   ]
 },
-{"name": "FinalizedFeaturesEpoch", "type": "int32", "versions": "3+",
-  "tag": 1, "taggedVersions": "3+", "default": "-1",
+{"name": "FinalizedFeaturesEpoch", "type": "int64", "versions": "3+",

Review comment:
   Space before "name".

##
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 57,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "timeoutMs", "type": "int32", "versions": "0+", "default": 
"6",

Review comment:
   This is not included in the KIP. Should we update the KIP?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of version levels supported by every broker in a cluster 
for some feature.
+ */
+public class FinalizedVersionRange {
+private final short minVersionLevel;
+
+private final short maxVersionLevel;
+
+/**
+ * Raises an exception unless the following condition is met:
+ * minVersionLevel >= 1 and maxVersionLevel >= 1 and maxVersionLevel >= 
minVersionLevel.
+ *
+ * @param minVersionLevel   The minimum version level value.
+ * @param maxVersionLevel   The maximum version level value.
+ *
+ * @throws IllegalArgumentException   Raised when the condition described 
above is not met.
+ */
+public FinalizedVersionRange(final short minVersionLevel, final short 
maxVersionLevel) {

Review comment:
   Since the user is not expected to instantiate this, should we make the 
constructor non-public?
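
A minimal sketch of what a non-public constructor could look like, written under stated assumptions. The class name `VersionRangeSketch` is hypothetical; the validation rule is taken from the Javadoc quoted in the diff above (minVersionLevel >= 1, maxVersionLevel >= 1, maxVersionLevel >= minVersionLevel), and the exception message wording is invented for the example.

```java
// Hypothetical class for illustration; it mirrors the condition in the quoted Javadoc.
final class VersionRangeSketch {
    private final short minVersionLevel;
    private final short maxVersionLevel;

    // Package-private on purpose: code in the same package can build instances,
    // while users of the public API can only read them.
    VersionRangeSketch(final short minVersionLevel, final short maxVersionLevel) {
        if (minVersionLevel < 1 || maxVersionLevel < 1 || maxVersionLevel < minVersionLevel) {
            throw new IllegalArgumentException(String.format(
                "Expected minVersionLevel >= 1, maxVersionLevel >= 1 and"
                    + " maxVersionLevel >= minVersionLevel, but received"
                    + " minVersionLevel: %d, maxVersionLevel: %d",
                minVersionLevel, maxVersionLevel));
        }
        this.minVersionLevel = minVersionLevel;
        this.maxVersionLevel = maxVersionLevel;
    }

    public short minVersionLevel() {
        return minVersionLevel;
    }

    public short maxVersionLevel() {
        return maxVersionLevel;
    }
}
```

With this shape, the accessors stay public while construction is limited to the package that produces the result objects.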

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import static java.util.stream.Collectors.joining;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is 
particularly useful
+ * to hold the result 

[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-30 Thread GitBox


junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r497813798



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import static java.util.stream.Collectors.joining;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is 
particularly useful
+ * to hold the result returned by the {@link 
Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+private final Map finalizedFeatures;
+
+private final Optional finalizedFeaturesEpoch;
+
+private final Map supportedFeatures;
+
+public FeatureMetadata(final Map 
finalizedFeatures,

Review comment:
   I was looking at existing classes for the return value. For example, 
CreateAclsResult deliberately makes the constructor non-public.

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3109,6 +3109,36 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
+val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
+
+def sendResponseCallback(errors: Either[ApiError, Map[String, ApiError]]): 
Unit = {
+  def createResponse(throttleTimeMs: Int): UpdateFeaturesResponse = {
+errors match {
+  case Left(topLevelError) =>
+UpdateFeaturesResponse.createWithErrors(
+  topLevelError,
+  new util.HashMap[String, ApiError](),

Review comment:
   Could we use Collections.emptyMap()?
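
A short Java sketch of the difference behind this suggestion. The class and method names are hypothetical, `String` stands in for the error type, and the sketch assumes the callee only reads the map it is given.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; String stands in for the per-feature error type.
final class EmptyMapSketch {

    /** Allocates a fresh, mutable map on every call. */
    static Map<String, String> freshEmptyErrors() {
        return new HashMap<>();
    }

    /** Returns an immutable empty map without allocating a new HashMap. */
    static Map<String, String> sharedEmptyErrors() {
        return Collections.emptyMap();
    }

    public static void main(String[] args) {
        // Both maps are equal as far as a read-only consumer is concerned.
        System.out.println(freshEmptyErrors().equals(sharedEmptyErrors())); // prints true
    }
}
```

The trade-off is that the map returned by `Collections.emptyMap()` rejects writes with `UnsupportedOperationException`, so it only fits call sites that never add entries afterwards.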

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1656,6 +1910,204 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+
+val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+if (supportedVersionRange == null) {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update because the 
provided feature" +
+ " is not supported."))
+} else {
+  var newVersionRange: FinalizedVersionRange = null
+  try {
+newVersionRange = new 
FinalizedVersionRange(supportedVersionRange.firstActiveVersion, 
update.maxVersionLevel)
+  } catch {
+case _: IllegalArgumentException => {
+  // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+  // outside of this catch clause.
+}
+  }
+  if (newVersionRange == null) {
+Right(new ApiError(Errors.INVALID_REQUEST,
+  "Could not apply finalized feature update because the provided" +
+  s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+  s" first active 
version:${supportedVersionRange.firstActiveVersion}."))
+  } else {
+val newFinalizedFeature =
+  Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  BrokerFeatures.hasIncompatibleFeatures(broker.

[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-01 Thread GitBox


junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498420758



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##
@@ -17,9 +17,16 @@
 package org.apache.kafka.common.feature;
 
 import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.utils.Utils;
 
 /**
- * An extended {@link BaseVersionRange} representing the min/max versions for 
supported features.
+ * An extended {@link BaseVersionRange} representing the min, max and first 
active versions for a
+ * supported feature:
+ *  - minVersion: This is the minimum supported version for the feature.
+ *  - maxVersion: This the maximum supported version for the feature.
+ *  - firstActiveVersion: This is the first active version for the feature. 
Versions in the range

Review comment:
   Thinking about this a bit more. It seems that the intention of 
firstActiveVersion is to avoid deploying a wrong version of the broker that 
causes the deprecation of a finalized feature version unexpectedly. However, 
the same mistake can happen with firstActiveVersion since the deprecation of a 
finalized feature version is based on firstActiveVersion. So, I am not sure if 
firstActiveVersion addresses a real problem.
   
   In general, we tend to deprecate a version very slowly in AK. So, if the 
mistake is to deploy a new release that actually deprecates a supported 
version, old clients are likely all gone, and moving finalized min version to 
supported min version may not cause a big problem. We can just document that 
people should make sure old versions are no longer used before deploying new 
releases.
   
   If the mistake is to deploy an old version of the broker whose 
maxSupportedVersion is < maxFinalizedVersion, we will fail the broker. So, this 
mistake can be prevented.









[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-01 Thread GitBox


junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498512579



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##
@@ -17,9 +17,16 @@
 package org.apache.kafka.common.feature;
 
 import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.utils.Utils;
 
 /**
- * An extended {@link BaseVersionRange} representing the min/max versions for 
supported features.
+ * An extended {@link BaseVersionRange} representing the min, max and first 
active versions for a
+ * supported feature:
+ *  - minVersion: This is the minimum supported version for the feature.
+ *  - maxVersion: This the maximum supported version for the feature.
+ *  - firstActiveVersion: This is the first active version for the feature. 
Versions in the range

Review comment:
   @kowshik : I was wondering: what if we relax the current check by just 
making sure that maxVersion of finalized is within the supported range? 
Basically in your example, if supported minVersion goes to 2, it's still 
allowed since it's less than maxVersion of finalized. However, if supported 
minVersion goes to 7, this fails the broker since it's more than maxVersion of 
finalized.
   
   Your concern for the relaxed check seems to be around deploying a wrong 
version of the broker by mistake. I am not sure if that's a big concern. If the 
wrong broker affects maxVersion of finalized, the broker won't start. If the 
wrong broker affects minVersion of finalized and we deprecate slowly, it won't 
impact the existing clients.
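
The relaxed check described above can be sketched as follows. This is an illustration under assumptions: the class and method names are hypothetical, the bounds are treated as inclusive, and the concrete numbers (a finalized maxVersion of 5 and a supported maxVersion of 10) are invented, since the example being referred to is not part of this thread. Only the supported minVersion values 2 and 7 come from the comment.

```java
// Hypothetical sketch of the relaxed compatibility check; not code from the PR.
final class RelaxedCheckSketch {

    /**
     * Relaxed rule: the broker is compatible as long as the finalized maxVersion
     * lies within the broker's supported [minVersion, maxVersion] range.
     * The finalized minVersion is deliberately not consulted.
     */
    static boolean isCompatible(int supportedMin, int supportedMax, int finalizedMax) {
        return supportedMin <= finalizedMax && finalizedMax <= supportedMax;
    }

    public static void main(String[] args) {
        int finalizedMax = 5; // assumed value for illustration

        // Supported minVersion moves to 2: still at or below finalized maxVersion, so allowed.
        System.out.println(isCompatible(2, 10, finalizedMax)); // prints true

        // Supported minVersion moves to 7: above finalized maxVersion, so the broker fails.
        System.out.println(isCompatible(7, 10, finalizedMax)); // prints false

        // Old broker whose supported maxVersion is below finalized maxVersion: the broker fails.
        System.out.println(isCompatible(1, 4, finalizedMax)); // prints false
    }
}
```

Under this rule a mistaken deployment that changes the supported maxVersion is caught at startup, while a change to the supported minVersion is tolerated until it passes the finalized maxVersion.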









[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-02 Thread GitBox


junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498959506



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,147 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises support for.
+   * Each broker advertises the version ranges of its own supported features 
in its own
+   * BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there was an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has now been upgraded to a newer version that supports 
the feature versioning
+   *system (KIP-584). But the IBP config is still set to lower than 
KAFKA_2_7_IV0, and may be
+   *set to a higher value later. In this case, we want to start with no 
finalized features and
+   *allow the user to finalize them whenever they are ready i.e. in the 
future whenever the
+   *user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, 
then the user could start
+   *finalizing the features. This process ensures we do not enable all the 
possible features
+   *immediately after an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent.
+   *- If the node is absent, it will react by creating a FeatureZNode 
with disabled status
+   *  and empty finalized features.
+   *- Otherwise, if a node already exists in enabled status then the 
controller will just
+   *  flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled.
+   * - If the node is in disabled status, the controller won’t upgrade 
all features immediately.
+   *

[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-02 Thread GitBox


junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r498980025



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,147 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises support for.
+   * Each broker advertises the version ranges of its own supported features 
in its own
+   * BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there was an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the
+   *    broker binary has now been upgraded to a newer version that supports the feature versioning
+   *    system (KIP-584). But the IBP config is still set to lower than KAFKA_2_7_IV0, and may be
+   *    set to a higher value later. In this case, we want to start with no finalized features and
+   *    allow the user to finalize them whenever they are ready i.e. in the future whenever the
+   *    user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, then the user could start
+   *    finalizing the features. This process ensures we do not enable all the possible features
+   *    immediately after an upgrade, which could be harmful to Kafka.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent.
+   *        - If the node is absent, it will react by creating a FeatureZNode with disabled status
+   *          and empty finalized features.
+   *        - Otherwise, if a node already exists in enabled status then the controller will just
+   *          flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists
+   *        and whether it is disabled.
+   *        - If the node is in disabled status, the controller won’t upgrade all features immediately.
+   *
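
The startup cases described in the quoted comment can be sketched roughly as below. This is only an illustration and not the controller code from the PR: the class, the Node type and the onControllerStartup method are hypothetical names, the IBP check is reduced to a boolean, and finalized features are modelled as a plain map from feature name to max version level. The quoted text is cut off before it says what happens to a disabled node after the IBP upgrade, so the sketch assumes the node is switched to enabled while its finalized features are left as they are. It also assumes a node that is already disabled is left untouched while the IBP is still low.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch; none of these names come from the PR.
class FeatureZNodeSetupSketch {

    enum Status { ENABLED, DISABLED }

    // Stand-in for the FeatureZNode contents: a status plus
    // feature name -> finalized max version level.
    static final class Node {
        final Status status;
        final Map<String, Short> finalized;

        Node(Status status, Map<String, Short> finalized) {
            this.status = status;
            this.finalized = Collections.unmodifiableMap(new TreeMap<>(finalized));
        }
    }

    // Returns the node contents the controller would write on startup.
    // ibpAtLeast27 stands for "IBP config >= KAFKA_2_7_IV0".
    static Node onControllerStartup(boolean ibpAtLeast27,
                                    Optional<Node> existing,
                                    Map<String, Short> supported) {
        if (!ibpAtLeast27) {
            // Assumption: a node that is already disabled is left as it is.
            if (existing.isPresent() && existing.get().status == Status.DISABLED) {
                return existing.get();
            }
            // Case 2, before the IBP upgrade: an absent node is created as
            // disabled and empty; an enabled node is flipped to disabled
            // and its finalized features are cleared.
            return new Node(Status.DISABLED, Collections.<String, Short>emptyMap());
        }
        if (!existing.isPresent()) {
            // Case 1, new cluster: finalize every supported feature at once.
            return new Node(Status.ENABLED, supported);
        }
        Node node = existing.get();
        if (node.status == Status.DISABLED) {
            // Case 2, after the IBP upgrade: do not finalize everything.
            // Assumption: only the status flips, the features stay as they were.
            return new Node(Status.ENABLED, node.finalized);
        }
        return node;
    }
}
```

Calling onControllerStartup(true, Optional.empty(), supported) models the new-cluster bootstrap, while passing false models a broker whose binary was upgraded but whose IBP config is still below KAFKA_2_7_IV0.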

[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-02 Thread GitBox


junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499036138



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesResult.java
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * The result of the {@link Admin#describeFeatures(DescribeFeaturesOptions)} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+public class DescribeFeaturesResult {
+
+    private final KafkaFuture<FeatureMetadata> future;
+
+    public DescribeFeaturesResult(KafkaFuture<FeatureMetadata> future) {

Review comment:
   Could we make the constructor non-public?
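
   For illustration, a non-public constructor could look roughly like the sketch below. It is not the class from the PR: DescribeFeaturesResultSketch and value() are hypothetical names, and java.util.concurrent.CompletableFuture stands in for KafkaFuture so that the example compiles on its own.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a result class; CompletableFuture replaces
// KafkaFuture only to keep the sketch self-contained.
class DescribeFeaturesResultSketch<T> {

    private final CompletableFuture<T> future;

    // No access modifier: the constructor is package-private, so only code
    // in the same package (for example the admin client implementation)
    // can create instances.
    DescribeFeaturesResultSketch(CompletableFuture<T> future) {
        this.future = future;
    }

    // Public accessor (hypothetical name): callers can still read the result.
    public CompletableFuture<T> value() {
        return future;
    }
}
```

   With this shape, users outside the package can consume a result object handed to them but cannot construct one themselves, which keeps the constructor signature out of the public API.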

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/UpdateFeaturesResult.java
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Map;
+import org.apache.kafka.common.KafkaFuture;
+
+/**
+ * The result of the {@link Admin#updateFeatures(Map, UpdateFeaturesOptions)} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+public class UpdateFeaturesResult {
+    private final Map<String, KafkaFuture<Void>> futures;
+
+    /**
+     * @param futures   a map from feature name to future, which can be used to check the status of
+     *                  individual feature updates.
+     */
+    public UpdateFeaturesResult(final Map<String, KafkaFuture<Void>> futures) {

Review comment:
   Could we make the constructor non-public?









[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499753420



##
File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
##
@@ -185,7 +185,7 @@ class BrokerEndPointTest {
   "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
   "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
   "rack":"dc1",
-  "features": {"feature1": {"min_version": 1, "max_version": 2}, "feature2": {"min_version": 2, "max_version": 4}}
+  "features": {"feature1": {"min_version": 1, "first_active_version": 1, "max_version": 2}, "feature2": {"min_version": 2, "first_active_version": 2, "max_version": 4}}

Review comment:
   Should we revert the changes here?

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker advertises support for.
+   * Each broker advertises the version ranges of its own supported features in its own
+   * BrokerIdZNode. The contents of the advertisement are specific to the particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a
+   * specified range of version levels. Also, the controller is the only entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config
+   *    setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all
+   *    the possible supported features finalized immediately. Assuming this is the case, the
+   *    controller will start up and notice that the FeatureZNode is absent in the new cluster,
+   *    it will then create a FeatureZNode (with enabled status) containing the entire list of
+   *    supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there was an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the
+   *    broker binary has now been upgraded to a newer version that supports the feature versioning
+   *    system (KIP-584). But the IBP config is still set to lower than KAFKA_2_7_IV0, and may be
+   *    set to a higher value later. In this case, we want to start with no finalized features and
+   *    allow the user to finalize them whenever they are ready i.e. in the future whenever the
+   *    user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, then the user could start
+   *    finalizing the features. This process ensures we do not enable all the possible features
+   *    immediately after an upgrade, which could be harmful to Kafka.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absen