[GitHub] [kafka] kkonstantine commented on pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations

2020-05-20 Thread GitBox


kkonstantine commented on pull request #8654:
URL: https://github.com/apache/kafka/pull/8654#issuecomment-631879517


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations

2020-05-20 Thread GitBox


kkonstantine commented on a change in pull request #8654:
URL: https://github.com/apache/kafka/pull/8654#discussion_r428443080



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for the creation of internal topics.
+ */
+@Category(IntegrationTest.class)
+public class InternalTopicsIntegrationTest {
+
+private static final Logger log = LoggerFactory.getLogger(InternalTopicsIntegrationTest.class);
+
+private EmbeddedConnectCluster.Builder connectBuilder;
+private EmbeddedConnectCluster connect;
+Map workerProps = new HashMap<>();
+Properties brokerProps = new Properties();
+
+@Before
+public void setup() {
+// setup Kafka broker properties
+brokerProps.put("auto.create.topics.enable", String.valueOf(false));
+
+// build a Connect cluster backed by Kafka and Zk
+connectBuilder = new EmbeddedConnectCluster.Builder()
+.name("connect-cluster")
+.numWorkers(1)
+.numBrokers(1)
+.brokerProps(brokerProps);
+}
+
+@After
+public void close() {
+// stop all Connect, Kafka and Zk threads.
+connect.stop();
+}
+
+@Test
+public void testCreateInternalTopicsWithDefaultSettings() throws InterruptedException {
+int numWorkers = 1;
+int numBrokers = 3;
+connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1")

Review comment:
   Makes sense. 

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##
@@ -453,11 +453,17 @@ public void putSessionKey(SessionKey sessionKey) {
 consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 
 Map adminProps = new HashMap<>(originals);
-NewTopic topicDescription = TopicAdmin.defineTopic(topic).
-compacted().
-partitions(1).
-replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)).
-build();
+
+Map topicSettings = null;
+if (config instanceof DistributedConfig) {
+topicSettings = ((DistributedConfig) config).configStorageTopicSettings();
+}

Review comment:
   ```suggestion
   Map topicSettings = config instanceof DistributedConfig
   ? ((DistributedConfig) config).configStorageTopicSettings()
   : Collections.emptyMap();
   ```
   I know that `TopicAdmin#defineTopic` checks for `null`, but I think using
`null` for collections is best reserved for cases where that optimization matters. Wdyt?
   (Btw, you don't have to use the ternary operator; I just added it to make the
suggestion clear.)
   
   Also, if you change it here, please change it in the other files too. 
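
A minimal sketch of the empty-map fallback discussed in this comment. The types `WorkerSettings`, `DistributedSettings` and `TopicSettingsSketch` are hypothetical stand-ins invented for illustration, not the real Connect classes, and the setting name is only an example value:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the worker config classes; not the real Kafka types.
class WorkerSettings { }

class DistributedSettings extends WorkerSettings {
    Map<String, Object> configStorageTopicSettings() {
        Map<String, Object> settings = new HashMap<>();
        settings.put("min.insync.replicas", "2"); // example value only
        return settings;
    }
}

class TopicSettingsSketch {
    // Falls back to an empty map instead of null, so callers never need a null check.
    static Map<String, Object> topicSettings(WorkerSettings config) {
        return config instanceof DistributedSettings
                ? ((DistributedSettings) config).configStorageTopicSettings()
                : Collections.emptyMap();
    }

    public static void main(String[] args) {
        System.out.println(topicSettings(new DistributedSettings()).size());
        System.out.println(topicSettings(new WorkerSettings()).isEmpty());
    }
}
```

With this shape, code that iterates over the settings works unchanged for both config types.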

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
##
@@ -400,6 +424,33 @@ public KeyGenerator getInternalRequestKeyGenerator() {
 }
 }
 
+private Map topicSettings(String prefix) {
+Map result = originalsWithPrefix(prefix);
+if (CONFIG_STORAGE_PREFIX.equals(prefix) && result.containsKey(PARTITIONS_SUFFIX)) {
+log.warn("Ignoring '{}{}={}' setting, since config topic partitions is always 1", prefix, PARTITIONS_SUFFIX, result.get("partitions"));
+}
+Object removedPolicy = result.remove("cleanup.policy");

Review comment:
   It's one more 

[GitHub] [kafka] cmccabe opened a new pull request #8703: MINOR: add a getOrCreate function to KRPC collections

2020-05-20 Thread GitBox


cmccabe opened a new pull request #8703:
URL: https://github.com/apache/kafka/pull/8703


   







[GitHub] [kafka] pgwhalen commented on pull request #6824: KAFKA-7523: Add ConnectedStoreProvider to Processor API

2020-05-20 Thread GitBox


pgwhalen commented on pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#issuecomment-631873644


   Retest this please.







[GitHub] [kafka] abbccdda commented on a change in pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)

2020-05-20 Thread GitBox


abbccdda commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r428370254



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and max of type long.
+ * The min and max attributes are expected to be >= 1, and with max >= min.
+ *
+ * The class also provides API to serialize/deserialize the version range to/from a map.
+ * The class allows for configurable labels for the min/max attributes, which can be specialized by
+ * sub-classes (if needed).
+ */
+class BaseVersionRange {

Review comment:
   Do we want to get a unit test class for `BaseVersionRange`?
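
As a rough illustration of what such a unit test could exercise, here is a sketch of the contract stated in the Javadoc above (min and max >= 1, max >= min, map round trip with configurable labels). `VersionRangeSketch` and its method names are hypothetical and written only from that Javadoc; they are not the class under review:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical class written from the Javadoc contract only; not BaseVersionRange itself.
final class VersionRangeSketch {
    private final long min;
    private final long max;

    VersionRangeSketch(long min, long max) {
        // Enforce the documented invariants: both values >= 1 and max >= min.
        if (min < 1 || max < 1 || max < min) {
            throw new IllegalArgumentException("Expected 1 <= min <= max, got min=" + min + ", max=" + max);
        }
        this.min = min;
        this.max = max;
    }

    long min() { return min; }

    long max() { return max; }

    // Serializes the range using caller-supplied labels for the two attributes.
    Map<String, Long> toMap(String minLabel, String maxLabel) {
        Map<String, Long> map = new HashMap<>();
        map.put(minLabel, min);
        map.put(maxLabel, max);
        return map;
    }

    // Rebuilds a range from a map; missing labels are rejected explicitly.
    static VersionRangeSketch fromMap(Map<String, Long> map, String minLabel, String maxLabel) {
        Long min = map.get(minLabel);
        Long max = map.get(maxLabel);
        if (min == null || max == null) {
            throw new IllegalArgumentException("Map must contain '" + minLabel + "' and '" + maxLabel + "'");
        }
        return new VersionRangeSketch(min, max);
    }
}
```

A test class would then cover the three rejection cases (min < 1, max < 1, max < min) plus the round trip through `toMap` and `fromMap`.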

##
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param  is the type of version range.
+ * @see SupportedVersionRange
+ * @see FinalizedVersionRange
+ */
+public class Features {
+private final Map features;
+
+/**
+ * Constructor is made private, as for readability it is preferred the caller uses one of the
+ * static factory functions for instantiation (see below).
+ *
+ * @param features   Map of feature name to type of VersionRange, as the backing data structure
+ *   for the Features object.
+ */
+private Features(Map features) {
+this.features = features;
+}
+
+/**
+ * @param features   Map of feature name to VersionRange, as the backing data structure
+ *   for the Features object.
+ * @return   Returns a new Features object representing "supported" features.
+ */
+public static Features supportedFeatures(Map features) {
+return new Features<>(features);
+}
+
+/**
+ * @param features   Map of feature name to FinalizedVersionRange, as the backing data structure
+ *   for the Features object.
+ * @return   Returns a new Features object representing "finalized" features.
+ */
+public static Features finalizedFeatures(Map features) {
+return new Features<>(features);
+}
+
+// Visible for testing.
+public static Features emptyFinalizedFeatures() {
+return new Features<>(new HashMap<>());
+}
+
+public static Features emptySupportedFeatures() {
+return new Features<>(new HashMap<>());
+}
+
+public Map features() {
+return features;
+}
+
+public boolean empty() {
+return features.isEmpty();
+}
+
+/**
+  

[GitHub] [kafka] ijuma merged pull request #8472: KAFKA-9855 - return cached Structs for Schemas with no fields

2020-05-20 Thread GitBox


ijuma merged pull request #8472:
URL: https://github.com/apache/kafka/pull/8472


   







[GitHub] [kafka] ijuma commented on pull request #8472: KAFKA-9855 - return cached Structs for Schemas with no fields

2020-05-20 Thread GitBox


ijuma commented on pull request #8472:
URL: https://github.com/apache/kafka/pull/8472#issuecomment-631870599


   I have a WIP PR that removes structs altogether, but this is a good 
improvement in the meantime. Merging to trunk. Thanks.







[jira] [Updated] (KAFKA-9950) MirrorMaker2 sharing of ConfigDef can lead to ConcurrentModificationException

2020-05-20 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-9950:
--
Fix Version/s: 2.5.1
   2.4.2
   2.6.0

> MirrorMaker2 sharing of ConfigDef can lead to ConcurrentModificationException
> -
>
> Key: KAFKA-9950
> URL: https://issues.apache.org/jira/browse/KAFKA-9950
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
>
> The 
> [MirrorConnectorConfig::CONNECTOR_CONFIG_DEF|https://github.com/apache/kafka/blob/34824b7bff64ba387a04466d74ac6bbbd10bf37c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java#L397]
>  object is reused across multiple MirrorMaker2 classes, which is fine for the 
> most part since it's a constant. However, the actual {{ConfigDef}} object 
> itself is mutable, and is mutated when the {{MirrorTaskConfig}} class 
> [statically constructs its own 
> ConfigDef|https://github.com/apache/kafka/blob/34824b7bff64ba387a04466d74ac6bbbd10bf37c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorTaskConfig.java#L62].
> This has two unintended effects:
>  # Since the two {{ConfigDef}} objects for the {{MirrorConnectorConfig}} and 
> {{MirrorTaskConfig}} classes are actually the same object, the additional 
> properties that the {{MirrorTaskConfig}} class defines for its {{ConfigDef}} 
> are also added to the {{MirrorConnectorConfig}} class's {{ConfigDef}}. The 
> impact of this isn't huge since both additional properties have default 
> values, but this does cause those properties to appear in the 
> {{/connectors/\{name}/config/validate}} endpoint once the 
> {{MirrorTaskConfig}} class is loaded for the first time.
>  # It's possible that, if a config for a MirrorMaker2 connector is submitted 
> at approximately the same time that the {{MirrorTaskConfig}} class is loaded, 
> a {{ConcurrentModificationException}} will be thrown by the 
> {{AbstractHerder}} class when it tries to [iterate over all of the keys of 
> the connector's 
> ConfigDef|https://github.com/apache/kafka/blob/34824b7bff64ba387a04466d74ac6bbbd10bf37c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L357].
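
The aliasing problem described in the issue can be sketched in plain Java. `SimpleConfigDef` below is a hypothetical stand-in for a mutable definition object with a fluent `define` method; it is not Kafka's `ConfigDef`, and the property names are illustrative only. The second half shows the general remedy of copying before adding properties:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a mutable config definition; not Kafka's ConfigDef.
class SimpleConfigDef {
    private final Map<String, String> keys = new LinkedHashMap<>();

    SimpleConfigDef() { }

    // Copy constructor: the new definition gets its own key map.
    SimpleConfigDef(SimpleConfigDef base) {
        keys.putAll(base.keys);
    }

    // Mutates this instance and returns it, which is what makes sharing risky.
    SimpleConfigDef define(String name, String doc) {
        keys.put(name, doc);
        return this;
    }

    int size() {
        return keys.size();
    }
}

class SharedConfigDefSketch {
    public static void main(String[] args) {
        SimpleConfigDef connectorDef = new SimpleConfigDef().define("topics", "Topics to replicate");

        // Problem: define() returns the same object, so the "task" definition aliases connectorDef.
        SimpleConfigDef aliased = connectorDef.define("task.partitions", "Assigned partitions");
        System.out.println(aliased == connectorDef); // true: one object, two names

        // Remedy: copy first, then define task-only properties on the copy.
        SimpleConfigDef base = new SimpleConfigDef().define("topics", "Topics to replicate");
        SimpleConfigDef taskDef = new SimpleConfigDef(base).define("task.partitions", "Assigned partitions");
        System.out.println(base.size() + " vs " + taskDef.size());
    }
}
```

Copying also removes the race in the second effect, because the shared definition is no longer mutated while another thread may be iterating over it.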



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9950) MirrorMaker2 sharing of ConfigDef can lead to ConcurrentModificationException

2020-05-20 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-9950.
---
Resolution: Fixed

> MirrorMaker2 sharing of ConfigDef can lead to ConcurrentModificationException
> -
>
> Key: KAFKA-9950
> URL: https://issues.apache.org/jira/browse/KAFKA-9950
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
>
> The 
> [MirrorConnectorConfig::CONNECTOR_CONFIG_DEF|https://github.com/apache/kafka/blob/34824b7bff64ba387a04466d74ac6bbbd10bf37c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java#L397]
>  object is reused across multiple MirrorMaker2 classes, which is fine for the 
> most part since it's a constant. However, the actual {{ConfigDef}} object 
> itself is mutable, and is mutated when the {{MirrorTaskConfig}} class 
> [statically constructs its own 
> ConfigDef|https://github.com/apache/kafka/blob/34824b7bff64ba387a04466d74ac6bbbd10bf37c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorTaskConfig.java#L62].
> This has two unintended effects:
>  # Since the two {{ConfigDef}} objects for the {{MirrorConnectorConfig}} and 
> {{MirrorTaskConfig}} classes are actually the same object, the additional 
> properties that the {{MirrorTaskConfig}} class defines for its {{ConfigDef}} 
> are also added to the {{MirrorConnectorConfig}} class's {{ConfigDef}}. The 
> impact of this isn't huge since both additional properties have default 
> values, but this does cause those properties to appear in the 
> {{/connectors/\{name}/config/validate}} endpoint once the 
> {{MirrorTaskConfig}} class is loaded for the first time.
>  # It's possible that, if a config for a MirrorMaker2 connector is submitted 
> at approximately the same time that the {{MirrorTaskConfig}} class is loaded, 
> a {{ConcurrentModificationException}} will be thrown by the 
> {{AbstractHerder}} class when it tries to [iterate over all of the keys of 
> the connector's 
> ConfigDef|https://github.com/apache/kafka/blob/34824b7bff64ba387a04466d74ac6bbbd10bf37c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L357].





[GitHub] [kafka] kkonstantine commented on pull request #8511: KAFKA-9888: Copy connector configs before passing to REST extensions

2020-05-20 Thread GitBox


kkonstantine commented on pull request #8511:
URL: https://github.com/apache/kafka/pull/8511#issuecomment-631869695


   ok to test







[GitHub] [kafka] kkonstantine commented on a change in pull request #8511: KAFKA-9888: Copy connector configs before passing to REST extensions

2020-05-20 Thread GitBox


kkonstantine commented on a change in pull request #8511:
URL: https://github.com/apache/kafka/pull/8511#discussion_r428436342



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
##
@@ -86,7 +86,7 @@ public ConnectorHealth connectorHealth(String connName) {
 FutureCallback> connectorConfigCallback = new FutureCallback<>();
 herder.connectorConfig(connName, connectorConfigCallback);
 try {
-return connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS);
+return new HashMap<>(connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS););

Review comment:
   My bad. My suggestion inserted a typo. At least I saw it before I started 
the build. 
   ```suggestion
   return new HashMap<>(connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS));
   ```
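
For context, a small sketch of why wrapping the result in `new HashMap<>(...)` matters. `ConnectorConfigHolder` is a hypothetical class made up for this illustration, not the real `ConnectClusterStateImpl`; it contrasts handing out the internal map with handing out a copy:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder of a connector config; not the real ConnectClusterStateImpl.
class ConnectorConfigHolder {
    private final Map<String, String> config = new HashMap<>();

    ConnectorConfigHolder() {
        config.put("connector.class", "ExampleConnector");
    }

    // Returns the internal map directly: callers can change internal state.
    Map<String, String> configUnsafe() {
        return config;
    }

    // Returns a copy: caller mutations stay local to the caller.
    Map<String, String> configCopy() {
        return new HashMap<>(config);
    }
}

class DefensiveCopySketch {
    public static void main(String[] args) {
        ConnectorConfigHolder holder = new ConnectorConfigHolder();

        holder.configCopy().put("injected", "value");
        System.out.println(holder.configUnsafe().containsKey("injected")); // false

        holder.configUnsafe().put("injected", "value");
        System.out.println(holder.configCopy().containsKey("injected")); // true
    }
}
```

The copy is shallow, which is sufficient here because both keys and values are immutable strings.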









[GitHub] [kafka] kkonstantine commented on pull request #8444: KAFKA-8869: Remove task configs for deleted connectors from config snapshot

2020-05-20 Thread GitBox


kkonstantine commented on pull request #8444:
URL: https://github.com/apache/kafka/pull/8444#issuecomment-631868755


   Merged to `trunk` and backported to `2.5`, `2.4` and `2.3`







[GitHub] [kafka] kkonstantine merged pull request #8608: KAFKA-9950: Construct new ConfigDef for MirrorTaskConfig before defining new properties

2020-05-20 Thread GitBox


kkonstantine merged pull request #8608:
URL: https://github.com/apache/kafka/pull/8608


   







[GitHub] [kafka] kkonstantine commented on pull request #8608: KAFKA-9950: Construct new ConfigDef for MirrorTaskConfig before defining new properties

2020-05-20 Thread GitBox


kkonstantine commented on pull request #8608:
URL: https://github.com/apache/kafka/pull/8608#issuecomment-631867826


   jdk8: single failure on known flaky test: 
`org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]`
   jdk11: single failure on known flaky test: 
`org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]`
   jdk14: success
   







[GitHub] [kafka] kkonstantine commented on pull request #8118: KAFKA-9472: Remove deleted tasks from status store

2020-05-20 Thread GitBox


kkonstantine commented on pull request #8118:
URL: https://github.com/apache/kafka/pull/8118#issuecomment-631867151


   retest this please







[jira] [Updated] (KAFKA-8869) Map taskConfigs in KafkaConfigBackingStore grows monotonically despite of task removals

2020-05-20 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-8869:
--
Fix Version/s: 2.5.1
   2.4.2
   2.3.2

> Map taskConfigs in KafkaConfigBackingStore grows monotonically despite of 
> task removals
> ---
>
> Key: KAFKA-8869
> URL: https://issues.apache.org/jira/browse/KAFKA-8869
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Konstantine Karantasis
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> Investigation of https://issues.apache.org/jira/browse/KAFKA-8676 revealed 
> another issue: 
> a map in {{KafkaConfigBackingStore}} keeps growing even though connectors and 
> tasks eventually get removed.
> This bug does not directly affect rebalancing protocols, but it'd be good to 
> resolve it and use the map in a way similar to how {{connectorConfigs}} is used. 
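
A simplified sketch of the growth problem and one way to avoid it. `ConfigSnapshotSketch` is a hypothetical in-memory model invented for illustration; it does not reproduce how `KafkaConfigBackingStore` actually stores or keys its data. The point is only that removing a connector should also drop its task entries:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory snapshot; a simplified model, not KafkaConfigBackingStore.
class ConfigSnapshotSketch {
    private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
    // Task configs grouped by connector name, then by task index.
    private final Map<String, Map<Integer, Map<String, String>>> taskConfigs = new HashMap<>();

    void putConnector(String name, Map<String, String> config) {
        connectorConfigs.put(name, new HashMap<>(config));
    }

    void putTask(String connector, int taskId, Map<String, String> config) {
        taskConfigs.computeIfAbsent(connector, k -> new HashMap<>())
                   .put(taskId, new HashMap<>(config));
    }

    // Removing a connector also drops its task configs, so neither map grows without bound.
    void removeConnector(String name) {
        connectorConfigs.remove(name);
        taskConfigs.remove(name);
    }

    int connectorCount() {
        return connectorConfigs.size();
    }

    int taskCount() {
        return taskConfigs.values().stream().mapToInt(Map::size).sum();
    }
}
```

If `removeConnector` only touched `connectorConfigs`, `taskCount()` would keep rising across create/delete cycles, which is the monotonic growth the issue describes.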





[GitHub] [kafka] ableegoldman commented on a change in pull request #8248: KAFKA-9501: convert between active and standby without closing stores

2020-05-20 Thread GitBox


ableegoldman commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r428433779



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##
@@ -109,7 +94,7 @@ public void logChange(final String storeName,
   final long timestamp) {
 throwUnsupportedOperationExceptionIfStandby("logChange");
 // Sending null headers to changelog topics (KIP-244)
-collector.send(
+streamTask.recordCollector().send(

Review comment:
   I tried to reduce the number of unnecessary local variables that could 
potentially get out of sync









[jira] [Resolved] (KAFKA-8869) Map taskConfigs in KafkaConfigBackingStore grows monotonically despite of task removals

2020-05-20 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-8869.
---
Resolution: Fixed

> Map taskConfigs in KafkaConfigBackingStore grows monotonically despite of 
> task removals
> ---
>
> Key: KAFKA-8869
> URL: https://issues.apache.org/jira/browse/KAFKA-8869
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Konstantine Karantasis
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> Investigation of https://issues.apache.org/jira/browse/KAFKA-8676 revealed 
> another issue: 
> a map in {{KafkaConfigBackingStore}} keeps growing even though connectors and 
> tasks eventually get removed.
> This bug does not directly affect rebalancing protocols, but it'd be good to 
> resolve it and use the map in a way similar to how {{connectorConfigs}} is used. 





[GitHub] [kafka] radai-rosenblatt commented on pull request #8472: KAFKA-9855 - return cached Structs for Schemas with no fields

2020-05-20 Thread GitBox


radai-rosenblatt commented on pull request #8472:
URL: https://github.com/apache/kafka/pull/8472#issuecomment-631866599


   @ijuma - any more modifications required?







[jira] [Updated] (KAFKA-9472) Reducing number of tasks for connector causes deleted tasks to show as UNASSIGNED

2020-05-20 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-9472:
--
Fix Version/s: 2.5.1
   2.4.2
   2.6.0
   2.3.2

> Reducing number of tasks for connector causes deleted tasks to show as 
> UNASSIGNED
> -
>
> Key: KAFKA-9472
> URL: https://issues.apache.org/jira/browse/KAFKA-9472
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 
> 2.4.0, 2.3.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> If a connector is successfully created with {{t1}} running tasks and then 
> reconfigured to use {{t1 - n}} tasks (where {{t1}} and {{n}} are both whole 
> numbers and {{n}} is strictly less than {{t1}}), the connector should then 
> list {{t1 - n}} total tasks in its status (which can be queried via the 
> {{/connectors/:name:/status}} endpoint or the {{/connectors}} endpoint with 
> the {{expand}} URL query parameter set to {{status}}).
> However, the connector will instead continue to list {{t1}} total tasks in 
> its status, with {{n}} of them being listed as {{UNASSIGNED}} and the 
> remaining {{t1 - n}} of them being listed as {{STARTED}}.
> This is because the only time a task status is removed from the status 
> backing store (as opposed to simply being updated to {{UNASSIGNED}}) is when 
> its connector is deleted. See relevant code snippets from the 
> [AbstractHerder|https://github.com/apache/kafka/blob/df13fc93d0aebfe0ecc40dd4af3c5fb19b35f710/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L187-L192]
>  and 
> [DistributedHerder|https://github.com/apache/kafka/blob/df13fc93d0aebfe0ecc40dd4af3c5fb19b35f710/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1511-L1520]
>  classes.
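
The difference between marking surplus tasks and removing them can be sketched as follows. `TaskStatusSketch` and its `State` enum are hypothetical and model a single connector's statuses in memory; they are not the Connect status backing store, and the state labels are placeholders chosen for the sketch:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of one connector's task statuses; not the Connect status store.
class TaskStatusSketch {
    enum State { RUNNING, UNASSIGNED } // placeholder labels for this sketch

    // Task index -> state.
    private final Map<Integer, State> statuses = new TreeMap<>();

    void start(int taskCount) {
        for (int i = 0; i < taskCount; i++) {
            statuses.put(i, State.RUNNING);
        }
    }

    // Behaviour described in the issue: surplus tasks are only marked UNASSIGNED.
    void shrinkByMarking(int newCount) {
        statuses.replaceAll((id, state) -> id >= newCount ? State.UNASSIGNED : state);
    }

    // Alternative: drop the statuses of tasks that no longer exist.
    void shrinkByRemoving(int newCount) {
        statuses.keySet().removeIf(id -> id >= newCount);
    }

    int listed() {
        return statuses.size();
    }

    public static void main(String[] args) {
        TaskStatusSketch status = new TaskStatusSketch();
        status.start(5);               // t1 = 5
        status.shrinkByMarking(3);     // n = 2
        System.out.println(status.listed()); // 5: still lists t1 tasks
        status.shrinkByRemoving(3);
        System.out.println(status.listed()); // 3: lists t1 - n tasks
    }
}
```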





[GitHub] [kafka] radai-rosenblatt commented on pull request #8193: be helpful when throwing ConcurrentModificationException out of consumers

2020-05-20 Thread GitBox


radai-rosenblatt commented on pull request #8193:
URL: https://github.com/apache/kafka/pull/8193#issuecomment-631866532


   @ijuma - any further modifications required?







[GitHub] [kafka] kkonstantine merged pull request #8444: KAFKA-8869: Remove task configs for deleted connectors from config snapshot

2020-05-20 Thread GitBox


kkonstantine merged pull request #8444:
URL: https://github.com/apache/kafka/pull/8444


   







[GitHub] [kafka] pgwhalen commented on a change in pull request #6824: KAFKA-7523: Add ConnectedStoreProvider to Processor API

2020-05-20 Thread GitBox


pgwhalen commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r428422376



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##
@@ -953,7 +977,7 @@ void to(final TopicNameExtractor topicExtractor,
  * flatTransform()}.
  *
  * @param transformerSupplier an instance of {@link TransformerSupplier} that generates a {@link Transformer}
- * @param stateStoreNames the names of the state stores used by the processor
+ * @param stateStoreNames the names of the state stores used by the transformer, passed only if {@link ConnectedStoreProvider#stores()} is null

Review comment:
   Good catch, I will change this with the revised `KStream` javadocs 
coming soon.  It was a relic of my original proposal; the code was written 
before we decided to change it and I missed removing this.









[GitHub] [kafka] apovzner edited a comment on pull request #8650: MINOR: Added unit tests for ConnectionQuotas

2020-05-20 Thread GitBox


apovzner edited a comment on pull request #8650:
URL: https://github.com/apache/kafka/pull/8650#issuecomment-631853083


   JDK 14 build has an unrelated test failure: 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   
   It looks like tests passed on other builds, but it failed to record the 
results.







[GitHub] [kafka] apovzner commented on pull request #8650: MINOR: Added unit tests for ConnectionQuotas

2020-05-20 Thread GitBox


apovzner commented on pull request #8650:
URL: https://github.com/apache/kafka/pull/8650#issuecomment-631853083


   JDK build has an unrelated test failure: 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   
   It looks like tests passed on other builds, but it failed to record the 
results.







[GitHub] [kafka] kkonstantine commented on pull request #8444: KAFKA-8869: Remove task configs for deleted connectors from config snapshot

2020-05-20 Thread GitBox


kkonstantine commented on pull request #8444:
URL: https://github.com/apache/kafka/pull/8444#issuecomment-631852881


   jdk8: success
   jdk11: success
   jdk14: single failure on known flaky test: 
`org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]`
   







[jira] [Resolved] (KAFKA-9409) Increase Immutability of ClusterConfigState

2020-05-20 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-9409.
---
Resolution: Fixed

> Increase Immutability of ClusterConfigState
> ---
>
> Key: KAFKA-9409
> URL: https://issues.apache.org/jira/browse/KAFKA-9409
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Mollitor
>Priority: Minor
> Fix For: 2.6.0
>
>
> The class claims that it is immutable, but there are some mutable features of 
> this class.
>  
> Increase the immutability of it and add a little cleanup:
>  * Pre-initialize size of ArrayList
>  * Remove superfluous syntax
>  * Use ArrayList instead of LinkedList since the list is created once
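
A minimal sketch of the cleanup items listed above, assuming a simplified stand-in class. The class name, field, and constructor argument below are hypothetical and are not the real ClusterConfigState API. It shows a pre-sized ArrayList that is built once and then exposed only through an unmodifiable view.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for illustration only; not the real ClusterConfigState.
final class ImmutableStateSketch {
    private final List<String> taskIds;

    ImmutableStateSketch(Map<String, Integer> taskCountsByConnector) {
        // The total number of tasks is known up front, so the list can be pre-sized.
        int total = 0;
        for (int count : taskCountsByConnector.values()) {
            total += count;
        }
        // ArrayList rather than LinkedList: the list is created once and only read afterwards.
        List<String> ids = new ArrayList<>(total);
        for (Map.Entry<String, Integer> entry : taskCountsByConnector.entrySet()) {
            for (int i = 0; i < entry.getValue(); i++) {
                ids.add(entry.getKey() + "-" + i);
            }
        }
        // Expose only an unmodifiable view so callers cannot mutate internal state.
        this.taskIds = Collections.unmodifiableList(ids);
    }

    List<String> taskIds() {
        return taskIds;
    }
}
```

Any attempt to call `add` or `remove` on the returned list throws UnsupportedOperationException, which is one way to make the "immutable" claim hold in practice.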



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9409) Increase Immutability of ClusterConfigState

2020-05-20 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-9409:
--
Fix Version/s: 2.6.0

> Increase Immutability of ClusterConfigState
> ---
>
> Key: KAFKA-9409
> URL: https://issues.apache.org/jira/browse/KAFKA-9409
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Mollitor
>Priority: Minor
> Fix For: 2.6.0
>
>
> The class claims that it is immutable, but there are some mutable features of 
> this class.
>  
> Increase the immutability of it and add a little cleanup:
>  * Pre-initialize size of ArrayList
>  * Remove superfluous syntax
>  * Use ArrayList instead of LinkedList since the list is created once





[GitHub] [kafka] kkonstantine merged pull request #7942: KAFKA-9409: Supplement immutability of ClusterConfigState class in Connect

2020-05-20 Thread GitBox


kkonstantine merged pull request #7942:
URL: https://github.com/apache/kafka/pull/7942


   







[GitHub] [kafka] hachikuji commented on pull request #8702: MINOR: Fix join group request timeout lower bound

2020-05-20 Thread GitBox


hachikuji commented on pull request #8702:
URL: https://github.com/apache/kafka/pull/8702#issuecomment-631849729


   retest this please







[GitHub] [kafka] kkonstantine commented on pull request #7942: KAFKA-9409: Increase Immutability of ClusterConfigState

2020-05-20 Thread GitBox


kkonstantine commented on pull request #7942:
URL: https://github.com/apache/kafka/pull/7942#issuecomment-631849532


   jdk8: 1 flaky failure:  
`EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]`
   jdk11: didn't start
   jdk14: success







[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2020-05-20 Thread victor (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17112732#comment-17112732
 ] 

victor commented on KAFKA-7500:
---

[~ryannedolan] I have two distinct Kafka clusters located in different data 
centers, DC1 and DC2. How should I organize Kafka producer failover between 
the two DCs? If the primary Kafka cluster (DC1) becomes unavailable, I want the 
producer to switch to the failover Kafka cluster (DC2) and continue publishing 
to it. The producer should also be able to switch back to the primary cluster 
once it is available. Are there any good patterns, existing libs, approaches, 
or code examples?

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Major
>  Labels: pull-request-available, ready-to-commit
> Fix For: 2.4.0
>
> Attachments: Active-Active XDCR setup.png
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]





[GitHub] [kafka] guozhangwang commented on pull request #8698: KAFKA-10022:console-producer supports the setting of client.id

2020-05-20 Thread GitBox


guozhangwang commented on pull request #8698:
URL: https://github.com/apache/kafka/pull/8698#issuecomment-631837772


   test this please







[GitHub] [kafka] guozhangwang commented on pull request #8698: KAFKA-10022:console-producer supports the setting of client.id

2020-05-20 Thread GitBox


guozhangwang commented on pull request #8698:
URL: https://github.com/apache/kafka/pull/8698#issuecomment-631837548


   test this







[GitHub] [kafka] omkreddy merged pull request #8700: MINOR: Increase gradle daemon’s heap size to 2g

2020-05-20 Thread GitBox


omkreddy merged pull request #8700:
URL: https://github.com/apache/kafka/pull/8700


   







[GitHub] [kafka] omkreddy commented on pull request #8700: MINOR: Increase gradle daemon’s heap size to 2g

2020-05-20 Thread GitBox


omkreddy commented on pull request #8700:
URL: https://github.com/apache/kafka/pull/8700#issuecomment-631829621


   Merging as test failures are not related.







[GitHub] [kafka] hachikuji commented on a change in pull request #8702: MINOR: Fix join group request timeout lower bound

2020-05-20 Thread GitBox


hachikuji commented on a change in pull request #8702:
URL: https://github.com/apache/kafka/pull/8702#discussion_r428389929



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -565,8 +566,8 @@ private void recordRebalanceFailure() {
 
 // Note that we override the request timeout using the rebalance timeout since that is the
 // maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays.
-
-int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000);
+int joinGroupTimeoutMs = Math.max(client.defaultRequestTimeoutMs(),

Review comment:
   The previous max check was wrong, but an alternative here is to use 
rebalanceTimeout + 5s in all cases regardless of the request timeout.
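
To make the options in this comment concrete, here is a small sketch comparing them. The class and method names are hypothetical and are not the AbstractCoordinator code. The second argument of the new `Math.max` call is cut off in the diff above, so the `rebalanceTimeoutMs + 5000` used in `withRequestTimeoutLowerBound` is an assumption.

```java
// Hypothetical helper for illustration only; not the actual AbstractCoordinator code.
final class JoinGroupTimeoutSketch {
    static final int EXTRA_DELAY_MS = 5000;

    // Previous check: max(r, r + 5000) always equals r + 5000, so the max is a no-op
    // and the request timeout is never taken into account.
    static int previous(int rebalanceTimeoutMs) {
        return Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + EXTRA_DELAY_MS);
    }

    // Assumed shape of the fix: the request timeout acts as a lower bound.
    static int withRequestTimeoutLowerBound(int requestTimeoutMs, int rebalanceTimeoutMs) {
        return Math.max(requestTimeoutMs, rebalanceTimeoutMs + EXTRA_DELAY_MS);
    }

    // Alternative from the review comment: ignore the request timeout entirely.
    static int rebalancePlusDelay(int rebalanceTimeoutMs) {
        return rebalanceTimeoutMs + EXTRA_DELAY_MS;
    }
}
```

With a 60 s request timeout and a 30 s rebalance timeout, the lower-bound variant yields 60000 ms while the alternative yields 35000 ms; the two only differ when the request timeout exceeds the rebalance timeout plus the extra delay.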









[jira] [Commented] (KAFKA-9981) Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.

2020-05-20 Thread victor (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17112690#comment-17112690
 ] 

victor commented on KAFKA-9981:
---

[~ChrisEgerton] mm2 already implements data backup. How can a KafkaProducer 
fail over automatically and transparently?
How can the same KafkaProducer object automatically switch between two clusters?

> Running a dedicated mm2 cluster with more than one nodes,When the 
> configuration is updated the task is not aware and will lose the update 
> operation.
> 
>
> Key: KAFKA-9981
> URL: https://issues.apache.org/jira/browse/KAFKA-9981
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: victor
>Priority: Major
>
> DistributedHerder.reconfigureConnector handles a config update as follows:
> {code:java}
> if (changed) {
>     List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
>     if (isLeader()) {
>         configBackingStore.putTaskConfigs(connName, rawTaskProps);
>         cb.onCompletion(null, null);
>     } else {
>         // We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector
>         // addition or removal. If we blocked waiting for the response from leader, we may be kicked out of the worker group.
>         forwardRequestExecutor.submit(new Runnable() {
>             @Override
>             public void run() {
>                 try {
>                     String leaderUrl = leaderUrl();
>                     if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
>                         cb.onCompletion(new ConnectException("Request to leader to " +
>                                 "reconfigure connector tasks failed " +
>                                 "because the URL of the leader's REST interface is empty!"), null);
>                         return;
>                     }
>                     String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks");
>                     log.trace("Forwarding task configurations for connector {} to leader", connName);
>                     RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
>                     cb.onCompletion(null, null);
>                 } catch (ConnectException e) {
>                     log.error("Request to leader to reconfigure connector tasks failed", e);
>                     cb.onCompletion(e, null);
>                 }
>             }
>         });
>     }
> }
> {code}
> KafkaConfigBackingStore task checks for configuration updates, such as a topic 
> whitelist update. If the KafkaConfigBackingStore task is not running on the 
> leader node, an HTTP request will be sent to notify the leader of the 
> configuration update. However, a dedicated mm2 cluster does not have the HTTP 
> server turned on, so the request will fail to be sent, causing the update 
> operation to be lost.





[GitHub] [kafka] hachikuji opened a new pull request #8702: MINOR: Fix join group request timeout lower bound

2020-05-20 Thread GitBox


hachikuji opened a new pull request #8702:
URL: https://github.com/apache/kafka/pull/8702


   If the request timeout is larger than the rebalance timeout, we should use 
the former as the JoinGroup request timeout.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] guozhangwang commented on pull request #8698: KAFKA-10022:console-producer supports the setting of client.id

2020-05-20 Thread GitBox


guozhangwang commented on pull request #8698:
URL: https://github.com/apache/kafka/pull/8698#issuecomment-631810146


   test this







[GitHub] [kafka] mjsax commented on pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-20 Thread GitBox


mjsax commented on pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#issuecomment-631809507


   Added the test. Will merge after Jenkins passes.







[GitHub] [kafka] mjsax commented on pull request #8650: MINOR: Added unit tests for ConnectionQuotas

2020-05-20 Thread GitBox


mjsax commented on pull request #8650:
URL: https://github.com/apache/kafka/pull/8650#issuecomment-631804240


   Retest this please.







[GitHub] [kafka] mjsax commented on pull request #8650: MINOR: Added unit tests for ConnectionQuotas

2020-05-20 Thread GitBox


mjsax commented on pull request #8650:
URL: https://github.com/apache/kafka/pull/8650#issuecomment-631804131


   Retest this please







[GitHub] [kafka] mjsax commented on pull request #8637: KAFKA-9976: Reuse repartition node in all cases for KGroupedStream and KGroupedTable aggregates

2020-05-20 Thread GitBox


mjsax commented on pull request #8637:
URL: https://github.com/apache/kafka/pull/8637#issuecomment-631803677


   Seems you agree with my last comment: 
https://github.com/apache/kafka/pull/8504#issuecomment-631757206
   
   I think it's best to close this PR and also the ticket (either as "not a 
problem" or "won't fix")?







[GitHub] [kafka] mjsax commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins

2020-05-20 Thread GitBox


mjsax commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r428376734



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##
@@ -77,6 +79,38 @@ public void 
shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVers
 
shouldLogAndMeterOnSkippedRecordsWithNullValue(StreamsConfig.METRICS_LATEST);
 }
 
+
+@Test
+public void shouldReuseRepartitionTopicWithGeneratedName() {
+final StreamsBuilder builder = new StreamsBuilder();
+final Properties props = new Properties();
+props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.NO_OPTIMIZATION);
+final KStream stream1 = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream stream2 = builder.stream("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream stream3 = builder.stream("topic3", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream newStream = stream1.map((k, v) -> new 
KeyValue<>(v, k));
+newStream.join(stream2, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100))).to("out-one");
+newStream.join(stream3, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100))).to("out-to");
+assertEquals(expectedTopologyWithGeneratedRepartitionTopic, 
builder.build(props).describe().toString());
+}
+
+@Test
+public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+final StreamsBuilder builder = new StreamsBuilder();
+final Properties props = new Properties();
+props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.NO_OPTIMIZATION);
+final KStream stream1 = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream stream2 = builder.stream("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream stream3 = builder.stream("topic3", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream newStream = stream1.map((k, v) -> new 
KeyValue<>(v, k));
+final StreamJoined streamJoined = 
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
+newStream.join(stream2, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100)), 
streamJoined.withName("first-join")).to("out-one");
+newStream.join(stream3, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100)), 
streamJoined.withName("second-join")).to("out-two");
+final Topology topology =  builder.build(props);
+System.out.println(topology.describe().toString());
+assertEquals(expectedTopologyWithUserNamedRepartitionTopics, 
topology.describe().toString());

Review comment:
   I guess both are acceptable solutions (ie, creating two repartition 
topics or throwing an exception). Your proposal is more user friendly but 
results in a more expensive deployment. The question might be, what do we try 
to optimize for?
   
   \cc @vvcephei @guozhangwang 









[GitHub] [kafka] mjsax commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins

2020-05-20 Thread GitBox


mjsax commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r424060697



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##
@@ -77,6 +79,38 @@ public void 
shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVers
 
shouldLogAndMeterOnSkippedRecordsWithNullValue(StreamsConfig.METRICS_LATEST);
 }
 
+
+@Test
+public void shouldReuseRepartitionTopicWithGeneratedName() {
+final StreamsBuilder builder = new StreamsBuilder();
+final Properties props = new Properties();
+props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.NO_OPTIMIZATION);
+final KStream stream1 = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream stream2 = builder.stream("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream stream3 = builder.stream("topic3", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream newStream = stream1.map((k, v) -> new 
KeyValue<>(v, k));
+newStream.join(stream2, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100))).to("out-one");
+newStream.join(stream3, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100))).to("out-to");
+assertEquals(expectedTopologyWithGeneratedRepartitionTopic, 
builder.build(props).describe().toString());
+}
+
+@Test
+public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+final StreamsBuilder builder = new StreamsBuilder();
+final Properties props = new Properties();
+props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.NO_OPTIMIZATION);
+final KStream stream1 = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream stream2 = builder.stream("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream stream3 = builder.stream("topic3", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream newStream = stream1.map((k, v) -> new 
KeyValue<>(v, k));
+final StreamJoined streamJoined = 
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
+newStream.join(stream2, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100)), 
streamJoined.withName("first-join")).to("out-one");
+newStream.join(stream3, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100)), 
streamJoined.withName("second-join")).to("out-two");
+final Topology topology =  builder.build(props);
+System.out.println(topology.describe().toString());
+assertEquals(expectedTopologyWithUserNamedRepartitionTopics, 
topology.describe().toString());

Review comment:
   Sorry for being undecided... Reading the code now, I am wondering if 
this behavior may become problematic with regard to topology upgrade. Assume, 
the first join is removed. Technically, the new topology is compatible, but we 
would now generate a new repartition topic name, and thus it's not compatible. 
This could be fixed by inserting a `repartition()` in the new code enforcing 
the old name -- however, this makes me wonder if we might want to throw a 
"naming conflict" (ie, cannot pick a name) exception based on the original 
topology for this case when both operators are named, and tell people to insert 
`repartition()` right away? For this case, if they later remove a join it's 
clear what is happening to them.
   
   Ie, we should still not create two repartition topics, which would be "bad" 
(user could still enforce it by calling `repartition()` twice), but just throw 
with an informative error message? -- Curious what @vvcephei thinks?









[jira] [Resolved] (KAFKA-1056) Evenly Distribute Intervals in OffsetIndex

2020-05-20 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-1056.
--
Resolution: Fixed

> Evenly Distribute Intervals in OffsetIndex
> --
>
> Key: KAFKA-1056
> URL: https://issues.apache.org/jira/browse/KAFKA-1056
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie++
>
> Today a new entry will be created in OffsetIndex for each produce request 
> regardless of the number of messages it contains. It is better to evenly 
> distribute the intervals between index entries for index search efficiency.
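
A rough sketch of the idea described above, under stated assumptions: the class below is hypothetical and is not Kafka's OffsetIndex. It adds an index entry only after a fixed number of bytes has been appended since the previous entry, instead of once per produce request, so entries end up roughly evenly spaced by file position.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch for illustration only; not Kafka's OffsetIndex implementation.
final class SparseIndexSketch {
    private final int intervalBytes;
    // Each entry is {offset, filePosition}.
    private final List<long[]> entries = new ArrayList<>();
    private long position = 0;
    private long bytesSinceLastEntry = 0;

    SparseIndexSketch(int intervalBytes) {
        this.intervalBytes = intervalBytes;
    }

    // Record one appended message; an index entry is added only when at least
    // intervalBytes have been written since the last entry (or the index is empty).
    void append(long offset, int sizeBytes) {
        if (entries.isEmpty() || bytesSinceLastEntry >= intervalBytes) {
            entries.add(new long[] {offset, position});
            bytesSinceLastEntry = 0;
        }
        position += sizeBytes;
        bytesSinceLastEntry += sizeBytes;
    }

    int entryCount() {
        return entries.size();
    }

    long entryOffset(int i) {
        return entries.get(i)[0];
    }
}
```

Spacing entries by bytes keeps the amount of log that must be scanned after an index lookup bounded by the interval, regardless of how messages were batched by producers.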





[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2020-05-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17112676#comment-17112676
 ] 

Matthias J. Sax commented on KAFKA-6520:


The consumer only stores/caches the last known coordinator metadata. There is 
no coordinator liveness check. The consumer only talks to the coordinator 
during a rebalance. However, if a network issue occurs during regular 
processing and the consumer cannot fetch data from the brokers, the cached 
coordinator metadata stays the same (ie, the coordinator is still known). Does 
this make sense?

In the end, I believe without a consumer change we cannot really resolve the 
issue though. Maybe we could introduce a new "disconnected timeout" to the 
consumer: internally, the consumer sends fetch requests on a regular basis. 
Those fetch requests would time out if there is a network issue. Currently, the 
consumer "swallows" those timeout exceptions and just keeps retrying. `poll()` 
would just return zero records, but never rethrow a TimeoutException. The new 
"disconnected timeout" could be used to set a limit on how long fetch requests 
should be retried: if all fetch requests time out for a period longer than 
"disconnected timeout", the next `poll()` call could throw a 
"DisconnectedException" (reusing TimeoutException could be misleading?).

By default, the new "disconnected timeout" would be set to MAX_VALUE, and thus 
the default behavior would not change. Within KafkaStreams, we can set this new 
timeout to a smaller value and catch the exception (and change the Kafka 
Streams state to DISCONNECT). On a consecutive `poll()` that does not throw, we 
set the state back to RUNNING.

Thoughts?
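
The "disconnected timeout" idea above could be sketched roughly as follows. This is only an illustration of the proposal: the class, its methods, and the DisconnectedException type are hypothetical and do not exist in the Kafka consumer. The tracker remembers when the current streak of fetch timeouts began and throws once the streak has lasted longer than the configured limit.

```java
// Hypothetical sketch of the proposal; none of these names exist in the Kafka consumer.
final class DisconnectTrackerSketch {
    static final class DisconnectedException extends RuntimeException {
        DisconnectedException(String message) {
            super(message);
        }
    }

    private final long disconnectedTimeoutMs;
    private long firstFailureMs = -1; // -1 means there is no ongoing failure streak

    DisconnectTrackerSketch(long disconnectedTimeoutMs) {
        this.disconnectedTimeoutMs = disconnectedTimeoutMs;
    }

    // Any successful fetch ends the failure streak.
    void onFetchSuccess() {
        firstFailureMs = -1;
    }

    // Remember only the start of the streak; later timeouts do not reset it.
    void onFetchTimeout(long nowMs) {
        if (firstFailureMs < 0) {
            firstFailureMs = nowMs;
        }
    }

    // Intended to be checked at the start of poll(): throws if every fetch has
    // timed out for longer than the configured limit.
    void maybeThrow(long nowMs) {
        if (firstFailureMs >= 0 && nowMs - firstFailureMs > disconnectedTimeoutMs) {
            throw new DisconnectedException(
                "No successful fetch for " + (nowMs - firstFailureMs) + " ms");
        }
    }
}
```

With the limit set to Long.MAX_VALUE the check can never fire, which matches the intent that the default behavior stays unchanged.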

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Priority: Major
>  Labels: newbie, user-experience
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams]
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
>  See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  This is a link to a related 
> issue.
> -
> Update: there are some discussions on the PR itself which leads me to think 
> that a more general solution should be at the ClusterConnectionStates rather 
> than at the Streams or even Consumer level. One proposal would be:
>  * Add a new metric named `failedConnection` in SelectorMetrics which is 
> recorded in the `connect()` and `pollSelectionKeys()` functions upon catching 
> the IOException / RuntimeException which indicates that the connection was 
> disconnected.
>  * Users of Consumer / Streams can then monitor this metric, which normally 
> will have values close to zero since we only have transient disconnects; if 
> it is spiking, it means the brokers are consistently unavailable, which 
> indicates the state.
> [~Yohan123] WDYT?





[GitHub] [kafka] hachikuji commented on pull request #8701: MINOR: Add reason to log message when incrementing the log start offset

2020-05-20 Thread GitBox


hachikuji commented on pull request #8701:
URL: https://github.com/apache/kafka/pull/8701#issuecomment-631789824


   Need to fix some of the uses in test cases. Debating whether I should create 
an enum for the reason and get rid of the overload...
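
As a sketch of what an enum for the reason might look like: the type names, constants, and message format below are all hypothetical and are not taken from the PR. An enum would keep the set of reasons closed and the log wording consistent, in place of a free-form string overload.

```java
// Hypothetical sketch; the names and reasons below are illustrative assumptions, not the PR's code.
final class LogStartOffsetLoggingSketch {
    enum IncrementReason {
        SEGMENT_RETENTION("segment retention"),
        CLIENT_DELETE_REQUEST("client delete records request"),
        LEADER_OFFSET_INCREMENTED("leader offset increment");

        private final String description;

        IncrementReason(String description) {
            this.description = description;
        }

        String description() {
            return description;
        }
    }

    // Build the log line so that every increment states why it happened.
    static String message(long oldOffset, long newOffset, IncrementReason reason) {
        return String.format("Incrementing log start offset from %d to %d due to %s",
            oldOffset, newOffset, reason.description());
    }
}
```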







[GitHub] [kafka] omkreddy commented on pull request #8700: MINOR: Increase gradle daemon’s heap size to 2g

2020-05-20 Thread GitBox


omkreddy commented on pull request #8700:
URL: https://github.com/apache/kafka/pull/8700#issuecomment-631785818


   retest this please
   







[GitHub] [kafka] hachikuji opened a new pull request #8701: MINOR: Add reason to log message when incrementing the log start offset

2020-05-20 Thread GitBox


hachikuji opened a new pull request #8701:
URL: https://github.com/apache/kafka/pull/8701


   Sometimes logging leaves us guessing at the cause of an increment to the log 
start offset. Since this results in deletion of user data, I think the logging 
should be clear about the reason.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] xvrl commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-20 Thread GitBox


xvrl commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428352951



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##
@@ -270,9 +272,15 @@ Duration adminTimeout() {
 List<MetricsReporter> metricsReporters() {
 List<MetricsReporter> reporters = getConfiguredInstances(
 CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, 
MetricsReporter.class);
-JmxReporter jmxReporter = new JmxReporter("kafka.connect.mirror");
+JmxReporter jmxReporter = new JmxReporter();
 jmxReporter.configure(this.originals());
 reporters.add(jmxReporter);
+MetricsContext metricsContext = new 
KafkaMetricsContext("kafka.connect.mirror");

Review comment:
   The KIP mentions that we are deprecating passing the JMX prefix directly to 
the JmxReporter, and are instead passing it via the metrics context as the 
`_namespace` parameter. This doesn't change the prefix or how the metrics are 
exposed in JMX. 









[GitHub] [kafka] xvrl commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-20 Thread GitBox


xvrl commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428351889



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##
@@ -270,9 +272,15 @@ Duration adminTimeout() {
 List<MetricsReporter> metricsReporters() {
 List<MetricsReporter> reporters = getConfiguredInstances(
 CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, 
MetricsReporter.class);
-JmxReporter jmxReporter = new JmxReporter("kafka.connect.mirror");
+JmxReporter jmxReporter = new JmxReporter();
 jmxReporter.configure(this.originals());
 reporters.add(jmxReporter);
+MetricsContext metricsContext = new 
KafkaMetricsContext("kafka.connect.mirror");

Review comment:
   @rhauch we mentioned connect more generally in the KIP, but I can 
clarify to make it explicit









[GitHub] [kafka] C0urante commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

2020-05-20 Thread GitBox


C0urante commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r428333743



##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * A predicate which is true for records which are tombstones (i.e. have null 
key).
+ * @param <R> The type of connect record.
+ */
+public class RecordIsTombstone<R extends ConnectRecord<R>> implements Predicate<R> {
+@Override
+public ConfigDef config() {
+return new ConfigDef();

Review comment:
   Probably won't impact performance too much but we could technically use 
a single `ConfigDef` instance for the entire class instead of creating a new 
one every time this method is called.
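
The single-instance idea from the comment above can be sketched as below. `ConfigDefStandIn` and `TombstonePredicateSketch` are hypothetical stand-ins so that the snippet compiles without Kafka on the classpath; with the real `ConfigDef`, which is mutable, sharing one instance assumes that callers do not modify it.

```java
/** Hypothetical stand-in for org.apache.kafka.common.config.ConfigDef. */
final class ConfigDefStandIn { }

/** Hypothetical predicate class that exposes one shared configuration definition. */
class TombstonePredicateSketch {
    // One shared, empty definition for the whole class, created once at class load.
    private static final ConfigDefStandIn CONFIG_DEF = new ConfigDefStandIn();

    ConfigDefStandIn config() {
        return CONFIG_DEF; // the same instance on every call, no per-call allocation
    }
}
```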

##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * A predicate which is true for records which are tombstones (i.e. have null 
key).
+ * @param <R> The type of connect record.
+ */
+public class RecordIsTombstone<R extends ConnectRecord<R>> implements Predicate<R> {
+@Override
+public ConfigDef config() {
+return new ConfigDef();
+}
+
+@Override
+public boolean test(R record) {
+return record.key() == null;

Review comment:
   I think we want to check the value instead of the key here?
   ```suggestion
   return record.value() == null;
   ```
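
For context, a tombstone check along the lines suggested above can be sketched without the Connect API. The `SimpleRecord` type below is a hypothetical stand-in for `ConnectRecord`, used only so that the snippet is self-contained; `TombstoneCheck` is likewise an illustrative name.

```java
import java.util.function.Predicate;

/** Hypothetical stand-in for a Connect record: just a key and a value. */
final class SimpleRecord {
    private final Object key;
    private final Object value;

    SimpleRecord(Object key, Object value) {
        this.key = key;
        this.value = value;
    }

    Object key() { return key; }

    Object value() { return value; }
}

final class TombstoneCheck {
    /** A record is treated as a tombstone when its value is null, regardless of the key. */
    static final Predicate<SimpleRecord> IS_TOMBSTONE = record -> record.value() == null;

    private TombstoneCheck() { }
}
```

Under this definition a record with a null key but a non-null value is not a tombstone, which is the case the key-based check would get wrong.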

##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * A predicate which is true for records which are tombstones (i.e. have null 
key).

Review comment:
   I think a tombstone is defined as a record with a null value, not a null 
key:
   ```suggestion
* A predicate which is true for records which are tombstones (i.e. have 
null values).
   ```

##
File path: 

[GitHub] [kafka] rhauch commented on pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations

2020-05-20 Thread GitBox


rhauch commented on pull request #8654:
URL: https://github.com/apache/kafka/pull/8654#issuecomment-631762530


   Thanks for the review, @kkonstantine. I think I've incorporated all of your 
suggestions.







[GitHub] [kafka] rhauch commented on pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations

2020-05-20 Thread GitBox


rhauch commented on pull request #8654:
URL: https://github.com/apache/kafka/pull/8654#issuecomment-631762616


   ok to test







[GitHub] [kafka] bbejeck commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins

2020-05-20 Thread GitBox


bbejeck commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r428343055



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##
@@ -77,6 +79,38 @@ public void 
shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVers
 
shouldLogAndMeterOnSkippedRecordsWithNullValue(StreamsConfig.METRICS_LATEST);
 }
 
+
+@Test
+public void shouldReuseRepartitionTopicWithGeneratedName() {
+final StreamsBuilder builder = new StreamsBuilder();
+final Properties props = new Properties();
+props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.NO_OPTIMIZATION);
+final KStream stream1 = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream stream2 = builder.stream("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream stream3 = builder.stream("topic3", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream newStream = stream1.map((k, v) -> new 
KeyValue<>(v, k));
+newStream.join(stream2, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100))).to("out-one");
+newStream.join(stream3, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100))).to("out-to");
+assertEquals(expectedTopologyWithGeneratedRepartitionTopic, 
builder.build(props).describe().toString());
+}
+
+@Test
+public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+final StreamsBuilder builder = new StreamsBuilder();
+final Properties props = new Properties();
+props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.NO_OPTIMIZATION);
+final KStream stream1 = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream stream2 = builder.stream("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream stream3 = builder.stream("topic3", 
Consumed.with(Serdes.String(), Serdes.String()));
+final KStream newStream = stream1.map((k, v) -> new 
KeyValue<>(v, k));
+final StreamJoined streamJoined = 
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
+newStream.join(stream2, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100)), 
streamJoined.withName("first-join")).to("out-one");
+newStream.join(stream3, (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(100)), 
streamJoined.withName("second-join")).to("out-two");
+final Topology topology =  builder.build(props);
+System.out.println(topology.describe().toString());
+assertEquals(expectedTopologyWithUserNamedRepartitionTopics, 
topology.describe().toString());

Review comment:
   >This could be fixed by inserting a repartition() in the new code 
enforcing the old name -- however, this makes me wonder if we might want to 
throw a "naming conflict" (ie, cannot pick a name) exception based on the 
original topology for this case when both operators are named, and tell people 
to insert repartition() right away? For this case, if they later remove a join 
it's clear what is happening to them.
   
   I see your point, but I think that is a bad user experience and IMHO leaks 
too much detail about an operation we want to handle automatically.
   
   I'm leaning towards the simpler case of what we had before.  With generated 
names, re-use the repartition node, but if the user creates a new join with 
explicit names, just go ahead and create two repartition topics.
   
   WDYT?









[GitHub] [kafka] bbejeck commented on pull request #8504: KAFKA-9298: reuse mapped stream error in joins

2020-05-20 Thread GitBox


bbejeck commented on pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#issuecomment-631757206


   > @bbejeck Are there any backward compatibility concerns for KAFKA-9976 ?
   
   Good point, maybe we should just leave this one alone for now.  If you agree 
I'll close that PR.







[GitHub] [kafka] guozhangwang commented on pull request #8698: KAFKA-10022:console-producer supports the setting of client.id

2020-05-20 Thread GitBox


guozhangwang commented on pull request #8698:
URL: https://github.com/apache/kafka/pull/8698#issuecomment-631755813


   There are known flaky tests that may fail; I'm validating that there are no 
consistent test failures. @xinzhuxiansheng 







[GitHub] [kafka] guozhangwang merged pull request #8661: KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks

2020-05-20 Thread GitBox


guozhangwang merged pull request #8661:
URL: https://github.com/apache/kafka/pull/8661


   







[jira] [Updated] (KAFKA-9878) Block AddPartitionsToTxn call until the txn markers are committed

2020-05-20 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9878:
---
Description: 
Currently the EndTxn call from the Producer returns as soon as the control 
record is written to the txn coordinator log. The ongoing transaction then 
goes into a pending state while it waits for all txn markers to be propagated. 
In the meantime, the producer client may start a new transaction, but its 
requests are rejected repeatedly until the pending state is resolved. This 
causes unnecessary round trips and extra load on the broker, which has to 
handle the repeated requests.

To avoid this situation, we should make the Producer client wait for txn marker 
completion instead. This should improve overall performance, as no back-off 
will be triggered before a subsequent transaction can begin.

On the other hand, we could also batch-complete the AddPartitionsToTxn results 
if more than one request is buffered in the queue.

The third change is on the client side: maintain the futures of the 
AddPartitionsToTxn calls so that more in-flight changes can be made as necessary.

  was:
Currently the EndTxn call from the Producer returns as soon as the control 
record is written to the txn coordinator log. The ongoing transaction then 
goes into a pending state while it waits for all txn markers to be propagated. 
In the meantime, the producer client may start a new transaction, but its 
requests are rejected repeatedly until the pending state is resolved. This 
causes unnecessary round trips and extra load on the broker, which has to 
handle the repeated requests.

To avoid this situation, we should make the Producer client wait for txn marker 
completion instead. This should improve overall performance, as no back-off 
will be triggered before a subsequent transaction can begin.


> Block AddPartitionsToTxn call until the txn markers are committed
> -
>
> Key: KAFKA-9878
> URL: https://issues.apache.org/jira/browse/KAFKA-9878
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Major
>
> Currently the EndTxn call from the Producer returns as soon as the control 
> record is written to the txn coordinator log. The ongoing transaction then 
> goes into a pending state while it waits for all txn markers to be 
> propagated. In the meantime, the producer client may start a new 
> transaction, but its requests are rejected repeatedly until the pending 
> state is resolved. This causes unnecessary round trips and extra load on the 
> broker, which has to handle the repeated requests.
> To avoid this situation, we should make the Producer client wait for txn 
> marker completion instead. This should improve overall performance, as no 
> back-off will be triggered before a subsequent transaction can begin.
> On the other hand, we could also batch-complete the AddPartitionsToTxn 
> results if more than one request is buffered in the queue.
> The third change is on the client side: maintain the futures of the 
> AddPartitionsToTxn calls so that more in-flight changes can be made as 
> necessary.
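
The difference between rejecting requests during the pending state and waiting on marker completion can be sketched with plain `CompletableFuture`s. This is a simplified model, not the actual producer or coordinator code; `TxnCoordinatorModel` and its methods `endTxn`, `markersWritten`, and `addPartitionsToTxn` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of a coordinator that completes a future once txn markers are written. */
class TxnCoordinatorModel {
    // Starts out completed: no transaction is waiting on markers.
    private CompletableFuture<Void> pendingMarkers = CompletableFuture.completedFuture(null);

    /** EndTxn: the transaction enters a pending state until the markers are propagated. */
    synchronized CompletableFuture<Void> endTxn() {
        pendingMarkers = new CompletableFuture<>();
        return pendingMarkers;
    }

    /** Called when all txn markers have been written. */
    synchronized void markersWritten() {
        pendingMarkers.complete(null);
    }

    /**
     * AddPartitionsToTxn for the next transaction: instead of rejecting the request
     * while markers are pending, the response is chained on marker completion.
     */
    synchronized CompletableFuture<String> addPartitionsToTxn(String partition) {
        return pendingMarkers.thenApply(ignored -> "added " + partition);
    }
}
```

In this model the client sends one request and gets one response once the markers are written, rather than retrying with back-off until the pending state clears.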





[GitHub] [kafka] rhauch commented on a change in pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations

2020-05-20 Thread GitBox


rhauch commented on a change in pull request #8654:
URL: https://github.com/apache/kafka/pull/8654#discussion_r428333268



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for the creation of internal topics.
+ */
+@Category(IntegrationTest.class)
+public class InternalTopicsIntegrationTest {
+
+private static final Logger log = 
LoggerFactory.getLogger(InternalTopicsIntegrationTest.class);
+
+private EmbeddedConnectCluster.Builder connectBuilder;
+private EmbeddedConnectCluster connect;
+Map workerProps = new HashMap<>();
+Properties brokerProps = new Properties();
+
+@Before
+public void setup() {
+// setup Kafka broker properties
+brokerProps.put("auto.create.topics.enable", String.valueOf(false));
+
+// build a Connect cluster backed by Kafka and Zk
+connectBuilder = new EmbeddedConnectCluster.Builder()
+.name("connect-cluster")
+.numWorkers(1)
+.numBrokers(1)
+.brokerProps(brokerProps);
+}
+
+@After
+public void close() {
+// stop all Connect, Kafka and Zk threads.
+connect.stop();
+}
+
+@Test
+public void testCreateInternalTopicsWithDefaultSettings() throws 
InterruptedException {
+int numWorkers = 1;
+int numBrokers = 3;
+connect = new 
EmbeddedConnectCluster.Builder().name("connect-cluster-1")

Review comment:
   I was running into other issues, and I never changed it. The names are 
not important, so I'll change them back to the same cluster name.
   
   I actually do prefer how each test fully sets up the cluster builder, in 
part because some of the information is changed and some isn't. I found it very 
error prone while I was working on this to reuse a builder and have each test 
only change *some* of the attributes, or to remember to set the worker props 
after changing the `workerProps` map.









[GitHub] [kafka] rhauch commented on a change in pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations

2020-05-20 Thread GitBox


rhauch commented on a change in pull request #8654:
URL: https://github.com/apache/kafka/pull/8654#discussion_r428334196



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for the creation of internal topics.
+ */
+@Category(IntegrationTest.class)
+public class InternalTopicsIntegrationTest {
+
+private static final Logger log = 
LoggerFactory.getLogger(InternalTopicsIntegrationTest.class);
+
+private EmbeddedConnectCluster.Builder connectBuilder;
+private EmbeddedConnectCluster connect;
+Map workerProps = new HashMap<>();
+Properties brokerProps = new Properties();
+
+@Before
+public void setup() {
+// setup Kafka broker properties
+brokerProps.put("auto.create.topics.enable", String.valueOf(false));
+
+// build a Connect cluster backed by Kafka and Zk
+connectBuilder = new EmbeddedConnectCluster.Builder()
+.name("connect-cluster")
+.numWorkers(1)
+.numBrokers(1)
+.brokerProps(brokerProps);
+}
+
+@After
+public void close() {
+// stop all Connect, Kafka and Zk threads.
+connect.stop();
+}
+
+@Test
+public void testCreateInternalTopicsWithDefaultSettings() throws 
InterruptedException {
+int numWorkers = 1;
+int numBrokers = 3;
+connect = new 
EmbeddedConnectCluster.Builder().name("connect-cluster-1")
+  .workerProps(workerProps)
+  .numWorkers(numWorkers)
+  .numBrokers(numBrokers)
+  .brokerProps(brokerProps)
+  .build();
+
+// Start the Connect cluster
+connect.start();
+connect.assertions().assertExactlyNumBrokersAreUp(numBrokers, "Brokers 
did not start in time.");
+connect.assertions().assertExactlyNumWorkersAreUp(numWorkers, "Worker 
did not start in time.");
+log.info("Completed startup of {} Kafka brokers and {} Connect 
workers", numBrokers, numWorkers);
+
+// Check the topics
+log.info("Verifying the internal topics for Connect");
+connect.assertions().assertTopicsExist(configTopic(), offsetTopic(), 
statusTopic());
+assertInternalTopicSettings();
+
+// Remove the Connect worker
+log.info("Stopping the Connect worker");
+connect.removeWorker();
+
+// Sleep for a bit
+Thread.sleep(3000);

Review comment:
   Good catch. I had put this in while debugging an issue, but it is not 
needed.









[GitHub] [kafka] rhauch commented on a change in pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations

2020-05-20 Thread GitBox


rhauch commented on a change in pull request #8654:
URL: https://github.com/apache/kafka/pull/8654#discussion_r428333268



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for the creation of internal topics.
+ */
+@Category(IntegrationTest.class)
+public class InternalTopicsIntegrationTest {
+
+private static final Logger log = 
LoggerFactory.getLogger(InternalTopicsIntegrationTest.class);
+
+private EmbeddedConnectCluster.Builder connectBuilder;
+private EmbeddedConnectCluster connect;
+Map workerProps = new HashMap<>();
+Properties brokerProps = new Properties();
+
+@Before
+public void setup() {
+// setup Kafka broker properties
+brokerProps.put("auto.create.topics.enable", String.valueOf(false));
+
+// build a Connect cluster backed by Kafka and Zk
+connectBuilder = new EmbeddedConnectCluster.Builder()
+.name("connect-cluster")
+.numWorkers(1)
+.numBrokers(1)
+.brokerProps(brokerProps);
+}
+
+@After
+public void close() {
+// stop all Connect, Kafka and Zk threads.
+connect.stop();
+}
+
+@Test
+public void testCreateInternalTopicsWithDefaultSettings() throws 
InterruptedException {
+int numWorkers = 1;
+int numBrokers = 3;
+connect = new 
EmbeddedConnectCluster.Builder().name("connect-cluster-1")

Review comment:
   I was running into other issues, and I never changed it. I actually 
prefer how each test fully sets up the cluster builder, but the names should 
not be important.









[GitHub] [kafka] rhauch commented on pull request #8270: KAFKA-9216: Enforce connect internal topic configuration at startup

2020-05-20 Thread GitBox


rhauch commented on pull request #8270:
URL: https://github.com/apache/kafka/pull/8270#issuecomment-631747698


   ok to test







[GitHub] [kafka] guozhangwang commented on pull request #8698: KAFKA-10022:console-producer supports the setting of client.id

2020-05-20 Thread GitBox


guozhangwang commented on pull request #8698:
URL: https://github.com/apache/kafka/pull/8698#issuecomment-631738575


   test this please







[GitHub] [kafka] guozhangwang commented on pull request #8661: KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks

2020-05-20 Thread GitBox


guozhangwang commented on pull request #8661:
URL: https://github.com/apache/kafka/pull/8661#issuecomment-631737855


   test this







[GitHub] [kafka] guozhangwang commented on pull request #8661: KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks

2020-05-20 Thread GitBox


guozhangwang commented on pull request #8661:
URL: https://github.com/apache/kafka/pull/8661#issuecomment-631738264


   test this







[GitHub] [kafka] guozhangwang commented on pull request #8661: KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks

2020-05-20 Thread GitBox


guozhangwang commented on pull request #8661:
URL: https://github.com/apache/kafka/pull/8661#issuecomment-631737464


   test this please







[GitHub] [kafka] apovzner commented on pull request #8650: MINOR: Added unit tests for ConnectionQuotas

2020-05-20 Thread GitBox


apovzner commented on pull request #8650:
URL: https://github.com/apache/kafka/pull/8650#issuecomment-631702750


   One of the builds failed because of a unit test added in this PR. It was a 
bug in the test: it was waiting on the wrong future, which had a name similar 
to another one. I fixed the test to wait on the right future and renamed the 
variable so that the difference is easier to spot.
   @mjsax Could you please rerun the tests?







[GitHub] [kafka] tombentley commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

2020-05-20 Thread GitBox


tombentley commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-631683003


   @C0urante you might also want to take a look.







[GitHub] [kafka] C0urante commented on pull request #8357: KAFKA-9767: Add logging to basic auth rest extension

2020-05-20 Thread GitBox


C0urante commented on pull request #8357:
URL: https://github.com/apache/kafka/pull/8357#issuecomment-631682508


   Thanks @rhauch; I've added the requested unit test. This is ready for 
another round when you have time







[GitHub] [kafka] C0urante commented on a change in pull request #8357: KAFKA-9767: Add logging to basic auth rest extension

2020-05-20 Thread GitBox


C0urante commented on a change in pull request #8357:
URL: https://github.com/apache/kafka/pull/8357#discussion_r428260448



##
File path: 
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
##
@@ -67,36 +87,60 @@ public void filter(ContainerRequestContext requestContext) throws IOException {
         private String password;
 
         public BasicAuthCallBackHandler(String credentials) {
-            if (credentials != null) {
-                int space = credentials.indexOf(SPACE);
-                if (space > 0) {
-                    String method = credentials.substring(0, space);
-                    if (BASIC.equalsIgnoreCase(method)) {
-                        credentials = credentials.substring(space + 1);
-                        credentials = new String(Base64.getDecoder().decode(credentials),
-                                                 StandardCharsets.UTF_8);
-                        int i = credentials.indexOf(COLON);
-                        if (i > 0) {
-                            username = credentials.substring(0, i);
-                            password = credentials.substring(i + 1);
-                        }
-                    }
-                }
+            if (credentials == null) {
+                log.trace("No credentials were provided with the request");
+                return;
             }
+
+            int space = credentials.indexOf(SPACE);
+            if (space <= 0) {
+                log.trace("Request credentials were malformed; no space present in value for authorization header");
+                return;
+            }
+
+            String method = credentials.substring(0, space);
+            if (!BASIC.equalsIgnoreCase(method)) {
+                log.trace("Request credentials used {} authentication, but only {} supported; ignoring", method, BASIC);
+                return;
+            }
+
+            credentials = credentials.substring(space + 1);
+            credentials = new String(Base64.getDecoder().decode(credentials),
+                                     StandardCharsets.UTF_8);
+            int i = credentials.indexOf(COLON);
+            if (i <= 0) {
+                log.trace("Request credentials were malformed; no colon present between username and password");
+                return;
+            }
+
+            username = credentials.substring(0, i);
+            password = credentials.substring(i + 1);
         }
 
         @Override
         public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+            List<Callback> unsupportedCallbacks = new ArrayList<>();

Review comment:
    happy to optimize if you'd like, but it sounds like we can leave it 
as-is for now at least.
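
For readers who want to try the guard-clause parsing from the diff outside of Connect, here is a minimal standalone sketch. The class name `BasicHeaderSketch` and the `Optional` return type are assumptions made for illustration and are not part of the PR; the sketch also catches `IllegalArgumentException` from the Base64 decoder, which the diff above does not do.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;

// Illustrative stand-in for the callback handler's parsing logic; not the Kafka class itself.
final class BasicHeaderSketch {
    final String username;
    final String password;

    private BasicHeaderSketch(String username, String password) {
        this.username = username;
        this.password = password;
    }

    // Returns the parsed credentials, or empty when the header is absent or malformed.
    static Optional<BasicHeaderSketch> parse(String header) {
        if (header == null) {
            return Optional.empty();              // no credentials supplied
        }
        int space = header.indexOf(' ');
        if (space <= 0) {
            return Optional.empty();              // no "<method> <value>" separator
        }
        if (!"Basic".equalsIgnoreCase(header.substring(0, space))) {
            return Optional.empty();              // some other authentication scheme
        }
        String decoded;
        try {
            decoded = new String(Base64.getDecoder().decode(header.substring(space + 1)),
                                 StandardCharsets.UTF_8);
        } catch (IllegalArgumentException e) {
            return Optional.empty();              // assumption: invalid Base64 counts as malformed
        }
        int colon = decoded.indexOf(':');
        if (colon <= 0) {
            return Optional.empty();              // no colon, or an empty username
        }
        return Optional.of(new BasicHeaderSketch(decoded.substring(0, colon),
                                                 decoded.substring(colon + 1)));
    }

    public static void main(String[] args) {
        String header = "Basic " + Base64.getEncoder()
                .encodeToString("alice:s3cret".getBytes(StandardCharsets.UTF_8));
        parse(header).ifPresent(c -> System.out.println(c.username + " / " + c.password));
        System.out.println(parse("Bearer abc123").isPresent());
    }
}
```

Each early return corresponds to one of the `log.trace` branches in the diff, which is what makes it straightforward to log a distinct reason per failure.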









[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-20 Thread GitBox


xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428255598



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new DistributedConfig(workerProps);
         String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
-        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(kafkaClusterId);
         offsetBackingStore.configure(distributedConfig);

Review comment:
   Get kafkaClusterId from ConnectUtils.lookupKafkaClusterId in 
configure(...) of KafkaOffsetBackingStore









[jira] [Commented] (KAFKA-9987) Improve sticky partition assignor algorithm

2020-05-20 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17112535#comment-17112535
 ] 

Sophie Blee-Goldman commented on KAFKA-9987:


[~hai_lin] I put this on pause to try and get another KIP into 2.6, but the 
non-testing code is all ready. I just haven't called for review on the PR yet 
since I'm still finishing up the unit tests/benchmarks.

So, yeah, the patch is ready if you want to do some local testing. I can't 
promise that there are no bugs, but it's a pretty straightforward algorithm and 
I've run some initial benchmarks with it working.

> Improve sticky partition assignor algorithm
> ---
>
> Key: KAFKA-9987
> URL: https://issues.apache.org/jira/browse/KAFKA-9987
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> In 
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  we added the new CooperativeStickyAssignor which leverages on the underlying 
> sticky assignment algorithm of the existing StickyAssignor (moved to 
> AbstractStickyAssignor). The algorithm is fairly complex as it tries to 
> optimize stickiness while satisfying perfect balance _in the case individual 
> consumers may be subscribed to different subsets of the topics._ While it 
> does a pretty good job at what it promises to do, it doesn't scale well with 
> large numbers of consumers and partitions.
> To give a concrete example, users have reported that it takes 2.5 minutes for 
> the assignment to complete with just 2100 consumers reading from 2100 
> partitions. Since partitions revoked during the first of two cooperative 
> rebalances will remain unassigned until the end of the second rebalance, it's 
> important for the rebalance to be as fast as possible. And since one of the 
> primary improvements of the cooperative rebalancing protocol is better 
> scaling experience, the only OOTB cooperative assignor should not itself 
> scale poorly
> If we can constrain the problem a bit, we can simplify the algorithm greatly. 
> In many cases the individual consumers won't be subscribed to some random 
> subset of the total subscription, they will all be subscribed to the same set 
> of topics and rely on the assignor to balance the partition workload.
> We can detect this case by checking the group's individual subscriptions and 
> call on a more efficient assignment algorithm. 
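
The detection step described in the issue can be sketched roughly as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical, and the round-robin fallback is a simple stand-in, not the sticky algorithm from the actual patch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: detect the "everyone subscribes to the same topics" case.
final class UniformSubscriptionSketch {

    // True when every consumer subscribes to exactly the same set of topics.
    static boolean allSubscriptionsEqual(Map<String, Set<String>> subscriptions) {
        Iterator<Set<String>> it = subscriptions.values().iterator();
        if (!it.hasNext()) {
            return true; // an empty group is trivially uniform
        }
        Set<String> first = it.next();
        while (it.hasNext()) {
            if (!first.equals(it.next())) {
                return false;
            }
        }
        return true;
    }

    // Stand-in for the cheaper algorithm: plain round-robin, with no stickiness.
    static Map<String, List<String>> roundRobin(List<String> consumers, List<String> partitions) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        for (int i = 0; i < partitions.size(); i++) {
            assignment.get(consumers.get(i % consumers.size())).add(partitions.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> subs = new LinkedHashMap<>();
        subs.put("c1", Set.of("orders", "payments"));
        subs.put("c2", Set.of("payments", "orders"));
        if (allSubscriptionsEqual(subs)) {
            System.out.println(roundRobin(List.of("c1", "c2"),
                                          List.of("orders-0", "orders-1", "payments-0")));
        }
    }
}
```

The check is linear in the number of consumers, so it adds little cost before choosing between the general algorithm and the constrained one.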



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tombentley commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

2020-05-20 Thread GitBox


tombentley commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-631651186


   Still need to review test coverage, but @kkonstantine, @mimaison, @bbejeck 
you might want to give it an initial pass.







[GitHub] [kafka] tombentley opened a new pull request #8699: KAFKA-9673: Filter and Conditional SMTs

2020-05-20 Thread GitBox


tombentley opened a new pull request #8699:
URL: https://github.com/apache/kafka/pull/8699


   * Add Predicate interface
   * Add Filter SMT
   * Add the predicate implementations defined in the KIP.
   * Create abstraction in ConnectorConfig for configuring Transformations and 
Connectors with the "alias prefix" mechanism
   * Add tests and fix existing tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] omkreddy opened a new pull request #8700: MINOR: Increase gradle daemon’s heap size to 2g

2020-05-20 Thread GitBox


omkreddy opened a new pull request #8700:
URL: https://github.com/apache/kafka/pull/8700


   We have seen out-of-memory errors in builds. This PR increases the Gradle 
daemon's heap size to 2g.

   ```
   [Error] : Error while emitting kafka/log/LogTest
   [2020-05-20T11:33:15.133Z] GC overhead limit exceeded
   [2020-05-20T11:33:15.133Z] one error found 
   ```
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Commented] (KAFKA-8159) Built-in serdes for signed numbers do not obey lexicographical ordering

2020-05-20 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17112511#comment-17112511
 ] 

Guozhang Wang commented on KAFKA-8159:
--

Hey John,

What I was asking is actually beyond signed numerical types -- for which I 
agree offering new serdes as opt-in should be sufficient -- but for any types 
like you described in the example above. Today our javadoc for a range query 
looks like this:

{code}
/**
 * Get an iterator over a given range of keys. This iterator must be closed after use.
 * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
 * and must not return null values. No ordering guarantees are provided.
 * @param from The first key that could be in the range
 * @param to The last key that could be in the range
 * @return The iterator for this range.
 * @throws NullPointerException If null is used for from or to.
 * @throws InvalidStateStoreException if the store is not initialized
 */
KeyValueIterator<K, V> range(K from, K to);
{code}

For most users the {{from}} < {{to}} relationship is implicitly defined as 
`from.compareTo(to) <= 0`. What they mostly also assume, but what is not 
actually guaranteed, is that `serialize(from).compareTo(serialize(to)) <= 0`. We 
should make it clear in the javadoc that the "first / last" key in the range is 
defined by the serialized bytes, not by the objects, and that it is the user's 
responsibility either to make sure the serializers carry the object ordering 
over to the byte ordering, or to pass {{from / to}} parameters that already 
obey the byte ordering.
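
To make the mismatch concrete, here is a small sketch, assuming a big-endian two's-complement integer encoding and an unsigned lexicographic byte comparison. The class `SerializedOrderSketch` and the sign-bit-flipping `orderPreserving` encoding are hypothetical illustrations, not existing Kafka serdes.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch of object order versus serialized byte order for signed ints.
final class SerializedOrderSketch {

    // Plain big-endian two's-complement encoding (assumed serializer behavior).
    static byte[] plain(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Illustrative order-preserving encoding: flipping the sign bit makes
    // negative values sort before positive ones under unsigned comparison.
    static byte[] orderPreserving(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value ^ Integer.MIN_VALUE).array();
    }

    // Unsigned lexicographic comparison of two byte arrays (requires Java 9+).
    static int compareBytes(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        // Object order: -1 sorts before 1.
        System.out.println(Integer.compare(-1, 1) < 0);
        // Byte order of the plain encoding: 0xFFFFFFFF sorts after 0x00000001.
        System.out.println(compareBytes(plain(-1), plain(1)) < 0);
        // Byte order of the sign-flipped encoding agrees with the object order.
        System.out.println(compareBytes(orderPreserving(-1), orderPreserving(1)) < 0);
    }
}
```

With the plain encoding, a call such as `range(-1, 1)` hands the store a lower bound whose bytes sort after the upper bound, which is the situation the issue description below calls out.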

> Built-in serdes for signed numbers do not obey lexicographical ordering
> ---
>
> Key: KAFKA-8159
> URL: https://issues.apache.org/jira/browse/KAFKA-8159
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> Currently we assume consistent ordering between serialized and deserialized 
> keys, e.g. if the objects obey objA < objB < objC then the serialized Bytes 
> will also obey bytesA < bytesB < bytesC. This is not true in general of the 
> built-in serdes for signed numerical types (eg Integer, Long). Specifically, 
> it is broken by the negative number representations which are 
> lexicographically greater than (all) positive number representations. 
>  
> One consequence of this is that an interactive query of a key range with a 
> negative lower bound and positive upper bound (eg keyValueStore.range(-1, 1)) 
> will result in "unexpected behavior" depending on the specific store type.
>  
> For RocksDB stores with caching disabled, an empty iterator will be returned 
> regardless of whether any records do exist in that range. 
> For in-memory stores and ANY store with caching enabled, Streams will throw 
> an unchecked exception and crash.





[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-05-20 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17112508#comment-17112508
 ] 

Hai Lin edited comment on KAFKA-4084 at 5/20/20, 6:18 PM:
--

Thanks [~sql_consulting] for pointing me to this ticket. [~junrao] I just want 
to hear a bit more about why KIP-491 is not under consideration, based on your 
comment above.

I am not sure if KIP-491 is necessarily the best approach to address this 
particular issue (in general, one probably shouldn't have any broker overloaded 
at any time). However, if there are other convincing use cases, we could 
consider it.

I feel it's a very useful feature for a lot of operational cases:

1. High replication rate when a broker boots up

To me, uneven partition sizes in production are very common, and with 
throttling some big partitions will take much longer to get fully replicated. 
Sometimes we just want a fully replicated broker (like in 10 minutes without 
replica, rather than hours). Having an under-replicated broker in the system 
for a long time adds operational complexity. For example, we need to be careful 
that no other broker goes offline during the replication process.

2. Other situations, such as an outlier broker

This happens pretty often if the cluster is big. Most of the time it's not easy 
(or at least it is time consuming) to replace a broker, even with EBS. We would 
like to disable a broker as leader without taking it offline, so that the 
on-call engineer has time to investigate the problem without terminating it 
right away. With KIP-491 we can add a lot of automation to the system that 
handles a network partition affecting a single broker without actually 
replacing it.

3. Potential

If we can manipulate the view of the leader in a cluster, we can do a bit more, 
such as introducing different leaders for producers and consumers (consumers 
can now consume from a replica, but I think there is still a way we can control 
it). Then we can add priority at the client level and isolate clients so that 
they talk to only some of the brokers.

This is more about KIP-491; we can surely move it back to the original ticket 
if we feel there is more to discuss.


> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and 

[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-05-20 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17112508#comment-17112508
 ] 

Hai Lin edited comment on KAFKA-4084 at 5/20/20, 6:18 PM:
--

Thanks [~sql_consulting] for pointing me to this ticket. [~junrao] I just want 
to hear a bit more about why KIP-491 is not under consideration, based on your 
comment above.

{*quote*}I am not sure if KIP-491 is necessarily the best approach to address 
this particular issue (in general, one probably shouldn't have any broker 
overloaded at any time). However, if there are other convincing use cases, we 
could consider it. {*quote*}

I feel it's a very useful feature for a lot of operational cases:

1. High replication rate when a broker boots up

To me, uneven partition sizes in production are very common, and with 
throttling some big partitions will take much longer to get fully replicated. 
Sometimes we just want a fully replicated broker (like in 10 minutes without 
replica, rather than hours). Having an under-replicated broker in the system 
for a long time adds operational complexity. For example, we need to be careful 
that no other broker goes offline during the replication process.

2. Other situations, such as an outlier broker

This happens pretty often if the cluster is big. Most of the time it's not easy 
(or at least it is time consuming) to replace a broker, even with EBS. We would 
like to disable a broker as leader without taking it offline, so that the 
on-call engineer has time to investigate the problem without terminating it 
right away. With KIP-491 we can add a lot of automation to the system that 
handles a network partition affecting a single broker without actually 
replacing it.

3. Potential

If we can manipulate the view of the leader in a cluster, we can do a bit more, 
such as introducing different leaders for producers and consumers (consumers 
can now consume from a replica, but I think there is still a way we can control 
it). Then we can add priority at the client level and isolate clients so that 
they talk to only some of the brokers.

This is more about KIP-491; we can surely move it back to the original ticket 
if we feel there is more to discuss.


> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> 

[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-05-20 Thread Hai Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17112508#comment-17112508
 ] 

Hai Lin commented on KAFKA-4084:


Thanks [~sql_consulting] for pointing me to this ticket. [~junrao] I just want 
to hear a bit more about why KIP-491 is not under consideration, based on your 
comment above.

{*quote*}

I am not sure if KIP-491 is necessarily the best approach to address this 
particular issue (in general, one probably shouldn't have any broker overloaded 
at any time). However, if there are other convincing use cases, we could 
consider it. 

{*quote*}

I feel it's a very useful feature for a lot of operational cases:

1. High replication rate when a broker boots up

To me, uneven partition sizes in production are very common, and with 
throttling some big partitions will take much longer to get fully replicated. 
Sometimes we just want a fully replicated broker (like in 10 minutes without 
replica, rather than hours). Having an under-replicated broker in the system 
for a long time adds operational complexity. For example, we need to be careful 
that no other broker goes offline during the replication process.

2. Other situations, such as an outlier broker

This happens pretty often if the cluster is big. Most of the time it's not easy 
(or at least it is time consuming) to replace a broker, even with EBS. We would 
like to disable a broker as leader without taking it offline, so that the 
on-call engineer has time to investigate the problem without terminating it 
right away. With KIP-491 we can add a lot of automation to the system that 
handles a network partition affecting a single broker without actually 
replacing it.

3. Potential

If we can manipulate the view of the leader in a cluster, we can do a bit more, 
such as introducing different leaders for producers and consumers (consumers 
can now consume from a replica, but I think there is still a way we can control 
it). Then we can add priority at the client level and isolate clients so that 
they talk to only some of the brokers.

This is more about KIP-491; we can surely move it back to the original ticket 
if we feel there is more to discuss.

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.
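
The "limit the number in flight" idea from the description can be sketched as a simple batching step. This is an illustration only: the class name and the `maxInFlight` parameter are hypothetical, and nothing here reflects how the fix that eventually shipped was implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split imbalanced partitions into bounded batches so that
// only a limited number of leader elections are in flight at any time.
final class BatchedLeaderRebalanceSketch {

    // Splits the input into consecutive batches of at most maxInFlight elements.
    static <T> List<List<T>> batches(List<T> imbalanced, int maxInFlight) {
        if (maxInFlight <= 0) {
            throw new IllegalArgumentException("maxInFlight must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < imbalanced.size(); start += maxInFlight) {
            int end = Math.min(start + maxInFlight, imbalanced.size());
            result.add(new ArrayList<>(imbalanced.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> imbalanced = List.of("t-0", "t-1", "t-2", "t-3", "t-4");
        for (List<String> batch : batches(imbalanced, 2)) {
            // A real controller would trigger the elections for this batch here
            // and wait for them to complete before moving on to the next one.
            System.out.println("electing preferred leaders for " + batch);
        }
    }
}
```

Waiting for each batch to finish before starting the next is what keeps most replica fetchers running while the rebalance proceeds.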





[GitHub] [kafka] C0urante commented on pull request #8511: KAFKA-9888: Copy connector configs before passing to REST extensions

2020-05-20 Thread GitBox


C0urante commented on pull request #8511:
URL: https://github.com/apache/kafka/pull/8511#issuecomment-631633951


   Thanks @kkonstantine, I've addressed both of your comments. Ready for 
another round when you have time.







[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-20 Thread GitBox


xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428205238



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##
@@ -92,6 +93,7 @@
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
+    private final String clusterId;

Review comment:
   I'm still keeping clusterId as a field in Worker, since we use clusterId in 
other methods of this class. We get clusterId from 
ConnectUtils.lookupKafkaClusterId inside the Worker constructor and keep it as 
a field.









[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-20 Thread GitBox


xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428203784



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##
@@ -92,6 +93,7 @@
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
+    private final String clusterId;

Review comment:
   Removed clusterId from Worker constructor.









[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-20 Thread GitBox


xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428203600



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
##
@@ -93,7 +93,7 @@ public static void main(String[] args) {
 
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
 config, ConnectorClientConfigOverridePolicy.class);
 Worker worker = new Worker(workerId, time, plugins, config, new 
FileOffsetBackingStore(),
-   connectorClientConfigOverridePolicy);
+   connectorClientConfigOverridePolicy, 
kafkaClusterId);

Review comment:
   Removed clusterId from Worker constructor.









[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-20 Thread GitBox


xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428203369



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##
@@ -101,24 +101,25 @@ public Connect startConnect(Map 
workerProps) {
 URI advertisedUrl = rest.advertisedUrl();
 String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
 
-KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(kafkaClusterId);
 offsetBackingStore.configure(config);
 
 ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy = plugins.newPlugin(
 
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
 config, ConnectorClientConfigOverridePolicy.class);
 
-Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore, connectorClientConfigOverridePolicy);
+Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);
 WorkerConfigTransformer configTransformer = worker.configTransformer();
 
 Converter internalValueConverter = worker.getInternalValueConverter();
-StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter);
+StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
 statusBackingStore.configure(config);
 
 ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
 internalValueConverter,
 config,
-configTransformer);
+configTransformer,
+kafkaClusterId);

Review comment:
   Removed clusterId from the backing store constructor.









[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-20 Thread GitBox


xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428203009



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##
@@ -101,24 +101,25 @@ public Connect startConnect(Map 
workerProps) {
 URI advertisedUrl = rest.advertisedUrl();
 String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
 
-KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(kafkaClusterId);
 offsetBackingStore.configure(config);
 
 ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy = plugins.newPlugin(
 
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
 config, ConnectorClientConfigOverridePolicy.class);
 
-Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore, connectorClientConfigOverridePolicy);
+Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);
 WorkerConfigTransformer configTransformer = worker.configTransformer();
 
 Converter internalValueConverter = worker.getInternalValueConverter();
-StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter);
+StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);

Review comment:
   Removed clusterId from KafkaStatusBackingStore constructor.









[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-20 Thread GitBox


xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428202532



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##
@@ -101,24 +101,25 @@ public Connect startConnect(Map 
workerProps) {
 URI advertisedUrl = rest.advertisedUrl();
 String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
 
-KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(kafkaClusterId);

Review comment:
   Removed clusterId from KafkaOffsetBackingStore constructor
   
   









[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-20 Thread GitBox


xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428202742



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
##
@@ -101,24 +101,25 @@ public Connect startConnect(Map 
workerProps) {
 URI advertisedUrl = rest.advertisedUrl();
 String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
 
-KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(kafkaClusterId);
 offsetBackingStore.configure(config);
 
 ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy = plugins.newPlugin(
 
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
 config, ConnectorClientConfigOverridePolicy.class);
 
-Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore, connectorClientConfigOverridePolicy);
+Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore, connectorClientConfigOverridePolicy, kafkaClusterId);

Review comment:
   Removed clusterId from Worker constructor.









[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-20 Thread GitBox


xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428202270



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
 plugins.compareAndSwapWithDelegatingLoader();
 DistributedConfig distributedConfig = new 
DistributedConfig(workerProps);
 String kafkaClusterId = 
ConnectUtils.lookupKafkaClusterId(distributedConfig);
-KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(kafkaClusterId);
 offsetBackingStore.configure(distributedConfig);
-Worker worker = new Worker(workerId, time, plugins, distributedConfig, 
offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
+Worker worker = new Worker(workerId, time, plugins, distributedConfig, 
offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY, kafkaClusterId);
 WorkerConfigTransformer configTransformer = worker.configTransformer();
 Converter internalValueConverter = worker.getInternalValueConverter();
-StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter);
+StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
 statusBackingStore.configure(distributedConfig);

Review comment:
   Removed clusterId from KafkaStatusBackingStore constructor









[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-20 Thread GitBox


xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428202016



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
 plugins.compareAndSwapWithDelegatingLoader();
 DistributedConfig distributedConfig = new 
DistributedConfig(workerProps);
 String kafkaClusterId = 
ConnectUtils.lookupKafkaClusterId(distributedConfig);
-KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(kafkaClusterId);
 offsetBackingStore.configure(distributedConfig);
-Worker worker = new Worker(workerId, time, plugins, distributedConfig, 
offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
+Worker worker = new Worker(workerId, time, plugins, distributedConfig, 
offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY, kafkaClusterId);
 WorkerConfigTransformer configTransformer = worker.configTransformer();
 Converter internalValueConverter = worker.getInternalValueConverter();
-StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter);
+StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter, kafkaClusterId);
 statusBackingStore.configure(distributedConfig);
 ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
 internalValueConverter,
 distributedConfig,
-configTransformer);
+configTransformer,
+kafkaClusterId);

Review comment:
   Removed clusterId from KafkaConfigBackingStore constructor.









[GitHub] [kafka] xiaodongdu commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-20 Thread GitBox


xiaodongdu commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r428201738



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
##
@@ -233,17 +233,18 @@ private void addHerder(SourceAndTarget sourceAndTarget) {
 plugins.compareAndSwapWithDelegatingLoader();
 DistributedConfig distributedConfig = new 
DistributedConfig(workerProps);
 String kafkaClusterId = 
ConnectUtils.lookupKafkaClusterId(distributedConfig);
-KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(kafkaClusterId);
 offsetBackingStore.configure(distributedConfig);

Review comment:
   Removed clusterId from KafkaOffsetBackingStore constructor.









[jira] [Resolved] (KAFKA-9859) kafka-streams-application-reset tool doesn't take into account topics generated by KTable foreign key join operation

2020-05-20 Thread Levani Kokhreidze (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Levani Kokhreidze resolved KAFKA-9859.
--
Fix Version/s: 2.6.0
   Resolution: Fixed

fixed with PR [https://github.com/apache/kafka/pull/8671]

> kafka-streams-application-reset tool doesn't take into account topics 
> generated by KTable foreign key join operation
> 
>
> Key: KAFKA-9859
> URL: https://issues.apache.org/jira/browse/KAFKA-9859
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Reporter: Levani Kokhreidze
>Assignee: Levani Kokhreidze
>Priority: Major
>  Labels: newbie, newbie++
> Fix For: 2.6.0
>
>
> Steps to reproduce:
>  * Create Kafka Streams application which uses foreign key join operation 
> (without a Named parameter overload)
>  * Stop Kafka streams application
>  * Perform `kafka-topics-list` and verify that foreign key operation internal 
> topics are generated
>  * Use `kafka-streams-application-reset` to perform the cleanup of your kafka 
> streams application: `kafka-streams-application-reset --application-id 
>  --input-topics  --bootstrap-servers 
>  --to-datetime 2019-04-13T00:00:00.000`
>  * Perform `kafka-topics-list` again, you'll see that topics generated by the 
> foreign key operation are still there.
> [kafka-streams-application-reset|#L679-L680]] uses 
> `-subscription-registration-topic` and `-subscription-response-topic` 
> suffixes to match topics generated by the foreign key operation. While in 
> reality, internal topics are generated in this format:
> {code:java}
> -KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION- number>-topic 
> -KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE- number>-topic{code}
> Please note that this problem only happens when the `Named` parameter is not 
> used. When the `Named` parameter is used, topics are generated with the same 
> pattern as specified in StreamsResetter.
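
The mismatch described in the report can be sketched as follows. This is an illustration only, not the code from StreamsResetter or from the PR that fixed the issue: `FkJoinTopicMatcher` and its methods are hypothetical, and the regular expression assumes that the part elided before `number>` in the archived text is a run of digits. The sketch also ignores the application-id prefix check.

```java
import java.util.regex.Pattern;

/** Hypothetical helper contrasting the two naming schemes from the report. */
final class FkJoinTopicMatcher {
    // Assumed shape of auto-generated names:
    // ...-KTABLE-FK-JOIN-SUBSCRIPTION-{REGISTRATION|RESPONSE}-<digits>-topic
    private static final Pattern GENERATED = Pattern.compile(
        ".*-KTABLE-FK-JOIN-SUBSCRIPTION-(REGISTRATION|RESPONSE)-\\d+-topic");

    /** Suffix check as described in the report; misses auto-generated names. */
    static boolean matchesBySuffix(String topic) {
        return topic.endsWith("-subscription-registration-topic")
            || topic.endsWith("-subscription-response-topic");
    }

    /** Matches the auto-generated names used when no Named parameter is given. */
    static boolean matchesGenerated(String topic) {
        return GENERATED.matcher(topic).matches();
    }

    /** Accepts either naming scheme. */
    static boolean isFkJoinInternalTopic(String topic) {
        return matchesBySuffix(topic) || matchesGenerated(topic);
    }
}
```

A topic such as `my-app-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000005-topic` (a made-up name) fails the suffix check because a number sits between the operation name and `-topic`, which is why the reset tool left such topics behind.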



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9899) LogCleaner Tries To Clean Single Partition Over 1000x/Minute

2020-05-20 Thread Jeff Nadler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17112460#comment-17112460
 ] 

Jeff Nadler commented on KAFKA-9899:


upgraded to 2.5.0, still experiencing this issue

> LogCleaner Tries To Clean Single Partition Over 1000x/Minute
> 
>
> Key: KAFKA-9899
> URL: https://issues.apache.org/jira/browse/KAFKA-9899
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.4.1
> Environment: ubuntu bionic, openjdk11.0.6, kafka 2.4.1
>Reporter: Jeff Nadler
>Priority: Major
> Attachments: CPU-Usage.png
>
>
> I had previously believed this to be the same issue as KAFKA-8764, but I took 
> a closer look when it persisted after upgrading to 2.4.1 and now believe this 
> is a different bug.
> For a very low-traffic compacted topic, the log cleaner will 
> sometimes - for a period of usually 2 hours or longer - get stuck in a loop 
> where it tries to clean the same partition for the same offset range nonstop, 
> and the log cleaner thread consumes 100% of a single core during this time.
> h4. 1396 attempts in a single minute:
>  
> {{root@stage-obs-kafka01:/var/log/kafka# cat log-cleaner.log | grep 22:22: | 
> grep "offset range" | wc -l}}
> {{1396}}
>  
> h4. All 1396 of these are looking at the same partition and same offset range:
> {{[2020-04-21 22:22:59,862] INFO Cleaner 0: Building offset map for log 
> elauneind-firebolt-messages-sfo-0 for 0 segments in offset range [22943108, 
> 22912825). (kafka.log.LogCleaner)}}
>  
> These attempts are separated by only 30 ms on average. This is a small 
> 3-node cluster; note that the CPU graph attached is very clearly bimodal for 
> each node:   low when the log cleaner is not "stuck", and much higher when it 
> is.
> Eventually the log cleaner appears to find a segment to clean (because enough 
> traffic has arrived?) and the loop is broken... for a time.   Note that it 
> finds "1 segments" and then finally moves on to check other topic-partitions.
> {{...tens of thousands of this first one then}}
> {{[2020-04-21 20:06:02,531] INFO Cleaner 0: Building offset map for log 
> elauneind-firebolt-messages-sfo-0 for 0 segments in *offset range* [23591841, 
> 23575583). (kafka.log.LogCleaner)}}{{[2020-04-21 20:06:02,567] INFO Cleaner 
> 0: Building offset map for log elauneind-firebolt-messages-sfo-0 for 1 
> segments in *offset range* [23591841, 23621641). 
> (kafka.log.LogCleaner)}}{{[2020-04-21 20:43:04,309] INFO Cleaner 0: Building 
> offset map for log elauneind-firebolt-messages-s2r1-0 for 1 segments in 
> *offset range* [2687968, 2732498). (kafka.log.LogCleaner)}}
>  
> h4. The topic gets about 100 messages/minute, and its config is:
> {{Topic: elauneind-firebolt-messages-sfo PartitionCount: 1 ReplicationFactor: 
> 3 Configs: 
> min.insync.replicas=1,cleanup.policy=compact,delete,segment.bytes=10240,retention.ms=90,message.format.version=2.3-IV1,min.compaction.lag.ms=30,min.cleanable.dirty.ratio=0.2,unclean.leader.election.enable=true,retention.bytes=1073741824}}{{
>  Topic: elauneind-firebolt-messages-sfo Partition: 0 Leader: 0 Replicas: 
> 0,2,1 Isr: 0,1,2}}
>  
>  
>  
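
One detail visible in the quoted log lines is that every repeated "0 segments" entry has a start offset larger than its end offset, while the "1 segments" entries do not. The sketch below is a hypothetical helper, not part of Kafka, that flags such lines. The class name and the regular expression are assumptions based only on the log format quoted above, and the check says nothing about the root cause.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical log-inspection helper; not part of Kafka. */
final class OffsetRangeCheck {
    // Matches "offset range [start, end)"; tolerates the Jira bold marker "*".
    private static final Pattern RANGE =
        Pattern.compile("offset range\\*?\\s+\\[(\\d+),\\s*(\\d+)\\)");

    /** Returns true when the logged range is empty or inverted (start >= end). */
    static boolean isEmptyOrInverted(String logLine) {
        Matcher m = RANGE.matcher(logLine);
        if (!m.find()) {
            throw new IllegalArgumentException("no offset range in line");
        }
        long start = Long.parseLong(m.group(1));
        long end = Long.parseLong(m.group(2));
        return start >= end;
    }
}
```

Counting the lines for which `isEmptyOrInverted` returns true would give the same kind of figure as the `grep | wc -l` pipeline in the report, restricted to the inverted ranges.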





[GitHub] [kafka] nizhikov commented on pull request #8695: KAFKA-9320: KIP-573 - Enable TLSv1.3 by default

2020-05-20 Thread GitBox


nizhikov commented on pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#issuecomment-631614084


   @ijuma I can't see a downside in forcing usage of the latest TLS version.
   Added this change to the PR.







[GitHub] [kafka] nizhikov commented on a change in pull request #8695: KAFKA-9320: KIP-573 - Enable TLSv1.3 by default

2020-05-20 Thread GitBox


nizhikov commented on a change in pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#discussion_r428173640



##
File path: 
clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
##
@@ -622,6 +622,34 @@ public void testUnsupportedTLSVersion() throws Exception {
 server.verifyAuthenticationMetrics(0, 1);
 }
 
+/**
+ * Tests that connections can be made with TLSv1.2 and custom cipher suite.
+ */
+@Test
+public void testCiphersSuiteForTLSv1_2() throws Exception {
+String node = "0";
+SSLContext context = SSLContext.getInstance(tlsProtocol);
+context.init(null, null, null);
+
+// Note that only some ciphers work out of the box. Others require 
additional configuration.
+String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384";
+
+sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
+sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, 
Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split(",")));

Review comment:
   Tests added.
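
For readers unfamiliar with the JSSE calls behind a test like this, here is a minimal standalone sketch using only the JDK's `javax.net.ssl` API. It is not the test from the PR: `Tls12CipherSketch` and `restrictedEngine` are hypothetical names, and the sketch only configures an engine rather than opening a connection.

```java
import java.security.GeneralSecurityException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

/** Hypothetical illustration; not the Kafka test from the PR. */
final class Tls12CipherSketch {
    /** Builds an SSLEngine restricted to TLSv1.2 and a single cipher suite. */
    static SSLEngine restrictedEngine(String cipherSuite) {
        try {
            SSLContext context = SSLContext.getInstance("TLSv1.2");
            // Null arguments select the default key managers, trust managers, and RNG.
            context.init(null, null, null);
            SSLEngine engine = context.createSSLEngine();
            engine.setEnabledProtocols(new String[] {"TLSv1.2"});
            // Throws IllegalArgumentException if this JVM does not support the suite.
            engine.setEnabledCipherSuites(new String[] {cipherSuite});
            return engine;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("TLSv1.2 context unavailable", e);
        }
    }
}
```

Calling `restrictedEngine("TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384")` uses the suite named in the diff above. Whether a given suite is accepted depends on the JVM and its security configuration.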









[GitHub] [kafka] guozhangwang merged pull request #8671: KAFKA-9859 / Add topics generated by KTable FK join to internal topic matching logic

2020-05-20 Thread GitBox


guozhangwang merged pull request #8671:
URL: https://github.com/apache/kafka/pull/8671


   







[jira] [Commented] (KAFKA-6579) Consolidate window store and session store unit tests into a single class

2020-05-20 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17112447#comment-17112447
 ] 

Guozhang Wang commented on KAFKA-6579:
--

[~mikulskibartosz] please feel free to take a stab at it, I can assign the 
ticket to you later if you have a PR ready for reviews.

> Consolidate window store and session store unit tests into a single class
> -
>
> Key: KAFKA-6579
> URL: https://issues.apache.org/jira/browse/KAFKA-6579
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie, unit-test
>
> For key value store, we have a {{AbstractKeyValueStoreTest}} that is shared 
> among all its implementations; however for window and session stores, each 
> class has its own independent unit test classes that do not share the test 
> coverage. In fact, many of these test classes share the same unit test 
> functions (e.g. {{RocksDBWindowStoreTest}}, 
> {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}).
> It is better to use the same pattern as for key value stores to consolidate 
> these test functions into a shared base class.
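
The shared-base-class pattern the ticket proposes can be sketched without any test framework. Everything here is hypothetical and simplified: `SketchStore`, `AbstractSketchStoreTest`, and `InMemorySketchStoreTest` are illustrative names, not Kafka Streams classes, and a real version would use JUnit annotations and the actual store interfaces.

```java
import java.util.HashMap;
import java.util.Map;

/** Minimal store contract used only for this sketch. */
interface SketchStore {
    void put(String key, long timestamp, String value);
    String fetch(String key, long timestamp);
}

/** Shared checks, written once against the contract. */
abstract class AbstractSketchStoreTest {
    /** Each implementation's test class supplies its own store. */
    abstract SketchStore createStore();

    void shouldFetchWhatWasPut() {
        SketchStore store = createStore();
        store.put("a", 10L, "v1");
        if (!"v1".equals(store.fetch("a", 10L))) {
            throw new AssertionError("fetch after put returned the wrong value");
        }
        if (store.fetch("a", 11L) != null) {
            throw new AssertionError("unexpected value at another timestamp");
        }
    }
}

/** One concrete subclass per store implementation; only the factory differs. */
final class InMemorySketchStoreTest extends AbstractSketchStoreTest {
    @Override
    SketchStore createStore() {
        Map<String, String> data = new HashMap<>();
        return new SketchStore() {
            @Override
            public void put(String key, long timestamp, String value) {
                data.put(key + "@" + timestamp, value);
            }

            @Override
            public String fetch(String key, long timestamp) {
                return data.get(key + "@" + timestamp);
            }
        };
    }
}
```

Each additional store implementation would add one small subclass and inherit every shared check, which is the coverage benefit the ticket describes for key-value stores.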




