[GitHub] [kafka] dajac commented on a change in pull request #9344: MINOR; Preserve ThrottlingQuotaExceededException when request timeouts after being retried due to a quota violation (KIP-599)

2020-09-29 Thread GitBox


dajac commented on a change in pull request #9344:
URL: https://github.com/apache/kafka/pull/9344#discussion_r496467056



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1554,6 +1575,11 @@ private ConfigEntry configEntry(CreatableTopicConfigs config) {
 
 @Override
 void handleFailure(Throwable throwable) {
+// If any topics were retried due to a quota exceeded exception, we propagate
+// the initial error back to the caller.
+completeQuotaExceededException(futures, quotaExceededExceptions,

Review comment:
   Definitely. I meant to do it for `TimeoutException` only, but I forgot while implementing it :(. Let me correct this.
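
For context, a minimal sketch of the corrected behavior (a hypothetical `resolveFailure` helper, not the actual `KafkaAdminClient` code), using the per-topic quota error map from the diff above:

    import java.util.Map;
    import org.apache.kafka.common.errors.TimeoutException;

    final class QuotaErrorPropagationSketch {
        // Returns the error that should complete a topic's future: the original
        // throttling error is surfaced only when the retries ended in a timeout.
        static Throwable resolveFailure(String topic,
                                        Throwable failure,
                                        Map<String, Throwable> quotaExceededExceptions) {
            Throwable original = quotaExceededExceptions.get(topic);
            if (original != null && failure instanceof TimeoutException) {
                return original;
            }
            return failure;
        }
    }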





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9344: MINOR; Preserve ThrottlingQuotaExceededException when request timeouts after being retried due to a quota violation (KIP-599)

2020-09-29 Thread GitBox


dajac commented on a change in pull request #9344:
URL: https://github.com/apache/kafka/pull/9344#discussion_r496473270



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -733,7 +733,9 @@ public void testCreateTopicsRetryThrottlingExceptionWhenEnabledUntilRequestTimeO
 time.sleep(defaultApiTimeout + 1);
 
 assertNull(result.values().get("topic1").get());
-TestUtils.assertFutureThrows(result.values().get("topic2"), TimeoutException.class);
+ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"),
+ThrottlingQuotaExceededException.class);
+assertEquals(0, e.throttleTimeMs());

Review comment:
   Yeah, I do agree, but this could happen even though it should be rare. This test case is a bit contrived in order to verify that the throttle time does not go below zero.
   
   The reasoning behind doing this is that a client could be throttled for longer than `default.api.timeout.ms`. When this happens, I believe we should return an adjusted throttle time so that the client does not have to re-wait for the time it has already waited.
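
As a hedged sketch of that adjustment (a hypothetical helper, not the actual client code), the remaining throttle time would be the original value minus the time already spent waiting, clamped at zero:

    // Remaining throttle time after the client has already waited `waitedMs`.
    // Clamped at zero so a wait longer than the throttle never goes negative,
    // which is what the assertEquals(0, e.throttleTimeMs()) above verifies.
    static int adjustedThrottleTimeMs(int originalThrottleTimeMs, long waitedMs) {
        return (int) Math.max(0L, originalThrottleTimeMs - waitedMs);
    }

For example, a client throttled for 40s that has already waited out a 30s `default.api.timeout.ms` would be told to wait roughly 10 more seconds rather than the full 40.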





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496509097



##
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 57,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "timeoutMs", "type": "int32", "versions": "0+", "default": 
"6",

Review comment:
   Done. Updated the KIP.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}.
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {

Review comment:
   Done. Updated the KIP.
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496511604



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of version levels supported by every broker in a cluster 
for some feature.
+ */
+public class FinalizedVersionRange {
+private final short minVersionLevel;
+
+private final short maxVersionLevel;
+
+/**
+ * Raises an exception unless the following condition is met:
+ * minVersionLevel >= 1 and maxVersionLevel >= 1 and maxVersionLevel >= 
minVersionLevel.
+ *
+ * @param minVersionLevel   The minimum version level value.
+ * @param maxVersionLevel   The maximum version level value.
+ *
+ * @throws IllegalArgumentException   Raised when the condition described 
above is not met.
+ */
+public FinalizedVersionRange(final short minVersionLevel, final short maxVersionLevel) {

Review comment:
   It is instantiated from `kafka.server.UpdateFeaturesTest`, so we have to keep the constructor public.
   Also, this class has been removed now; we are just using the `VersionRange` class.
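
For reference, a sketch of the validation the javadoc above describes, as it would apply to the surviving `VersionRange` class (an illustration, not the actual implementation):

    // Enforces: minVersionLevel >= 1, maxVersionLevel >= 1 and
    // maxVersionLevel >= minVersionLevel, per the javadoc condition.
    static void validateRange(short minVersionLevel, short maxVersionLevel) {
        if (minVersionLevel < 1 || maxVersionLevel < 1 || maxVersionLevel < minVersionLevel) {
            throw new IllegalArgumentException(String.format(
                "Invalid version range: minVersionLevel=%d, maxVersionLevel=%d",
                minVersionLevel, maxVersionLevel));
        }
    }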





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496511864



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import static java.util.stream.Collectors.joining;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is 
particularly useful
+ * to hold the result returned by the {@link 
Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+private final Map<String, FinalizedVersionRange> finalizedFeatures;
+
+private final Optional<Long> finalizedFeaturesEpoch;
+
+private final Map<String, SupportedVersionRange> supportedFeatures;
+
+public FeatureMetadata(final Map<String, FinalizedVersionRange> finalizedFeatures,

Review comment:
   It is instantiated from `kafka.server.UpdateFeaturesTest`, so we have to keep the constructor public.
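
To illustrate why the public constructor matters for a test, a hypothetical assembly of the expected metadata (the constructor argument order and the `SupportedVersionRange` signature here are assumptions, not verified against the PR):

    Map<String, FinalizedVersionRange> finalized = new HashMap<>();
    finalized.put("feature_1", new FinalizedVersionRange((short) 1, (short) 5));
    Map<String, SupportedVersionRange> supported = new HashMap<>();
    supported.put("feature_1", new SupportedVersionRange((short) 1, (short) 5));
    // The test can now compare this expected value against the server response.
    FeatureMetadata expected = new FeatureMetadata(finalized, Optional.of(1L), supported);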





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496506413



##
File path: clients/src/main/resources/common/message/ApiVersionsResponse.json
##
@@ -55,8 +55,8 @@
   "about": "The maximum supported version for the feature." }
   ]
 },
-{"name": "FinalizedFeaturesEpoch", "type": "int32", "versions": "3+",
-  "tag": 1, "taggedVersions": "3+", "default": "-1",
+{"name": "FinalizedFeaturesEpoch", "type": "int64", "versions": "3+",

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496524072



##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
+ * check for incompatibilities between the features supported by the Broker 
and finalized features.
+ * The class also enables feature version level deprecation, as explained 
below. This class is
+ * immutable in production. It provides a few APIs to mutate state only for the purpose of testing.
+ *
+ * Feature version level deprecation:
+ * ==
+ *
+ * Deprecation of certain version levels of a feature is a process to stop supporting the
+ * functionality offered by the feature at those version levels, across the entire Kafka cluster.

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496523894



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/UpdateFeaturesOptions.java
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Map;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#updateFeatures(Map, UpdateFeaturesOptions)}.
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class UpdateFeaturesOptions extends AbstractOptions<UpdateFeaturesOptions> {

Review comment:
   Yes, it is already added. The base class `AbstractOptions` contains a `timeoutMs` attribute.
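
A hedged usage sketch: because `AbstractOptions` is parameterized on the concrete options type, its `timeoutMs` setter returns the subclass and can be chained.

    // Sets the request timeout inherited from AbstractOptions (sketch).
    UpdateFeaturesOptions options = new UpdateFeaturesOptions().timeoutMs(30_000);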





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496523315



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of version levels supported by every broker in a cluster 
for some feature.
+ */
+public class FinalizedVersionRange {

Review comment:
   I considered this; however, if we plan to expose `firstActiveVersion` to the client, then it is better to have two separate classes like we do now. This is because `firstActiveVersion` will become an attribute only in the `SupportedVersionRange` class.
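
A minimal sketch of the argument, with the field sets assumed for illustration: only the supported range carries `firstActiveVersion`, so merging the two classes would leak that attribute into finalized ranges where it has no meaning.

    // Advertised by a broker: includes the version-deprecation cut-off.
    class SupportedRangeSketch {
        final short minVersion;
        final short firstActiveVersion; // meaningful only on the supported side
        final short maxVersion;
        SupportedRangeSketch(short min, short firstActive, short max) {
            this.minVersion = min;
            this.firstActiveVersion = firstActive;
            this.maxVersion = max;
        }
    }

    // Finalized cluster-wide: no notion of a first active version.
    class FinalizedRangeSketch {
        final short minVersionLevel;
        final short maxVersionLevel;
        FinalizedRangeSketch(short min, short max) {
            this.minVersionLevel = min;
            this.maxVersionLevel = max;
        }
    }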





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496523894



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/UpdateFeaturesOptions.java
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Map;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#updateFeatures(Map, UpdateFeaturesOptions)}.
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class UpdateFeaturesOptions extends AbstractOptions<UpdateFeaturesOptions> {

Review comment:
   Yes, it is already added. The base class `AbstractOptions` contains a `timeoutMs` attribute, and the value is set in the `UpdateFeaturesRequest`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496524793



##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
+ * check for incompatibilities between the features supported by the Broker 
and finalized features.
+ * The class also enables feature version level deprecation, as explained 
below. This class is
+ * immutable in production. It provides a few APIs to mutate state only for the purpose of testing.
+ *
+ * Feature version level deprecation:
+ * ==
+ *
+ * Deprecation of certain version levels of a feature is a process to stop supporting the
+ * functionality offered by the feature at those version levels, across the entire Kafka cluster.
+ * Feature version deprecation is a simple 2-step process explained below. In 
each step below, an
+ * example is provided to help understand the process better:
+ *
+ * STEP 1:

Review comment:
   Yes, correct. I have updated the doc to mention this.

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,199 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496525949



##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
+ * check for incompatibilities between the features supported by the Broker 
and finalized features.
+ * The class also enables feature version level deprecation, as explained 
below. This class is
+ * immutable in production. It provides a few APIs to mutate state only for the purpose of testing.
+ *
+ * Feature version level deprecation:
+ * ==
+ *
+ * Deprecation of certain version levels of a feature is a process to stop supporting the
+ * functionality offered by the feature at those version levels, across the entire Kafka cluster.
+ * Feature version deprecation is a simple 2-step process explained below. In 
each step below, an
+ * example is provided to help understand the process better:
+ *
+ * STEP 1:
+ * In the first step, a major Kafka release is made with a Broker code change 
(explained later
+ * below) that establishes the intent to deprecate certain versions of one or 
more features
+ * cluster-wide. When this new Kafka release is deployed to the cluster, the 
feature versioning
+ * system (via the controller) will automatically persist the new 
minVersionLevel for the feature in
+ * Zk to propagate the deprecation of certain versions. After this happens, 
any external client that
+ * queries the Broker to learn the feature versions will at some point start 
to see the new value
+ * for the finalized minVersionLevel for the feature. This makes the version 
deprecation permanent.
+ *
+ * Here is how the above code change needs to be done:
+ * In order to deprecate feature version levels, in the supportedFeatures map 
you need to supply a
+ * specific firstActiveVersion value that's higher than the minVersion for the 
feature. The
+ * value for firstActiveVersion should be 1 beyond the highest version that 
you intend to deprecate
+ * for that feature. When features are finalized via the 
ApiKeys.UPDATE_FEATURES api, the feature
+ * version levels in the closed range: [minVersion, firstActiveVersion - 1] 
are automatically
+ * deprecated in ZK by the controller logic.
+ * Example:
+ * - Let us assume the existing finalized feature in ZK:
+ *   {
+ *  "feature_1" -> FinalizedVersionRange(minVersionLevel=1, 
maxVersionLevel=5)
+ *   }
+ *   Now, supposing you would like to deprecate feature version levels: [1, 2].
+ *   Then, in the supportedFeatures map you should supply the following:
+ *   supportedFeatures = {
+ * "feature1" -> SupportedVersionRange(minVersion=1, firstActiveVersion=3, 
maxVersion=5)
+ *   }
+ * - If you do NOT want to deprecate a version level for a feature, then in 
the supportedFeatures
+ *   map you should supply the firstActiveVersion to be the same as the 
minVersion supplied for that
+ *   feature.
+ *   Example:
+ *   supportedFeatures = {
+ * "feature1" -> SupportedVersionRange(minVersion=1, firstActiveVersion=1, 
maxVersion=5)
+ *   }
+ *   This indicates no intent to deprecate any version levels for the feature.
+ *
+ * STEP 2:
+ * After the first step is over, you may (at some point) want to permanently 
remove the code/logic
+ * for the functionality offered by the deprecated feature versions. This is 
the second step. Here a
+ * subsequent major Kafka release is made with another Broker code change that 
removes the code for
+ * the functionality offered by the deprecated feature versions. This would 
completely drop support
+ * for the deprecated versions. Such a code change needs to be supplemented by 
supplying a
+ * suitable higher minVersion value for the feature in the supportedFeatures 
map.
+ * Example:
+ * - In the example above
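
As a hedged sketch of the deprecation rule above (an illustration, not the controller code): versions in the closed range [minVersion, firstActiveVersion - 1] become deprecated, so the new finalized range simply starts at firstActiveVersion.

    // New finalized minimum version level implied by the rule above:
    // everything below firstActiveVersion is deprecated.
    static short newMinVersionLevel(short minVersion, short firstActiveVersion) {
        if (firstActiveVersion < minVersion) {
            throw new IllegalArgumentException("firstActiveVersion must be >= minVersion");
        }
        return firstActiveVersion;
    }

With the quoted example (minVersion=1, firstActiveVersion=3), levels 1 and 2 are deprecated and the finalized range becomes [3, 5].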

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496526254



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,199 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *binary. In this case, we want to start with no finalized features and 
allow the user to
+   *finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *features. This process ensures we do not enable all the possible 
features immediately after
+   *an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *Otherwise, if a node already exists in enabled status then the 
controller will just
+   *flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *  

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496528169



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,199 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *binary. In this case, we want to start with no finalized features and 
allow the user to
+   *finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *features. This process ensures we do not enable all the possible 
features immediately after
+   *an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *Otherwise, if a node already exists in enabled status then the 
controller will just
+   *flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *  

[GitHub] [kafka] mimaison merged pull request #9325: KAFKA-8360: Docs do not mention RequestQueueSize JMX metric

2020-09-29 Thread GitBox


mimaison merged pull request #9325:
URL: https://github.com/apache/kafka/pull/9325


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496530810



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1656,6 +1893,203 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+
+val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+if (supportedVersionRange == null) {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update because the 
provided feature" +
+ " is not supported."))
+} else {
+  var newVersionRange: FinalizedVersionRange = null
+  try {
+newVersionRange = new 
FinalizedVersionRange(supportedVersionRange.firstActiveVersion, 
update.maxVersionLevel)
+  } catch {
+case _: IllegalArgumentException => {
+  // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+  // outside of this catch clause.
+}
+  }
+  if (newVersionRange == null) {
+Right(new ApiError(Errors.INVALID_REQUEST,
+  "Could not apply finalized feature update because the provided" +
+  s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+  s" first active 
version:${supportedVersionRange.firstActiveVersion}."))
+  } else {
+val newFinalizedFeature =
+  Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
+})
+if (numIncompatibleBrokers == 0) {
+  Left(newVersionRange)
+} else {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update 
because" +
+ " brokers were found to have incompatible 
versions for the feature."))
+}
+  }
+}
+  }
+
+  /**
+   * Validates a feature update on an existing FinalizedVersionRange.
+   * If the validation succeeds, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the validation fails, then returned value contains a suitable ApiError.
+   *
+   * @param update the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *   FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return   the new FinalizedVersionRange to be updated 
into ZK or error
+   *   as described above.
+   */
+  private def validateFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+   existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+  newFinalizedVersionRangeOrIncompatibilityError(update)
+.fold(versionRange => Left(Some(versionRange)), error => Right(error))
+}
+
+if (update.feature.isEmpty) {
+  // Check that the feature name is not empty.
+  Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+} else {
+  // We handle deletion requests separately from non-deletion requests.
+  if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+if (existingVersionRange.isEmpty) {
+  // Disallow deletion of a non-existing finalized feature.
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Can not delete non-existing finalized feature."))
+} else {
+  Left(Option.empty)
+}
+  } else if (update.maxVersionLevel() < 1) {
+  

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496531037



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1656,6 +1893,203 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+
+val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+if (supportedVersionRange == null) {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update because the 
provided feature" +
+ " is not supported."))
+} else {
+  var newVersionRange: FinalizedVersionRange = null
+  try {
+newVersionRange = new 
FinalizedVersionRange(supportedVersionRange.firstActiveVersion, 
update.maxVersionLevel)
+  } catch {
+case _: IllegalArgumentException => {
+  // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+  // outside of this catch clause.
+}
+  }
+  if (newVersionRange == null) {
+Right(new ApiError(Errors.INVALID_REQUEST,
+  "Could not apply finalized feature update because the provided" +
+  s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+  s" first active 
version:${supportedVersionRange.firstActiveVersion}."))
+  } else {
+val newFinalizedFeature =
+  Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
+})
+if (numIncompatibleBrokers == 0) {
+  Left(newVersionRange)
+} else {
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ "Could not apply finalized feature update 
because" +
+ " brokers were found to have incompatible 
versions for the feature."))
+}
+  }
+}
+  }
+
+  /**
+   * Validates a feature update on an existing FinalizedVersionRange.
+   * If the validation succeeds, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the validation fails, then returned value contains a suitable ApiError.
+   *
+   * @param update the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *   FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return   the new FinalizedVersionRange to be updated 
into ZK or error
+   *   as described above.
+   */
+  private def validateFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+   existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #9325: KAFKA-8360: Docs do not mention RequestQueueSize JMX metric

2020-09-29 Thread GitBox


mimaison commented on pull request #9325:
URL: https://github.com/apache/kafka/pull/9325#issuecomment-700540446


   I'm always happy to merge improvements to the docs. Thanks for the 
contribution @ankit-kumar-25 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496534914



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -112,8 +112,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 brokerTopicStats: BrokerTopicStats,
 val clusterId: String,
 time: Time,
-val tokenManager: DelegationTokenManager)
-  extends ApiRequestHandler with Logging {
+val tokenManager: DelegationTokenManager,
+val brokerFeatures: BrokerFeatures,
+val featureCache: FinalizedFeatureCache) extends ApiRequestHandler with Logging {

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496535646



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##
@@ -43,7 +43,7 @@
  */
 public class ApiVersionsResponse extends AbstractResponse {
 
-public static final int UNKNOWN_FINALIZED_FEATURES_EPOCH = -1;
+public static final long UNKNOWN_FINALIZED_FEATURES_EPOCH = -1;

Review comment:
   Done.

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3109,6 +3110,37 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
+val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
+
+def sendResponseCallback(errors: Either[ApiError, Map[String, ApiError]]): 
Unit = {
+  def createResponse(throttleTimeMs: Int): UpdateFeaturesResponse = {
+errors match {
+  case Left(topLevelError) => {
+val featureUpdateNoErrors = updateFeaturesRequest
+  .data().featureUpdates().asScala

Review comment:
   Done. Great point.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496538616



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FeatureUpdate.java
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;

Review comment:
   Done. I have now moved it to the package `org.apache.kafka.clients.admin`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496543685



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,199 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *binary. In this case, we want to start with no finalized features and 
allow the user to
+   *finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *features. This process ensures we do not enable all the possible 
features immediately after
+   *an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *Otherwise, if a node already exists in enabled status then the 
controller will just
+   *flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *  

[GitHub] [kafka] dajac commented on pull request #9344: MINOR; Preserve ThrottlingQuotaExceededException when request timeouts after being retried due to a quota violation (KIP-599)

2020-09-29 Thread GitBox


dajac commented on pull request #9344:
URL: https://github.com/apache/kafka/pull/9344#issuecomment-700557270


   @rajinisivaram Thanks for your comments. I have updated the PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496550557



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java
##
@@ -40,14 +40,16 @@ public static FinalizedVersionRange fromMap(Map&lt;String, Short&gt; versionRangeMap)
 
 /**
  * Checks if the [min, max] version level range of this object does *NOT* 
fall within the
- * [min, max] version range of the provided SupportedVersionRange 
parameter.
+ * [min, first_active_version, max] range of the provided 
SupportedVersionRange parameter.

Review comment:
   We need to keep the existing validation. Here is a case where 
`minVersionLevel < firstActiveVersion` is true, but still there are no 
incompatibilities:
   ```
   SupportedVersionRange={minVersion=1, firstActiveVersion=4, maxVersion=7}
   FinalizedVersionRange={minVersionLevel=2, maxVersionLevel=6}
   ```
   
For example, the above can happen during step 1 of feature version level 
deprecation. Imagine the following:
* A supported feature exists with `SupportedVersionRange={minVersion=1, 
firstActiveVersion=4, maxVersion=7}`
* The above feature is finalized at `{minVersionLevel=2, 
maxVersionLevel=6}` in ZK already.
   
   Then imagine a new Kafka release is deployed that raises 
`firstActiveVersion` for the supported feature from 1 -> 4 (in order to 
deprecate versions: 1,2,3). In such a case, during Kafka server startup (where 
we check for feature incompatibilities), we would run into the comparison cited 
above between the new `SupportedVersionRange` and existing 
`FinalizedVersionRange`. But it is not considered to be a case of 
incompatibility.
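   To make the rule concrete, here is a minimal stand-alone sketch of the
   check described above (hand-written for illustration with hypothetical
   names; not the actual Kafka implementation):
   ```
   final class VersionRangeCheckSketch {
       // A finalized [minVersionLevel, maxVersionLevel] range is incompatible
       // only if it falls outside the broker's supported [minVersion, maxVersion]
       // range; firstActiveVersion deliberately plays no part in the check.
       static boolean isIncompatible(short minVersion, short maxVersion,
                                     short minVersionLevel, short maxVersionLevel) {
           return minVersionLevel < minVersion || maxVersionLevel > maxVersion;
       }

       public static void main(String[] args) {
           // Supported={minVersion=1, firstActiveVersion=4, maxVersion=7},
           // Finalized={minVersionLevel=2, maxVersionLevel=6} -> compatible
           System.out.println(isIncompatible((short) 1, (short) 7, (short) 2, (short) 6)); // false
       }
   }
   ```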





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#issuecomment-700585584


   @junrao Thanks a lot for the review! I've addressed the comments in the 
recent commit: 06d8b47131f168db88e4f7d5bda3dd025ba9a2a2.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496509097



##
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 57,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",

Review comment:
   Done. Updated the KIP. Please refer to 
[this](https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-UpdateFeaturesRequestschema)
 section.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496509282



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}.
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeFeaturesOptions extends AbstractOptions&lt;DescribeFeaturesOptions&gt; {

Review comment:
   Done. Updated the KIP. Please refer to 
[this](https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-AdminAPIchanges)
 section.
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496511604



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of version levels supported by every broker in a cluster 
for some feature.
+ */
+public class FinalizedVersionRange {
+private final short minVersionLevel;
+
+private final short maxVersionLevel;
+
+/**
+ * Raises an exception unless the following condition is met:
+ * minVersionLevel >= 1 and maxVersionLevel >= 1 and maxVersionLevel >= 
minVersionLevel.
+ *
+ * @param minVersionLevel   The minimum version level value.
+ * @param maxVersionLevel   The maximum version level value.
+ *
+ * @throws IllegalArgumentException   Raised when the condition described 
above is not met.
+ */
+public FinalizedVersionRange(final short minVersionLevel, final short 
maxVersionLevel) {

Review comment:
It is instantiated from `kafka.server.UpdateFeaturesTest`, so we have to 
keep the c'tor public.
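   
   For context, a minimal sketch of the validation that the javadoc above
   describes (illustrative only; not the exact constructor body):
   ```
   public FinalizedVersionRange(final short minVersionLevel, final short maxVersionLevel) {
       // Reject ranges that violate the documented invariants.
       if (minVersionLevel < 1 || maxVersionLevel < 1 || maxVersionLevel < minVersionLevel) {
           throw new IllegalArgumentException(String.format(
               "Expected minVersionLevel >= 1, maxVersionLevel >= 1 and" +
               " maxVersionLevel >= minVersionLevel, but received" +
               " minVersionLevel: %d, maxVersionLevel: %d", minVersionLevel, maxVersionLevel));
       }
       this.minVersionLevel = minVersionLevel;
       this.maxVersionLevel = maxVersionLevel;
   }
   ```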





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9344: MINOR; Preserve ThrottlingQuotaExceededException when request timeouts after being retried due to a quota violation (KIP-599

2020-09-29 Thread GitBox


rajinisivaram commented on a change in pull request #9344:
URL: https://github.com/apache/kafka/pull/9344#discussion_r496579003



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1554,6 +1575,13 @@ private ConfigEntry configEntry(CreatableTopicConfigs 
config) {
 
 @Override
 void handleFailure(Throwable throwable) {
+// If there were any topics retries due to a quota exceeded 
exception, we propagate
+// the initial error back to the caller if the request timed 
out.
+if (options.shouldRetryOnQuotaViolation() && throwable 
instanceof TimeoutException) {
+completeQuotaExceededException(futures, 
quotaExceededExceptions,

Review comment:
   Perhaps we could make this `maybeCompleteQuotaExceededException` and 
pass in the throwable, so that we can do the check for TimeoutException in that 
helper method rather than in every `handleFailure`?
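   
   For illustration, the suggested helper could take roughly this shape (a
   sketch under assumed names and signatures; not the code in the PR):
   ```
   import java.util.Map;
   import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
   import org.apache.kafka.common.errors.TimeoutException;
   import org.apache.kafka.common.internals.KafkaFutureImpl;

   final class QuotaFailureHelperSketch {
       // Folding the TimeoutException check into the helper lets every
       // handleFailure call it unconditionally.
       static <K> void maybeCompleteQuotaExceededException(
               boolean shouldRetryOnQuotaViolation,
               Throwable throwable,
               Map<K, KafkaFutureImpl<Void>> futures,
               Map<K, ThrottlingQuotaExceededException> quotaExceededExceptions) {
           if (shouldRetryOnQuotaViolation && throwable instanceof TimeoutException) {
               // Propagate the preserved quota error instead of the timeout.
               quotaExceededExceptions.forEach((key, e) ->
                   futures.get(key).completeExceptionally(e));
           }
       }
   }
   ```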





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik edited a comment on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik edited a comment on pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#issuecomment-700585584


   @junrao Thanks a lot for the review! I've addressed the comments in the 
recent commit: 06d8b47131f168db88e4f7d5bda3dd025ba9a2a2. I've provided a 
response to all of your comments. There are a few I couldn't address, and 1-2 
comments I'll address in the near future (needs a little discussion).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9344: MINOR; Preserve ThrottlingQuotaExceededException when request timeouts after being retried due to a quota violation (KIP-599)

2020-09-29 Thread GitBox


dajac commented on a change in pull request #9344:
URL: https://github.com/apache/kafka/pull/9344#discussion_r496645598



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1554,6 +1575,13 @@ private ConfigEntry configEntry(CreatableTopicConfigs 
config) {
 
 @Override
 void handleFailure(Throwable throwable) {
+// If there were any topics retries due to a quota exceeded 
exception, we propagate
+// the initial error back to the caller if the request timed 
out.
+if (options.shouldRetryOnQuotaViolation() && throwable 
instanceof TimeoutException) {
+completeQuotaExceededException(futures, 
quotaExceededExceptions,

Review comment:
   That's a very good suggestion, thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #9344: MINOR; Preserve ThrottlingQuotaExceededException when request timeouts after being retried due to a quota violation (KIP-599)

2020-09-29 Thread GitBox


dajac commented on pull request #9344:
URL: https://github.com/apache/kafka/pull/9344#issuecomment-700644563


   @rajinisivaram Thanks for your suggestion. I have updated the PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)

2020-09-29 Thread GitBox


rajinisivaram commented on pull request #9345:
URL: https://github.com/apache/kafka/pull/9345#issuecomment-700664402


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nizhikov commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3

2020-09-29 Thread GitBox


nizhikov commented on a change in pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#discussion_r496670159



##
File path: tests/docker/Dockerfile
##
@@ -32,9 +32,11 @@ ARG ducker_creator=default
 LABEL ducker.creator=$ducker_creator
 
 # Update Linux and install necessary utilities.
-RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python-pip python-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
-RUN python -m pip install -U pip==9.0.3;
-RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34 && pip install --upgrade ducktape==0.7.9
+RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal 
libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib
+RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute mc && apt-get -y clean
+RUN python3 -m pip install -U pip==20.2.2;
+RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34
+RUN pip3 install git+https://github.com/confluentinc/ducktape

Review comment:
   Hello! Any news on ducktape release?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #9344: MINOR; Preserve ThrottlingQuotaExceededException when request timeouts after being retried due to a quota violation (KIP-599)

2020-09-29 Thread GitBox


rajinisivaram commented on pull request #9344:
URL: https://github.com/apache/kafka/pull/9344#issuecomment-700735225


   Builds are good, merging to trunk.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram merged pull request #9344: MINOR; Preserve ThrottlingQuotaExceededException when request timeouts after being retried due to a quota violation (KIP-599)

2020-09-29 Thread GitBox


rajinisivaram merged pull request #9344:
URL: https://github.com/apache/kafka/pull/9344


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-29 Thread GitBox


vvcephei commented on pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#issuecomment-700766402


   Cherry-picked to 2.4 as well.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9584) Removing headers causes ConcurrentModificationException

2020-09-29 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-9584:

Fix Version/s: 2.4.2

> Removing headers causes ConcurrentModificationException
> ---
>
> Key: KAFKA-9584
> URL: https://issues.apache.org/jira/browse/KAFKA-9584
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Micah Ramos
>Assignee: Micah Ramos
>Priority: Minor
> Fix For: 2.4.2, 2.7.0, 2.5.2, 2.6.1
>
>
> The consumer record that is used during punctuate is static, which can cause 
> java.util.ConcurrentModificationException when modifying the headers. 
> Using a single instance of ConsumerRecord for all punctuates causes other 
> strange behavior:
>  # Headers are shared across partitions.
>  # A topology that adds a single header could append an infinite number of 
> headers (one per punctuate iteration), causing memory problems in the current 
> topology as well as down stream consumers since the headers are written with 
> the record when it is produced to a topic.  
>  
> I would expect that each invocation of punctuate would be initialized with a 
> new header object.
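
A simplified stand-alone analogy of the failure mode described above (plain
Java, not Streams code): structurally modifying a shared collection while it
is being iterated throws the same ConcurrentModificationException.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class SharedHeadersAnalogy {
    public static void main(String[] args) {
        List<String> headers = new ArrayList<>();
        headers.add("a");
        headers.add("b");
        headers.add("c");
        // Removing from the list mid-iteration triggers
        // java.util.ConcurrentModificationException on the next next() call.
        for (String h : headers) {
            if (h.equals("a")) {
                headers.remove(h);
            }
        }
    }
}
{code}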



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] nizhikov commented on pull request #9196: KAFKA-10402: Upgrade system tests to python3

2020-09-29 Thread GitBox


nizhikov commented on pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#issuecomment-700766947


   Hello, @guozhangwang , @mimaison 
   
   The only thing we need for a clean merge is a ducktape release that 
contains the python3 fixes.
   Can we go with the current PR, which points to ducktape master and works 
just fine, and switch to the specific release once it becomes available?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

2020-09-29 Thread GitBox


vvcephei commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r496796429



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet
 public <T> List<T> stores(final String storeName,
                           final QueryableStoreType<T> queryableStoreType) {
     final List<T> allStores = new ArrayList<>();
-    for (final StreamThreadStateStoreProvider provider : storeProviders) {
-        final List<T> stores = provider.stores(storeQueryParameters);
-        allStores.addAll(stores);
+    for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
+        final List<T> stores = storeProvider.stores(storeQueryParameters);
+        if (!stores.isEmpty()) {
+            allStores.addAll(stores);
+            if (storeQueryParameters.partition() != null) {
+                break;
+            }
+        }
     }
     if (allStores.isEmpty()) {
+        if (storeQueryParameters.partition() != null) {
+            throw new InvalidStateStoreException(
+                String.format("The specified partition %d for store %s does not exist.",

Review comment:
   Hey @dima5rr , I think Guozhang's question was hidden because the 
conversation was already "resolved". Do you mind answering this concern?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9514) The protocol generator generated useless condition when a field is made nullable and flexible version is used

2020-09-29 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-9514.

Fix Version/s: 2.7.0
   Resolution: Fixed

> The protocol generator generated useless condition when a field is made 
> nullable and flexible version is used
> -
>
> Key: KAFKA-9514
> URL: https://issues.apache.org/jira/browse/KAFKA-9514
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.7.0
>
>
> The protocol generator generates useless conditions when a field of type 
> string is made nullable after the request has been converted to using 
> optional fields.
> As an example, we have made the field `ProtocolName` nullable in the 
> `JoinGroupResponse`. The `JoinGroupResponse` supports optional fields since 
> version 6 and the field is nullable since version 7. Under these conditions, 
> the generator generates the following code:
> {code:java}
> if (protocolName == null) {
>     if (_version >= 7) {
>         if (_version >= 6) {
>             _writable.writeUnsignedVarint(0);
>         } else {
>             _writable.writeShort((short) -1);
>         }
>     } else {
>         throw new NullPointerException();
>     }
> }
> {code}
> spotbugs raises an `UC_USELESS_CONDITION` because `_version >= 6` is always 
> true.  
> We could optimise the generator to handle this.
>  
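
For illustration, the optimised generator output for the example above could
collapse the redundant inner check like this (hand-written sketch, not actual
generator output):

{code:java}
if (protocolName == null) {
    if (_version >= 7) {
        // any version >= 7 is also >= 6, so the flexible-version branch
        // is the only reachable one
        _writable.writeUnsignedVarint(0);
    } else {
        throw new NullPointerException();
    }
}
{code}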



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9546) Make FileStreamSourceTask extendable with generic streams

2020-09-29 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-9546.
--
Resolution: Won't Fix

I'm going to close this as WONTFIX, per my previous comment.

> Make FileStreamSourceTask extendable with generic streams
> -
>
> Key: KAFKA-9546
> URL: https://issues.apache.org/jira/browse/KAFKA-9546
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Csaba Galyo
>Assignee: Csaba Galyo
>Priority: Major
>  Labels: connect-api, needs-kip
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> Use case: I want to read a ZIP compressed text file with a file connector and 
> send it to Kafka.
> Currently, we have FileStreamSourceConnector which reads a \n delimited text 
> file. This connector always returns a task of type FileStreamSourceTask.
> The FileStreamSourceTask reads from stdio or opens a file InputStream. The 
> issue with this approach is that the input needs to be a text file, otherwise 
> it won't work. 
> The code should be modified so that users could change the default 
> InputStream to e.g. ZipInputStream, or any other format. The code is currently 
> written in such a way that it's not possible to extend it; we cannot use a 
> different input stream. 
> See example here where the code got copy-pasted just so it could read from a 
> ZstdInputStream (which reads ZSTD compressed files): 
> [https://github.com/gcsaba2/kafka-zstd/tree/master/src/main/java/org/apache/kafka/connect/file]
>  
> I suggest 2 changes:
>  # FileStreamSourceConnector should be extendable to return tasks of 
> different types. These types would be input by the user through the 
> configuration map
>  # FileStreamSourceTask should be modified so it could be extended and child 
> classes could define different input streams.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch closed pull request #8134: KAFKA-9546 Allow custom tasks through configuration

2020-09-29 Thread GitBox


rhauch closed pull request #8134:
URL: https://github.com/apache/kafka/pull/8134


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #8134: KAFKA-9546 Allow custom tasks through configuration

2020-09-29 Thread GitBox


rhauch commented on pull request #8134:
URL: https://github.com/apache/kafka/pull/8134#issuecomment-700796780


   Thanks for the PR, @gcsaba2. However, I'm going to close this per my 
comments on https://issues.apache.org/jira/browse/KAFKA-9546. 
   
   TLDR; the `FileStreamSourceConnector` class is intentionally a simple 
example of a connector, and extending this as suggested makes it harder to 
use and more error prone. Instead, just create your own connector with the 
desired functionality.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik edited a comment on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


kowshik edited a comment on pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#issuecomment-700585584


   @junrao Thanks a lot for the review! I've addressed the comments in the 
recent commit: 06d8b47131f168db88e4f7d5bda3dd025ba9a2a2. I've provided a 
response to all of your comments. There are a few I couldn't address, and 1-2 
comments I'll address in the near future.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] edenhill commented on pull request #9196: KAFKA-10402: Upgrade system tests to python3

2020-09-29 Thread GitBox


edenhill commented on pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#issuecomment-700817177


   We'll want to use a released version of ducktape to get stable tests. I'm 
working on getting a ducktape release out.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10533) Add log flush semantics to simulation test

2020-09-29 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10533:
---

 Summary: Add log flush semantics to simulation test
 Key: KAFKA-10533
 URL: https://issues.apache.org/jira/browse/KAFKA-10533
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Jason Gustafson






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10533) Add log flush semantics to simulation test

2020-09-29 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-10533:

Description: In order to do KAFKA-10526, it is useful to add support for 
flush semantics to `MockLog` and to use them in `RaftSimulationTest`. 

> Add log flush semantics to simulation test
> --
>
> Key: KAFKA-10533
> URL: https://issues.apache.org/jira/browse/KAFKA-10533
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> In order to do KAFKA-10526, it is useful to add support for flush semantics 
> to `MockLog` and to use them in `RaftSimulationTest`. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10534) Modify the originals parameter type of the AbstractConfig class to avoid redundant judgments in the code

2020-09-29 Thread linenwei (Jira)
linenwei created KAFKA-10534:


 Summary: Modify the originals parameter type of the AbstractConfig 
class to avoid redundant judgments in the code
 Key: KAFKA-10534
 URL: https://issues.apache.org/jira/browse/KAFKA-10534
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 2.6.0
Reporter: linenwei


{code:java}
@SuppressWarnings("unchecked")
public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
    /* check that all the keys are really strings */
    for (Map.Entry<?, ?> entry : originals.entrySet())
        if (!(entry.getKey() instanceof String))
            throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10534) Modify the originals parameter type of the AbstractConfig class to avoid redundant judgments in the code

2020-09-29 Thread linenwei (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

linenwei updated KAFKA-10534:
-
  Flags: Patch
Description: 
Source Code:
{code:java}
@SuppressWarnings("unchecked")
public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
    /* check that all the keys are really strings */
    for (Map.Entry<?, ?> entry : originals.entrySet())
        if (!(entry.getKey() instanceof String))
            throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
{code}
In the source code, I find that the originals map key {color:#FF0000}must be a 
string{color}. From my point of view, why not use Map<String, ?> originals to 
replace Map<?, ?> originals? Then we would not need to check the key type at 
runtime. 


I'm not sure if I've thought this through fully, but if there are other reasons 
for doing it this way, I'd love to know why.
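
A stand-alone sketch of what the proposed signature buys (hypothetical 
stand-in class, not the real AbstractConfig):

{code:java}
import java.util.Map;

// With Map<String, ?> the compiler guarantees String keys, so the runtime
// instanceof check above becomes unnecessary.
class TypedConfigSketch {
    private final Map<String, ?> originals;

    TypedConfigSketch(Map<String, ?> originals) {
        this.originals = originals; // no per-entry key-type check needed
    }
}
{code}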

 

  was:
{code:java}
@SuppressWarnings("unchecked")
public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
    /* check that all the keys are really strings */
    for (Map.Entry<?, ?> entry : originals.entrySet())
        if (!(entry.getKey() instanceof String))
            throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
{code}


> Modify the originals parameter type of the AbstractConfig class to avoid 
> redundant judgments in the code
> 
>
> Key: KAFKA-10534
> URL: https://issues.apache.org/jira/browse/KAFKA-10534
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.6.0
>Reporter: linenwei
>Priority: Trivial
>
> Source Code:
> {code:java}
> @SuppressWarnings("unchecked")
> public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
>     /* check that all the keys are really strings */
>     for (Map.Entry<?, ?> entry : originals.entrySet())
>         if (!(entry.getKey() instanceof String))
>             throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
> {code}
> In the source code, I find that the originals map key {color:#FF0000}must be a 
> string{color}. From my point of view, why not use Map<String, ?> originals 
> to replace Map<?, ?> originals? Then we would not need to check the key 
> type at runtime. 
> I'm not sure if I've thought this through fully, but if there are other 
> reasons for doing it this way, I'd love to know why.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10534) Modify the AbstractConfig class, convert parameter `originals` type from Map&lt;?, ?&gt; to Map&lt;String, ?&gt; to avoid redundant judgments in the code

2020-09-29 Thread linenwei (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

linenwei updated KAFKA-10534:
-
Summary: Modify the AbstractConfig class, convert parameter `originals` 
type from Map&lt;?, ?&gt; to Map&lt;String, ?&gt; to avoid redundant judgments in the code 
 (was: Modify the originals parameter type of the AbstractConfig class to avoid 
redundant judgments in the code)

> Modify the AbstractConfig class, convert parameter `originals` type from 
> Map&lt;?, ?&gt; to Map&lt;String, ?&gt; to avoid redundant judgments in the code
> 
>
> Key: KAFKA-10534
> URL: https://issues.apache.org/jira/browse/KAFKA-10534
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.6.0
>Reporter: linenwei
>Priority: Trivial
>
> Source Code:
> {code:java}
> @SuppressWarnings("unchecked")
> public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
>     /* check that all the keys are really strings */
>     for (Map.Entry<?, ?> entry : originals.entrySet())
>         if (!(entry.getKey() instanceof String))
>             throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
> {code}
> In the source code, I find that the originals map key {color:#FF0000}must be a 
> string{color}. From my point of view, why not use Map<String, ?> originals 
> to replace Map<?, ?> originals? Then we would not need to check the key 
> type at runtime. 
> I'm not sure if I've thought this through fully, but if there are other 
> reasons for doing it this way, I'd love to know why.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-09-29 Thread GitBox


chia7712 commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496904812



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -133,11 +133,14 @@ public void start() {
     List<TopicPartition> partitions = new ArrayList<>();
 
     // We expect that the topics will have been created either manually by the user or automatically by the herder
-    List<PartitionInfo> partitionInfos = null;
-    long started = time.milliseconds();
-    while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+    List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+    long started = time.nanoseconds();
+    long maxSleepMs = 1_000;
+    long sleepMs = 10;
+    while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
+        time.sleep(sleepMs);
+        sleepMs = Math.min(2 * sleepMs, maxSleepMs);
         partitionInfos = consumer.partitionsFor(topic);

Review comment:
   Why not use ```partitionsFor(String topic, Duration timeout)``` 
(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1944)
 to replace the while loop?
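   
   For reference, the overload being suggested would look roughly like this
   (a sketch reusing the names from the diff above; assumes java.time.Duration
   and org.apache.kafka.common.PartitionInfo are imported):
   ```
   // Let the consumer itself block for up to the timeout instead of
   // hand-rolling a sleep/retry loop:
   List<PartitionInfo> partitionInfos =
       consumer.partitionsFor(topic, Duration.ofMillis(CREATE_TOPIC_TIMEOUT_MS));
   ```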





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-29 Thread GitBox


abbccdda commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496901588



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of version levels supported by every broker in a cluster 
for some feature.
+ */
+public class FinalizedVersionRange {

Review comment:
Do we want to have a different name from 
`org.apache.kafka.common.feature.FinalizedVersionRange`, such as 
`FinalizedVersionLevels`? Same for `SupportedVersionRange`; personally, I 
feel that reusing the same class name makes navigation harder.

##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
+ * check for incompatibilities between the features supported by the Broker 
and finalized features.
+ * The class also enables feature version level deprecation, as explained 
below. This class is
+ * immutable in production. It provides few APIs to mutate state only for the 
purpose of testing.
+ *
+ * Feature version level deprecation:
+ * ==
+ *
+ * Deprecation of certain version levels of a feature is a process to stop 
supporting the
+ * functionality offered by the feature at those version levels, across the 
entire Kafka cluster.
+ * Feature version deprecation is a simple 2-step process explained below. In 
each step below, an
+ * example is provided to help understand the process better:
+ *
+ * STEP 1:
+ * ===
+ *
+ * In the first step, a major Kafka release is made with a Broker code change 
(explained later
+ * below) that establishes the intent to deprecate certain versions of one or 
more features
+ * cluster-wide. When this new Kafka release is deployed to the cluster, 
deprecated finalized
+ * feature versions are no longer advertised to the client, but they can still 
be used by existing
+ * connections. The way it works is that the feature versioning system (via 
the controller) will
+ * automatically persist the new minVersionLevel for the feature in ZK to 
propagate the deprecation
+ * of certain versions. After this happens, any external client that queries 
the Broker to learn the
+ * feature versions will at some point start to see the new value for the 
finalized minVersionLevel
+ * for the feature. The external clients are expected to stop using the 
deprecated versions at least
+ * by the time that they learn about it.
+ *
+ * Here is how the above code change needs to be done:
+ * In order to deprecate feature version levels, in the supportedFeatures map 
you need to supply a
+ * specific firstActiveVersion value that's higher than the minVersion for the 
feature. The
+ * value for firstActiveVersion should be 1 beyond the highest version that 
you intend to deprecate
+ * for that feature. Whenever the controller is elected or the features are 
finalized via the

[GitHub] [kafka] kobebryantlin0 opened a new pull request #9350: KAFKA-10534: change params type to avoid redundant judgments.

2020-09-29 Thread GitBox


kobebryantlin0 opened a new pull request #9350:
URL: https://github.com/apache/kafka/pull/9350


   Modify the `AbstractConfig` class, converting the type of the `originals` parameter so that redundant key-type checks in the code can be removed.
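
   A hedged sketch of the motivation (the generic parameters were stripped from the archived description, so the exact types below are an assumption): with an unconstrained key type the constructor must verify every key at runtime, while `Map<String, ?>` makes that check unnecessary.

      import java.util.Map;

      public class ConfigKeySketch {
          // Before: the key type is unknown, so every key needs a runtime check.
          static void requireStringKeys(Map<?, ?> originals) {
              for (Map.Entry<?, ?> entry : originals.entrySet())
                  if (!(entry.getKey() instanceof String))
                      throw new IllegalArgumentException("Key must be a string: " + entry.getKey());
          }

          // After: the compiler enforces the key type, so the loop above becomes redundant.
          static void useConfig(Map<String, ?> originals) {
              originals.forEach((key, value) -> System.out.println(key + "=" + value));
          }
      }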
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] leosilvadev opened a new pull request #9351: remove unnecessary int-long widening

2020-09-29 Thread GitBox


leosilvadev opened a new pull request #9351:
URL: https://github.com/apache/kafka/pull/9351


   There is a JIRA ticket requesting this small change 
(https://issues.apache.org/jira/browse/KAFKA-10047)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Created] (KAFKA-10535) KIP-478: Implement StateStoreContext and Record

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10535:


 Summary: KIP-478: Implement StateStoreContext and Record
 Key: KAFKA-10535
 URL: https://issues.apache.org/jira/browse/KAFKA-10535
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler








[jira] [Created] (KAFKA-10536) KIP-478: Implement KStream changes

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10536:


 Summary: KIP-478: Implement KStream changes
 Key: KAFKA-10536
 URL: https://issues.apache.org/jira/browse/KAFKA-10536
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler








[jira] [Created] (KAFKA-10537) Convert KStreamImpl filters to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10537:


 Summary: Convert KStreamImpl filters to new PAPI
 Key: KAFKA-10537
 URL: https://issues.apache.org/jira/browse/KAFKA-10537
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler








[jira] [Created] (KAFKA-10538) Convert KStreamImpl maps to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10538:


 Summary: Convert KStreamImpl maps to new PAPI
 Key: KAFKA-10538
 URL: https://issues.apache.org/jira/browse/KAFKA-10538
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler








[jira] [Created] (KAFKA-10539) Convert KStreamImpl joins to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10539:


 Summary: Convert KStreamImpl joins to new PAPI
 Key: KAFKA-10539
 URL: https://issues.apache.org/jira/browse/KAFKA-10539
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler








[GitHub] [kafka] ahuang98 commented on pull request #9340: Improving Fetch Session Caching for KAFKA-9401

2020-09-29 Thread GitBox


ahuang98 commented on pull request #9340:
URL: https://github.com/apache/kafka/pull/9340#issuecomment-700886220


   retest this please







[jira] [Assigned] (KAFKA-10539) Convert KStreamImpl joins to new PAPI

2020-09-29 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-10539:


Assignee: John Roesler

> Convert KStreamImpl joins to new PAPI
> -
>
> Key: KAFKA-10539
> URL: https://issues.apache.org/jira/browse/KAFKA-10539
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>






[jira] [Created] (KAFKA-10540) Convert KStream aggregations to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10540:


 Summary: Convert KStream aggregations to new PAPI
 Key: KAFKA-10540
 URL: https://issues.apache.org/jira/browse/KAFKA-10540
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler








[GitHub] [kafka] ahuang98 removed a comment on pull request #9340: Improving Fetch Session Caching for KAFKA-9401

2020-09-29 Thread GitBox


ahuang98 removed a comment on pull request #9340:
URL: https://github.com/apache/kafka/pull/9340#issuecomment-700886220


   retest this please







[jira] [Created] (KAFKA-10541) Convert KTable filters to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10541:


 Summary: Convert KTable filters to new PAPI
 Key: KAFKA-10541
 URL: https://issues.apache.org/jira/browse/KAFKA-10541
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler








[jira] [Created] (KAFKA-10543) Convert KTable joins to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10543:


 Summary: Convert KTable joins to new PAPI
 Key: KAFKA-10543
 URL: https://issues.apache.org/jira/browse/KAFKA-10543
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler








[jira] [Created] (KAFKA-10544) Convert KTable aggregations to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10544:


 Summary: Convert KTable aggregations to new PAPI
 Key: KAFKA-10544
 URL: https://issues.apache.org/jira/browse/KAFKA-10544
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler








[jira] [Created] (KAFKA-10542) Convert KTable maps to new PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10542:


 Summary: Convert KTable maps to new PAPI
 Key: KAFKA-10542
 URL: https://issues.apache.org/jira/browse/KAFKA-10542
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler








[jira] [Created] (KAFKA-10546) KIP-478: Deprecate old PAPI

2020-09-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10546:


 Summary: KIP-478: Deprecate old PAPI
 Key: KAFKA-10546
 URL: https://issues.apache.org/jira/browse/KAFKA-10546
 Project: Kafka
  Issue Type: Sub-task
Reporter: John Roesler
Assignee: John Roesler


Can't be done until after the DSL internals are migrated.





[jira] [Assigned] (KAFKA-10540) Convert KStream aggregations to new PAPI

2020-09-29 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-10540:


Assignee: John Roesler

> Convert KStream aggregations to new PAPI
> 
>
> Key: KAFKA-10540
> URL: https://issues.apache.org/jira/browse/KAFKA-10540
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>






[jira] [Assigned] (KAFKA-10544) Convert KTable aggregations to new PAPI

2020-09-29 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-10544:


Assignee: John Roesler

> Convert KTable aggregations to new PAPI
> ---
>
> Key: KAFKA-10544
> URL: https://issues.apache.org/jira/browse/KAFKA-10544
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>






[jira] [Assigned] (KAFKA-10543) Convert KTable joins to new PAPI

2020-09-29 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-10543:


Assignee: John Roesler

> Convert KTable joins to new PAPI
> 
>
> Key: KAFKA-10543
> URL: https://issues.apache.org/jira/browse/KAFKA-10543
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>






[jira] [Created] (KAFKA-10545) Create Topic IDs and Propagate to Brokers

2020-09-29 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10545:
--

 Summary: Create Topic IDs and Propagate to Brokers
 Key: KAFKA-10545
 URL: https://issues.apache.org/jira/browse/KAFKA-10545
 Project: Kafka
  Issue Type: Improvement
Reporter: Justine Olshan
Assignee: Justine Olshan


First step for KIP-516

The goals are:
 * Create and store topic IDs in a ZK Node and controller memory.
 * Propagate topic ID to brokers with updated LeaderAndIsrRequest
 * Store topic ID in memory on broker, persistent file in log





[jira] [Assigned] (KAFKA-10542) Convert KTable maps to new PAPI

2020-09-29 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-10542:


Assignee: John Roesler

> Convert KTable maps to new PAPI
> ---
>
> Key: KAFKA-10542
> URL: https://issues.apache.org/jira/browse/KAFKA-10542
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>






[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes

2020-09-29 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10437:
-
Summary: KIP-478: Implement test-utils changes  (was: Convert test-utils 
(and StateStore) for KIP-478)

> KIP-478: Implement test-utils changes
> -
>
> Key: KAFKA-10437
> URL: https://issues.apache.org/jira/browse/KAFKA-10437
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>






[jira] [Updated] (KAFKA-8872) Improvements to controller "deleting" state / topic Identifiers

2020-09-29 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-8872:
--
Description: 
Kafka currently uniquely identifies a topic by its name. This is generally 
sufficient, but there are flaws in this scheme if a topic is deleted and 
recreated with the same name. As a result, Kafka attempts to prevent these 
classes of issues by ensuring a topic is deleted from all replicas before 
completing a deletion. This solution is not perfect, as it is possible for 
partitions to be reassigned from brokers while they are down, and there are no 
guarantees that this state will ever be cleaned up and will not cause issues in 
the future.

As the controller must wait for all replicas to delete their local partitions, 
deletes can also become blocked, preventing topics from being created with the 
same name until the deletion is complete on all replicas. This can mean that 
downtime for a single broker can effectively cause a complete outage for 
everyone producing/consuming to that topic name, as the topic cannot be 
recreated without manual intervention.

Unique topic IDs could help address this issue by associating a unique ID with 
each topic, ensuring a newly created topic with a previously used name cannot 
be confused with a previous topic with that name.

 

Related Issues:

[https://issues.apache.org/jira/browse/KAFKA-10545|http://example.com]

  was:
Kafka currently uniquely identifies a topic by its name. This is generally 
sufficient, but there are flaws in this scheme if a topic is deleted and 
recreated with the same name. As a result, Kafka attempts to prevent these 
classes of issues by ensuring a topic is deleted from all replicas before 
completing a deletion. This solution is not perfect, as it is possible for 
partitions to be reassigned from brokers while they are down, and there are no 
guarantees that this state will ever be cleaned up and will not cause issues in 
the future.

As the controller must wait for all replicas to delete their local partitions, 
deletes can also become blocked, preventing topics from being created with the 
same name until the deletion is complete on all replicas. This can mean that 
downtime for a single broker can effectively cause a complete outage for 
everyone producing/consuming to that topic name, as the topic cannot be 
recreated without manual intervention.

Unique topic IDs could help address this issue by associating a unique ID with 
each topic, ensuring a newly created topic with a previously used name cannot 
be confused with a previous topic with that name.

 


> Improvements to controller "deleting" state /  topic Identifiers
> 
>
> Key: KAFKA-8872
> URL: https://issues.apache.org/jira/browse/KAFKA-8872
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Bradstreet
>Priority: Major
>
> Kafka currently uniquely identifies a topic by its name. This is generally 
> sufficient, but there are flaws in this scheme if a topic is deleted and 
> recreated with the same name. As a result, Kafka attempts to prevent these 
> classes of issues by ensuring a topic is deleted from all replicas before 
> completing a deletion. This solution is not perfect, as it is possible for 
> partitions to be reassigned from brokers while they are down, and there are 
> no guarantees that this state will ever be cleaned up and will not cause 
> issues in the future.
> As the controller must wait for all replicas to delete their local 
> partitions, deletes can also become blocked, preventing topics from being 
> created with the same name until the deletion is complete on all replicas. 
> This can mean that downtime for a single broker can effectively cause a 
> complete outage for everyone producing/consuming to that topic name, as the 
> topic cannot be recreated without manual intervention.
> Unique topic IDs could help address this issue by associating a unique ID 
> with each topic, ensuring a newly created topic with a previously used name 
> cannot be confused with a previous topic with that name.
>  
> Related Issues:
> [https://issues.apache.org/jira/browse/KAFKA-10545|http://example.com]
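
To illustrate the proposal (a generic sketch, not the KIP-516 implementation): keying topic state by a generated ID makes a recreated topic a distinct entity even when it reuses the old name.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;

    public class TopicIdSketch {
        private final Map<UUID, String> topicNamesById = new HashMap<>();

        // Every creation mints a fresh ID, so stale replicas holding the old ID
        // can never be confused with a re-created topic of the same name.
        public UUID createTopic(String name) {
            UUID id = UUID.randomUUID();
            topicNamesById.put(id, name);
            return id;
        }
    }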





[jira] [Updated] (KAFKA-8872) Improvements to controller "deleting" state / topic Identifiers

2020-09-29 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-8872:
--
Description: 
Kafka currently uniquely identifies a topic by its name. This is generally 
sufficient, but there are flaws in this scheme if a topic is deleted and 
recreated with the same name. As a result, Kafka attempts to prevent these 
classes of issues by ensuring a topic is deleted from all replicas before 
completing a deletion. This solution is not perfect, as it is possible for 
partitions to be reassigned from brokers while they are down, and there are no 
guarantees that this state will ever be cleaned up and will not cause issues in 
the future.

As the controller must wait for all replicas to delete their local partitions, 
deletes can also become blocked, preventing topics from being created with the 
same name until the deletion is complete on all replicas. This can mean that 
downtime for a single broker can effectively cause a complete outage for 
everyone producing/consuming to that topic name, as the topic cannot be 
recreated without manual intervention.

Unique topic IDs could help address this issue by associating a unique ID with 
each topic, ensuring a newly created topic with a previously used name cannot 
be confused with a previous topic with that name.

 

Related Issues:

[KAFKA-10545|http://example.com/]

  was:
Kafka currently uniquely identifies a topic by its name. This is generally 
sufficient, but there are flaws in this scheme if a topic is deleted and 
recreated with the same name. As a result, Kafka attempts to prevent these 
classes of issues by ensuring a topic is deleted from all replicas before 
completing a deletion. This solution is not perfect, as it is possible for 
partitions to be reassigned from brokers while they are down, and there are no 
guarantees that this state will ever be cleaned up and will not cause issues in 
the future.

As the controller must wait for all replicas to delete their local partitions, 
deletes can also become blocked, preventing topics from being created with the 
same name until the deletion is complete on all replicas. This can mean that 
downtime for a single broker can effectively cause a complete outage for 
everyone producing/consuming to that topic name, as the topic cannot be 
recreated without manual intervention.

Unique topic IDs could help address this issue by associating a unique ID with 
each topic, ensuring a newly created topic with a previously used name cannot 
be confused with a previous topic with that name.

 

Related Issues:

[https://issues.apache.org/jira/browse/KAFKA-10545|http://example.com]


> Improvements to controller "deleting" state /  topic Identifiers
> 
>
> Key: KAFKA-8872
> URL: https://issues.apache.org/jira/browse/KAFKA-8872
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Bradstreet
>Priority: Major
>
> Kafka currently uniquely identifies a topic by its name. This is generally 
> sufficient, but there are flaws in this scheme if a topic is deleted and 
> recreated with the same name. As a result, Kafka attempts to prevent these 
> classes of issues by ensuring a topic is deleted from all replicas before 
> completing a deletion. This solution is not perfect, as it is possible for 
> partitions to be reassigned from brokers while they are down, and there are 
> no guarantees that this state will ever be cleaned up and will not cause 
> issues in the future.
> As the controller must wait for all replicas to delete their local 
> partitions, deletes can also become blocked, preventing topics from being 
> created with the same name until the deletion is complete on all replicas. 
> This can mean that downtime for a single broker can effectively cause a 
> complete outage for everyone producing/consuming to that topic name, as the 
> topic cannot be recreated without manual intervention.
> Unique topic IDs could help address this issue by associating a unique ID 
> with each topic, ensuring a newly created topic with a previously used name 
> cannot be confused with a previous topic with that name.
>  
> Related Issues:
> [KAFKA-10545|http://example.com/]





[jira] [Updated] (KAFKA-8872) Improvements to controller "deleting" state / topic Identifiers

2020-09-29 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-8872:
--
Description: 
Kafka currently uniquely identifies a topic by its name. This is generally 
sufficient, but there are flaws in this scheme if a topic is deleted and 
recreated with the same name. As a result, Kafka attempts to prevent these 
classes of issues by ensuring a topic is deleted from all replicas before 
completing a deletion. This solution is not perfect, as it is possible for 
partitions to be reassigned from brokers while they are down, and there are no 
guarantees that this state will ever be cleaned up and will not cause issues in 
the future.

As the controller must wait for all replicas to delete their local partitions, 
deletes can also become blocked, preventing topics from being created with the 
same name until the deletion is complete on all replicas. This can mean that 
downtime for a single broker can effectively cause a complete outage for 
everyone producing/consuming to that topic name, as the topic cannot be 
recreated without manual intervention.

Unique topic IDs could help address this issue by associating a unique ID with 
each topic, ensuring a newly created topic with a previously used name cannot 
be confused with a previous topic with that name.

 

Related Issues:

KAFKA-10545

  was:
Kafka currently uniquely identifies a topic by its name. This is generally 
sufficient, but there are flaws in this scheme if a topic is deleted and 
recreated with the same name. As a result, Kafka attempts to prevent these 
classes of issues by ensuring a topic is deleted from all replicas before 
completing a deletion. This solution is not perfect, as it is possible for 
partitions to be reassigned from brokers while they are down, and there are no 
guarantees that this state will ever be cleaned up and will not cause issues in 
the future.

As the controller must wait for all replicas to delete their local partitions, 
deletes can also become blocked, preventing topics from being created with the 
same name until the deletion is complete on all replicas. This can mean that 
downtime for a single broker can effectively cause a complete outage for 
everyone producing/consuming to that topic name, as the topic cannot be 
recreated without manual intervention.

Unique topic IDs could help address this issue by associating a unique ID with 
each topic, ensuring a newly created topic with a previously used name cannot 
be confused with a previous topic with that name.

 

Related Issues:

[KAFKA-10545|http://example.com/]


> Improvements to controller "deleting" state /  topic Identifiers
> 
>
> Key: KAFKA-8872
> URL: https://issues.apache.org/jira/browse/KAFKA-8872
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Bradstreet
>Priority: Major
>
> Kafka currently uniquely identifies a topic by its name. This is generally 
> sufficient, but there are flaws in this scheme if a topic is deleted and 
> recreated with the same name. As a result, Kafka attempts to prevent these 
> classes of issues by ensuring a topic is deleted from all replicas before 
> completing a deletion. This solution is not perfect, as it is possible for 
> partitions to be reassigned from brokers while they are down, and there are 
> no guarantees that this state will ever be cleaned up and will not cause 
> issues in the future.
> As the controller must wait for all replicas to delete their local 
> partitions, deletes can also become blocked, preventing topics from being 
> created with the same name until the deletion is complete on all replicas. 
> This can mean that downtime for a single broker can effectively cause a 
> complete outage for everyone producing/consuming to that topic name, as the 
> topic cannot be recreated without manual intervention.
> Unique topic IDs could help address this issue by associating a unique ID 
> with each topic, ensuring a newly created topic with a previously used name 
> cannot be confused with a previous topic with that name.
>  
> Related Issues:
> KAFKA-10545





[jira] [Updated] (KAFKA-10545) Create Topic IDs and Propagate to Brokers

2020-09-29 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-10545:
---
Parent: KAFKA-8872
Issue Type: Sub-task  (was: Improvement)

> Create Topic IDs and Propagate to Brokers
> -
>
> Key: KAFKA-10545
> URL: https://issues.apache.org/jira/browse/KAFKA-10545
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> First step for KIP-516
> The goals are:
>  * Create and store topic IDs in a ZK Node and controller memory.
>  * Propagate topic ID to brokers with updated LeaderAndIsrRequest
>  * Store topic ID in memory on broker, persistent file in log





[jira] [Updated] (KAFKA-8872) Improvements to controller "deleting" state / topic Identifiers

2020-09-29 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-8872:
--
Description: 
Kafka currently uniquely identifies a topic by its name. This is generally 
sufficient, but there are flaws in this scheme if a topic is deleted and 
recreated with the same name. As a result, Kafka attempts to prevent these 
classes of issues by ensuring a topic is deleted from all replicas before 
completing a deletion. This solution is not perfect, as it is possible for 
partitions to be reassigned from brokers while they are down, and there are no 
guarantees that this state will ever be cleaned up and will not cause issues in 
the future.

As the controller must wait for all replicas to delete their local partitions, 
deletes can also become blocked, preventing topics from being created with the 
same name until the deletion is complete on all replicas. This can mean that 
downtime for a single broker can effectively cause a complete outage for 
everyone producing/consuming to that topic name, as the topic cannot be 
recreated without manual intervention.

Unique topic IDs could help address this issue by associating a unique ID with 
each topic, ensuring a newly created topic with a previously used name cannot 
be confused with a previous topic with that name.

 

  was:
Kafka currently uniquely identifies a topic by its name. This is generally 
sufficient, but there are flaws in this scheme if a topic is deleted and 
recreated with the same name. As a result, Kafka attempts to prevent these 
classes of issues by ensuring a topic is deleted from all replicas before 
completing a deletion. This solution is not perfect, as it is possible for 
partitions to be reassigned from brokers while they are down, and there are no 
guarantees that this state will ever be cleaned up and will not cause issues in 
the future.

As the controller must wait for all replicas to delete their local partitions, 
deletes can also become blocked, preventing topics from being created with the 
same name until the deletion is complete on all replicas. This can mean that 
downtime for a single broker can effectively cause a complete outage for 
everyone producing/consuming to that topic name, as the topic cannot be 
recreated without manual intervention.

Unique topic IDs could help address this issue by associating a unique ID with 
each topic, ensuring a newly created topic with a previously used name cannot 
be confused with a previous topic with that name.

 

Related Issues:

KAFKA-10545


> Improvements to controller "deleting" state /  topic Identifiers
> 
>
> Key: KAFKA-8872
> URL: https://issues.apache.org/jira/browse/KAFKA-8872
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Bradstreet
>Priority: Major
>
> Kafka currently uniquely identifies a topic by its name. This is generally 
> sufficient, but there are flaws in this scheme if a topic is deleted and 
> recreated with the same name. As a result, Kafka attempts to prevent these 
> classes of issues by ensuring a topic is deleted from all replicas before 
> completing a deletion. This solution is not perfect, as it is possible for 
> partitions to be reassigned from brokers while they are down, and there are 
> no guarantees that this state will ever be cleaned up and will not cause 
> issues in the future.
> As the controller must wait for all replicas to delete their local 
> partitions, deletes can also become blocked, preventing topics from being 
> created with the same name until the deletion is complete on all replicas. 
> This can mean that downtime for a single broker can effectively cause a 
> complete outage for everyone producing/consuming to that topic name, as the 
> topic cannot be recreated without manual intervention.
> Unique topic IDs could help address this issue by associating a unique ID 
> with each topic, ensuring a newly created topic with a previously used name 
> cannot be confused with a previous topic with that name.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10547) Add topic IDs to MetadataResponse, UpdateMetadata, and Fetch

2020-09-29 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10547:
--

 Summary: Add topic IDs to MetadataResponse, UpdateMetadata, and 
Fetch
 Key: KAFKA-10547
 URL: https://issues.apache.org/jira/browse/KAFKA-10547
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


Prevent reads from deleted topics

Will be able to use TopicDescription to identify the topic ID





[jira] [Created] (KAFKA-10548) Implement Type field and logic for LeaderAndIsrRequests

2020-09-29 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10548:
--

 Summary: Implement Type field and logic for LeaderAndIsrRequests
 Key: KAFKA-10548
 URL: https://issues.apache.org/jira/browse/KAFKA-10548
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


This will allow for specialized deletion logic when receiving 
LeaderAndIsrRequests

Will also create and utilize delete.stale.topic.delay.ms configuration option





[jira] [Created] (KAFKA-10549) Add topic ID support to DeleteTopics,ListOffsets, OffsetForLeaders, StopReplica

2020-09-29 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10549:
--

 Summary: Add topic ID support to DeleteTopics,ListOffsets, 
OffsetForLeaders, StopReplica
 Key: KAFKA-10549
 URL: https://issues.apache.org/jira/browse/KAFKA-10549
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


The ListOffsets, OffsetForLeaders, and StopReplica protocols will replace the topic name with the topic ID and will be used to prevent reads from deleted topics.

DeleteTopics will be changed to support topic IDs and to delete sooner.

This may be split into two or more issues if necessary.





[jira] [Created] (KAFKA-10550) Update kafka-topics.sh to support Topic IDs

2020-09-29 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10550:
--

 Summary: Update kafka-topics.sh to support Topic IDs
 Key: KAFKA-10550
 URL: https://issues.apache.org/jira/browse/KAFKA-10550
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


 Make changes to kafka-topics.sh --describe so a user can specify a topic name 
to describe with the --topic parameter, or alternatively the user can supply a 
topic ID with the --topic_id parameter





[GitHub] [kafka] chia7712 commented on pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-29 Thread GitBox


chia7712 commented on pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#issuecomment-700904592


   @hachikuji Thanks for reviews and update!







[GitHub] [kafka] hachikuji opened a new pull request #9352: KAFKA-10533; KafkaRaftClient should flush log after appends

2020-09-29 Thread GitBox


hachikuji opened a new pull request #9352:
URL: https://github.com/apache/kafka/pull/9352


   This patch adds missing flush logic to `KafkaRaftClient`. The initial 
flushing behavior is simplistic. We guarantee that the leader will not 
replicate above the last flushed offset and we guarantee that the follower will 
not fetch data above its own flush point. More sophisticated flush behavior is 
proposed in KAFKA-10526.
   
   We have also extended the simulation test so that it covers flush behavior. 
When a node is shutdown, all unflushed data is lost. We were able to confirm 
that the monotonic high watermark invariant fails without the added `flush` 
calls.
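
   A minimal sketch of the invariant described above (all names here are illustrative assumptions, not the `KafkaRaftClient` API): replication is bounded by the last flushed offset, so data lost in a crash was never exposed.

      // Illustrative only; method and field names are assumptions.
      class FlushedLogSketch {
          private long logEndOffset = 0;     // last appended offset
          private long flushedEndOffset = 0; // last offset made durable via fsync

          void append(int recordCount) {
              logEndOffset += recordCount;
          }

          void flush() {
              // An fsync of the log segment would happen here.
              flushedEndOffset = logEndOffset;
          }

          // The leader replicates, and a follower fetches, only up to this offset,
          // so unflushed data lost on shutdown never advances the high watermark.
          long replicatableEndOffset() {
              return flushedEndOffset;
          }
      }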
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] nizhikov commented on pull request #9196: KAFKA-10402: Upgrade system tests to python3

2020-09-29 Thread GitBox


nizhikov commented on pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#issuecomment-700910936


   @edenhill Thanks for the help! Appreciate it







[GitHub] [kafka] hachikuji merged pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-29 Thread GitBox


hachikuji merged pull request #9284:
URL: https://github.com/apache/kafka/pull/9284


   







[jira] [Resolved] (KAFKA-10479) Throw exception if users try to update configs of existent listeners

2020-09-29 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10479.
-
Fix Version/s: 2.7.0
   Resolution: Fixed

> Throw exception if users try to update configs of existent listeners
> 
>
> Key: KAFKA-10479
> URL: https://issues.apache.org/jira/browse/KAFKA-10479
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
> Fix For: 2.7.0
>
>
> {code}
> def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = {
>   newConfig.originals.asScala.filter { case (key, _) =>
>     key.startsWith(prefix) && !DynamicSecurityConfigs.contains(key)
>   }
> }
> {code}
> We don't actually compare new configs to origin configs so the suitable 
> exception is not thrown.
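
A hedged sketch of the missing comparison (written in Java for brevity; the actual fix lives in the Scala broker code): only prefixed keys whose values actually differ from the current config should be treated as attempted updates.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;

    public class ListenerConfigDiffSketch {
        // Returns the prefixed keys whose values changed relative to the current config.
        static Map<String, Object> changedConfigs(Map<String, Object> newConfig,
                                                  Map<String, Object> currentConfig,
                                                  String prefix) {
            Map<String, Object> changed = new HashMap<>();
            newConfig.forEach((key, value) -> {
                if (key.startsWith(prefix) && !Objects.equals(value, currentConfig.get(key)))
                    changed.put(key, value);
            });
            return changed;
        }
    }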





[jira] [Created] (KAFKA-10551) Support topic IDs in Produce request

2020-09-29 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10551:
--

 Summary: Support topic IDs in Produce request
 Key: KAFKA-10551
 URL: https://issues.apache.org/jira/browse/KAFKA-10551
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


Replace the topic name with the topic ID so the request is smaller.





[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-09-29 Thread GitBox


soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496983721



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -133,11 +133,14 @@ public void start() {
 List<TopicPartition> partitions = new ArrayList<>();
 
 // We expect that the topics will have been created either manually by the user or automatically by the herder
-List<PartitionInfo> partitionInfos = null;
-long started = time.milliseconds();
-while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+long started = time.nanoseconds();
+long maxSleepMs = 1_000;
+long sleepMs = 10;
+while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
+time.sleep(sleepMs);
+sleepMs = Math.min(2 * sleepMs, maxSleepMs);
 partitionInfos = consumer.partitionsFor(topic);

Review comment:
   I think the semantics needed here are different. The timeout in `partitionsFor` is the max amount of time the API can block waiting for a response before it fails with `TimeoutException`. However, the API can return within the timeout with empty results because the data for newly created topics has not been propagated yet. We then have to retry until `partitionsFor` returns the partition data (up to a max time).
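
   For context, a self-contained sketch of the polling pattern under discussion (constants and names are assumed, not the PR's exact code):

      import java.time.Duration;
      import java.util.List;
      import org.apache.kafka.clients.consumer.Consumer;
      import org.apache.kafka.common.PartitionInfo;

      public class PartitionPollSketch {
          // Retry partitionsFor with capped exponential backoff until metadata appears
          // or the deadline passes; a null result means the topic is not yet known.
          static List<PartitionInfo> awaitPartitions(Consumer<?, ?> consumer, String topic,
                                                     Duration timeout) throws InterruptedException {
              long deadline = System.nanoTime() + timeout.toNanos();
              long sleepMs = 10;
              List<PartitionInfo> infos = consumer.partitionsFor(topic);
              while (infos == null && System.nanoTime() < deadline) {
                  Thread.sleep(sleepMs);
                  sleepMs = Math.min(2 * sleepMs, 1_000); // cap the backoff at one second
                  infos = consumer.partitionsFor(topic);
              }
              return infos;
          }
      }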









[jira] [Created] (KAFKA-10552) Update directory structure to use topic IDs

2020-09-29 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10552:
--

 Summary: Update directory structure to use topic IDs
 Key: KAFKA-10552
 URL: https://issues.apache.org/jira/browse/KAFKA-10552
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


This change will probably coincide with a major release.

Topic names will be removed from the directory structure and replaced with 
topic IDs.





[GitHub] [kafka] chia7712 commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-09-29 Thread GitBox


chia7712 commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r497017395



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -133,11 +133,14 @@ public void start() {
 List<TopicPartition> partitions = new ArrayList<>();
 
 // We expect that the topics will have been created either manually by the user or automatically by the herder
-List<PartitionInfo> partitionInfos = null;
-long started = time.milliseconds();
-while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+long started = time.nanoseconds();
+long maxSleepMs = 1_000;
+long sleepMs = 10;
+while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
+time.sleep(sleepMs);
+sleepMs = Math.min(2 * sleepMs, maxSleepMs);
 partitionInfos = consumer.partitionsFor(topic);

Review comment:
   It seems to me that the behavior of ```timeout``` is not consistent across consumer methods. The ```timeout``` used by other methods (for example: ```position```, ```offsetsForTimes```, ```beginningOffsets``` and ```endOffsets```) is to await the result for specific partitions. It means the consumer will send a request again if the timer has not expired and the specified partition has no metadata (i.e. the topic's data has not been propagated yet). Maybe ```partitionsFor``` should be fixed for consistent behavior.









[jira] [Assigned] (KAFKA-10510) Reassigning partitions should not allow increasing RF of a partition unless configured with it

2020-09-29 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-10510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Rżysko reassigned KAFKA-10510:


Assignee: Piotr Rżysko

> Reassigning partitions should not allow increasing RF of a partition unless 
> configured with it
> --
>
> Key: KAFKA-10510
> URL: https://issues.apache.org/jira/browse/KAFKA-10510
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Piotr Rżysko
>Priority: Major
>
> Kafka should have some validations in place against increasing the RF of a 
> partition through a reassignment. Users could otherwise shoot themselves in 
> the foot by increasing the RF of a topic by reassigning its partitions to 
> extra replicas and then have new partition creations use a lesser (the 
> configured) replication factor.
> Our tools should ideally detect when RF is increasing inconsistently with the 
> rest of the topic's partitions (or the default replication factor)





[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

2020-09-29 Thread GitBox


scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r495871017



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##
@@ -199,8 +199,8 @@
 
 protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
 protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
   I'm not sure if I follow it. ~This prefix "source_cluster" is not user 
specified. It is prefixed somewhere else by mm2 (I can get the lines if you 
want later on).~ The config specified 
[here](https://github.com/apache/kafka/tree/trunk/connect/mirror#producer--consumer--admin-config-used-by-mm2)
 will actually be honored. It does not imply changes on the MirrorMaker config 
side at least. Are you talking about the case of running MirrorMaker in a 
connect cluster rather than as a dedicated cluster? I didn't check that one 
TBH. Maybe @ryannedolan has more insight on this one?
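
   For readers following along, a generic sketch of how such cluster-scoped prefixes typically resolve (not `MirrorConnectorConfig` itself, and the example prefix value is an assumption): prefixed keys are stripped down to plain client configs.

      import java.util.HashMap;
      import java.util.Map;

      public class PrefixStripSketch {
          // e.g. "source.cluster.producer.linger.ms" -> "linger.ms" (prefix value assumed)
          static Map<String, String> clientConfigs(Map<String, String> raw, String prefix) {
              Map<String, String> stripped = new HashMap<>();
              raw.forEach((key, value) -> {
                  if (key.startsWith(prefix))
                      stripped.put(key.substring(prefix.length()), value);
              });
              return stripped;
          }
      }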









[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-09-29 Thread GitBox


soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r497034435



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -133,11 +133,14 @@ public void start() {
 List<TopicPartition> partitions = new ArrayList<>();
 
 // We expect that the topics will have been created either manually by the user or automatically by the herder
-List<PartitionInfo> partitionInfos = null;
-long started = time.milliseconds();
-while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+long started = time.nanoseconds();
+long maxSleepMs = 1_000;
+long sleepMs = 10;
+while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) {
+time.sleep(sleepMs);
+sleepMs = Math.min(2 * sleepMs, maxSleepMs);
 partitionInfos = consumer.partitionsFor(topic);

Review comment:
   Not sure about this: no result for "partitionInfos" is a valid result, so there is no point in retrying automatically. The other APIs that retry automatically do so when they get an invalid result back; if `null` were a valid result for them, they shouldn't retry either.








