[GitHub] [kafka] abbccdda commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


abbccdda commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496901588



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of version levels supported by every broker in a cluster 
for some feature.
+ */
+public class FinalizedVersionRange {

Review comment:
   Do we want to have a different name from 
`org.apache.kafka.common.feature.FinalizedVersionRange`, such as 
`FinalizedVersionLevels`? The same applies to `SupportedVersionRange`; personally, 
I feel that reusing the same class name makes navigation harder.
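   For illustration, a minimal, hypothetical sketch of the clash being flagged: once both packages define a `FinalizedVersionRange`, any file that touches both types has to fall back to fully qualified names.

```java
// Hypothetical snippet, only to illustrate the naming clash discussed above.
import org.apache.kafka.clients.admin.FinalizedVersionRange;

public class VersionRangeNamingClash {
    // The admin-side type can be imported normally...
    private FinalizedVersionRange clientRange;

    // ...but the common.feature type with the same simple name must then be
    // spelled out in full wherever the two are used together.
    private org.apache.kafka.common.feature.FinalizedVersionRange brokerRange;
}
```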

##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
+ * check for incompatibilities between the features supported by the Broker 
and finalized features.
+ * The class also enables feature version level deprecation, as explained 
below. This class is
+ * immutable in production. It provides few APIs to mutate state only for the 
purpose of testing.
+ *
+ * Feature version level deprecation:
+ * ==
+ *
+ * Deprecation of certain version levels of a feature is a process to stop 
supporting the
+ * functionality offered by the feature at those version levels, across the 
entire Kafka cluster.
+ * Feature version deprecation is a simple 2-step process explained below. In 
each step below, an
+ * example is provided to help understand the process better:
+ *
+ * STEP 1:
+ * ===
+ *
+ * In the first step, a major Kafka release is made with a Broker code change 
(explained later
+ * below) that establishes the intent to deprecate certain versions of one or 
more features
+ * cluster-wide. When this new Kafka release is deployed to the cluster, 
deprecated finalized
+ * feature versions are no longer advertised to the client, but they can still 
be used by existing
+ * connections. The way it works is that the feature versioning system (via 
the controller) will
+ * automatically persist the new minVersionLevel for the feature in ZK to 
propagate the deprecation
+ * of certain versions. After this happens, any external client that queries 
the Broker to learn the
+ * feature versions will at some point start to see the new value for the 
finalized minVersionLevel
+ * for the feature. The external clients are expected to stop using the 
deprecated versions at least
+ * by the time that they learn about it.
+ *
+ * Here is how the above code change needs to be done:
+ * In order to deprecate feature version levels, in the supportedFeatures map 
you need to supply a
+ * specific firstActiveVersion value that's higher than the minVersion for the 
feature. The
+ * value for firstActiveVersion should be 1 beyond the highest version that 
you intend to deprecate
+ * for that feature. Whenever the controller is elected or the features are 
finalized via the
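
A minimal sketch of the STEP 1 code change described above, assuming the `SupportedVersionRange(minVersion, firstActiveVersion, maxVersion)` shape introduced in this PR; the exact constructor is taken from the scaladoc under review and may differ in the merged code.

```java
// Illustrative only: deprecate versions 1-2 of "feature_1" (supported range 1-5)
// by advertising firstActiveVersion = 3, i.e. one beyond the highest version
// level being deprecated. The three-argument constructor is an assumption taken
// from the scaladoc above.
import java.util.Collections;
import org.apache.kafka.common.feature.Features;
import org.apache.kafka.common.feature.SupportedVersionRange;

public class FeatureDeprecationSketch {
    public static Features<SupportedVersionRange> supportedFeatures() {
        return Features.supportedFeatures(
            Collections.singletonMap(
                "feature_1",
                new SupportedVersionRange((short) 1, (short) 3, (short) 5)));
    }
}
```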

[GitHub] [kafka] abbccdda commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-28 Thread GitBox


abbccdda commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496321312



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+ * finalized features.
+ * 
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * 
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe operation could 
finish.
+ * 
+ * 
+ * @param options   the options to use
+ *
+ * @return  the {@link DescribeFeaturesResult} containing the 
result
+ */
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

Review comment:
   Yea, you are right, I think this comment belongs to updateFeatures
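
   As a usage sketch of the describe API documented in the diff above; the class and method names follow this PR and may differ slightly from what was finally merged.

```java
// Hedged sketch: query finalized and supported features via the Admin client.
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeFeaturesOptions;
import org.apache.kafka.clients.admin.FeatureMetadata;

public class DescribeFeaturesExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // get() may surface a TimeoutException (wrapped in an
            // ExecutionException), as listed in the javadoc above.
            FeatureMetadata metadata = admin
                .describeFeatures(new DescribeFeaturesOptions())
                .featureMetadata()
                .get();
            System.out.println("Finalized features: " + metadata.finalizedFeatures());
            System.out.println("Supported features: " + metadata.supportedFeatures());
        }
    }
}
```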





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-30 Thread GitBox


abbccdda commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r462453975



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+ * finalized features.
+ * 
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * 
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe operation could 
finish.
+ * 
+ * 
+ * @param options   the options to use
+ *
+ * @return  the {@link DescribeFeaturesResult} containing the 
result
+ */
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

Review comment:
   Note that in the post-KIP-500 world this feature could still work, but the 
request would have to be redirected to the controller on the broker side rather 
than sent to the controller directly by the client. So in the comment we may want 
to phrase it so that the principle conveyed is `the request must be handled by the 
controller` rather than `the admin client must send this request to the controller`. 
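
   For context, a hedged sketch of what directing the describe to the controller looks like from the client side today, assuming the `sendRequestToController` option this PR adds to `DescribeFeaturesOptions`; in a post-KIP-500 deployment the redirection would instead happen on the broker side, as suggested above.

```java
// Sketch under the assumption that DescribeFeaturesOptions exposes a
// sendRequestToController(boolean) toggle, as in this PR.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeFeaturesOptions;
import org.apache.kafka.clients.admin.FeatureMetadata;

public class ControllerReadExample {
    // Directs the describe to the controller for a strongly consistent read.
    static FeatureMetadata describeViaController(Admin admin) throws Exception {
        DescribeFeaturesOptions options =
            new DescribeFeaturesOptions().sendRequestToController(true);
        return admin.describeFeatures(options).featureMetadata().get();
    }
}
```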

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+ * finalized features.
+ * 
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * 
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe operation could 
finish.
+ * 
+ * 
+ * @param options   the options to use
+ *
+ * @return  the {@link DescribeFeaturesResult} containing the 
result
+ */
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+/**
+ * Applies specified updates to finalized features. This operation is not 
transactional so it
+ * may succeed for some features while fail for others.
+ * 
+ * The API takes in a map of finalized feature name to {@link 
FeatureUpdate} that need to be

Review comment:
   nit: s/name/names
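
   A short usage sketch of the map-based call described in the javadoc above, assuming the `FeatureUpdate(maxVersionLevel, allowDowngrade)` constructor used in this PR; the final API may differ.

```java
// Hedged sketch: raise the finalized max version level of one feature.
import java.util.Collections;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;

public class UpdateFeaturesExample {
    static void bumpFeature(Admin admin) throws Exception {
        // Raise "feature_1" to max version level 2; no downgrade is requested,
        // so allowDowngrade stays false.
        FeatureUpdate update = new FeatureUpdate((short) 2, false);
        admin.updateFeatures(Collections.singletonMap("feature_1", update),
                             new UpdateFeaturesOptions())
             .all()
             .get();
    }
}
```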

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -983,8 +1144,25 @@ class KafkaController(val config: KafkaConfig,
*/
   private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], 
partitions: Set[TopicPartition]): Unit = {
 try {
+  val filteredBrokers = scala.collection.mutable.Set[Int]() ++ brokers
+  if (config.isFeatureVersioningEnabled) {
+def hasIncompatibleFeatures(broker: Broker): Boolean = {
+  val latestFinalizedFeatures = featureCache.get
+  if (latestFinalizedFeatures.isDefined) {
+BrokerFeatures.hasIncompatibleFeatures(broker.features, 
latestFinalizedFeatures.get.features)
+  } else {
+false
+  }
+}
+controllerContext.liveOrShuttingDownBrokers.foreach(broker => {
+  if (filteredBrokers.contains(broker.id) && 
hasIncompatibleFeatures(broker)) {

Review comment:
   I see. What would happen to a currently live broker if it couldn't get 
any metadata updates for a while? Will it shut itself down?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either exp

[GitHub] [kafka] abbccdda commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-19 Thread GitBox


abbccdda commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r453842949



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1214,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker, but it can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter.
+ * 
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * 
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe operation could 
finish.
+ * 
+ * 
+ * @param options   the options to use
+ *
+ * @return  the DescribeFeaturesResult containing the result
+ */
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+/**
+ * Applies specified updates to finalized features. The API is atomic, 
meaning that if a single
+ * feature update in the request can't succeed on the controller, then 
none of the feature
+ * updates are carried out. This request is issued only to the controller 
since the API is
+ * only served by the controller.
+ * 
+ * The API takes as input a set of FinalizedFeatureUpdate that need to be 
applied. Each such
+ * update specifies the finalized feature to be added or updated or 
deleted, along with the new
+ * max feature version level value.
+ * 
+ * Downgrade of feature version level is not a regular 
operation/intent. It is only allowed
+ * in the controller if the feature update has the allowDowngrade flag set 
- setting this flag
+ * conveys user intent to attempt downgrade of a feature max version 
level. Note that despite
+ * the allowDowngrade flag being set, certain downgrades may be rejected 
by the controller if it
+ * is deemed impossible.
+ * Deletion of a finalized feature version is not a regular 
operation/intent. It is allowed
+ * only if the allowDowngrade flag is set in the feature update, and, if 
the max version level
+ * is set to a value less than 1.
+ * 
+ *
+ * The following exceptions can be anticipated when calling {@code get()} 
on the futures
+ * obtained from the returned {@link UpdateFinalizedFeaturesResult}:
+ * 
+ *   {@link 
org.apache.kafka.common.errors.ClusterAuthorizationException}
+ *   If the authenticated user didn't have alter access to the 
cluster.
+ *   {@link org.apache.kafka.common.errors.InvalidRequestException}
+ *   If the request details are invalid. e.g., a non-existing finalized 
feature is attempted
+ *   to be deleted or downgraded.
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the updates could finish. It cannot 
be guaranteed whether
+ *   the updates succeeded or not.
+ *   {@link 
org.apache.kafka.common.errors.FinalizedFeatureUpdateFailedException}
+ *   If the updates could not be applied on the controller, despite the 
request being valid.
+ *   This may be a temporary problem.
+ * 
+ * 
+ * This operation is supported by brokers with version 2.7.0 or higher.
+
+ * @param featureUpdates   the set of finalized feature updates
+ * @param options  the options to use
+ *
+ * @return the UpdateFinalizedFeaturesResult containing 
the result

Review comment:
   nit: use a `{@link UpdateFinalizedFeaturesResult}` here as well
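
   To make the downgrade and deletion semantics described in the javadoc above concrete, here is a hedged sketch using the later map-based names from this PR (`FeatureUpdate`, `updateFeatures`); the `FinalizedFeatureUpdate`/`UpdateFinalizedFeaturesResult` names quoted in this diff were reworked later, so treat the exact API shape as an assumption.

```java
// Hedged sketch of a downgrade and a deletion, per the semantics above.
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;

public class DowngradeAndDeleteSketch {
    static void downgradeAndDelete(Admin admin) throws Exception {
        Map<String, FeatureUpdate> updates = new HashMap<>();
        // Downgrade: lower the finalized max version level of "feature_1" to 1.
        // The controller may still reject this if it deems the downgrade unsafe.
        updates.put("feature_1", new FeatureUpdate((short) 1, true));
        // Deletion: per the javadoc above, allowDowngrade plus a max version
        // level below 1 conveys intent to delete the finalized feature.
        updates.put("feature_2", new FeatureUpdate((short) 0, true));
        admin.updateFeatures(updates, new UpdateFeaturesOptions()).all().get();
    }
}
```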

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1214,70 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker, but it can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter.
+ * 
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * 
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe operation could 
finish.
+ * 
+ * 
+ * @param options   the options to use
+ *
+ * @return  the DescribeFeaturesResult containing the result
+ */
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+/**
+ * Applies specified updates to finalized features. The API is atomic, 
meaning that if a single
+ * feature update in the request can't succeed on the control