[GitHub] [kafka] Hangleton commented on a change in pull request #8569: KIP-551: Expose disk read and write metrics

2020-04-28 Thread GitBox


Hangleton commented on a change in pull request #8569:
URL: https://github.com/apache/kafka/pull/8569#discussion_r416391699



##
File path: core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
##
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio.file.{Files, Paths}
+
+import org.apache.kafka.common.utils.Time
+import org.slf4j.Logger
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Retrieves Linux /proc/self/io metrics.
+ */
+class LinuxIoMetricsCollector(val procPath: String, val time: Time, val logger: Logger) {
+  import LinuxIoMetricsCollector._
+  var lastUpdateMs = -1L
+  var cachedReadBytes = 0L
+  var cachedWriteBytes = 0L
+
+  def readBytes(): Long = this.synchronized {
+    val curMs = time.milliseconds()
+    if (curMs != lastUpdateMs) {
+      updateValues(curMs)
+    }
+    cachedReadBytes
+  }
+
+  def writeBytes(): Long = this.synchronized {
+    val curMs = time.milliseconds()
+    if (curMs != lastUpdateMs) {
+      updateValues(curMs)
+    }
+    cachedWriteBytes
+  }
+
+  /**
+   * Read /proc/self/io.
+   *
+   * Generally, each line in this file contains a prefix followed by a colon and a number.
+   *
+   * For example, it might contain this:
+   * rchar: 4052
+   * wchar: 0
+   * syscr: 13
+   * syscw: 0
+   * read_bytes: 0
+   * write_bytes: 0
+   * cancelled_write_bytes: 0
+   */
+  def updateValues(now: Long): Boolean = this.synchronized {

Review comment:
   Nit: do you think the lock should be held while reading `/proc`, or
restricted to the update of `lastUpdateMs`, `cachedReadBytes` and
`cachedWriteBytes`?
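
   The narrower-lock alternative raised in this nit could look roughly like the sketch below. It is written in Java rather than the PR's Scala, and `IoMetricsSketch` with all of its members is hypothetical, not code from the PR. It assumes the file follows the `prefix: number` layout described in the doc comment above: parsing happens outside the monitor, and only the three cached fields are updated under it.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical Java analogue of the collector under review; all names are illustrative.
final class IoMetricsSketch {
    private final Path ioFile;            // assumed to point at <procPath>/self/io
    private long lastUpdateMs = -1L;
    private long cachedReadBytes = 0L;
    private long cachedWriteBytes = 0L;

    IoMetricsSketch(Path ioFile) {
        this.ioFile = ioFile;
    }

    /** Parses "prefix: number" lines; returns {read_bytes, write_bytes}, with -1 for a missing field. */
    static long[] parse(List<String> lines) {
        long read = -1L;
        long write = -1L;
        for (String line : lines) {
            int colon = line.indexOf(':');
            if (colon < 0) {
                continue; // not a "prefix: number" line
            }
            String key = line.substring(0, colon).trim();
            String value = line.substring(colon + 1).trim();
            if (key.equals("read_bytes")) {
                read = Long.parseLong(value);
            } else if (key.equals("write_bytes")) {
                write = Long.parseLong(value);
            }
        }
        return new long[] {read, write};
    }

    /** The file is read and parsed without holding the monitor; only the field updates are synchronized. */
    boolean updateValues(long nowMs) {
        long[] parsed;
        try {
            parsed = parse(Files.readAllLines(ioFile, StandardCharsets.UTF_8));
        } catch (IOException | NumberFormatException e) {
            return false; // unreadable or malformed file: keep the previous cached values
        }
        if (parsed[0] < 0 || parsed[1] < 0) {
            return false; // one of the expected fields was missing
        }
        synchronized (this) {
            cachedReadBytes = parsed[0];
            cachedWriteBytes = parsed[1];
            lastUpdateMs = nowMs;
        }
        return true;
    }

    synchronized long readBytes() { return cachedReadBytes; }
    synchronized long writeBytes() { return cachedWriteBytes; }
    synchronized long lastUpdateMs() { return lastUpdateMs; }
}
```

   One trade-off of this variant: two threads may read the file concurrently, and the slower one can overwrite values published by the faster one. Holding the lock across the read, as the PR does, avoids that at the cost of blocking other callers during the file access.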





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2020-04-28 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094266#comment-17094266
 ] 

David Jacot commented on KAFKA-7965:


I've fixed a few issues related to this flaky test:
 * https://issues.apache.org/jira/browse/KAFKA-9844
 * https://issues.apache.org/jira/browse/KAFKA-9885

I am not sure that those two have resolved all the issues related to this one 
but they should drastically improve the situation. Let's see...

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
>                 Key: KAFKA-7965
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7965
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer, unit tests
>    Affects Versions: 1.1.1, 2.2.0, 2.3.0
>            Reporter: Matthias J. Sax
>            Assignee: David Jacot
>            Priority: Critical
>              Labels: flaky-test
>             Fix For: 2.3.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68
>  at org.junit.Assert.fail(Assert.java:88)
>  at org.junit.Assert.assertTrue(Assert.java:41)
>  at kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557)
>  at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>  at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] leonardge commented on pull request #8566: Fix minor code issue

2020-04-28 Thread GitBox


leonardge commented on pull request #8566:
URL: https://github.com/apache/kafka/pull/8566#issuecomment-620445718


   Is this due to flaky tests? 
   The failed test is:
   kafka.api.PlaintextAdminIntegrationTest.testCreatePartitions
   







[GitHub] [kafka] mateuszjadczykDna commented on a change in pull request #8553: KAFKA-9891: Add integration test to replicate standby task failover

2020-04-28 Thread GitBox


mateuszjadczykDna commented on a change in pull request #8553:
URL: https://github.com/apache/kafka/pull/8553#discussion_r416421730



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskFailOverIntegrationTest.java
##
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.fail;
+
+/**
+ * Test the standby task fail over scenario. Never call commit but process a poison key that causes primary task failed.
+ * For at least once, the poison record will be replicated to the standby task state.
+ * In EOS, we should not hit the duplicate processing exception on the poison key, as the
+ * restore consumer is also read committed.
+ */
+@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
+public class StandbyTaskFailOverIntegrationTest {
+
+    private final Logger log = LoggerFactory.getLogger(StandbyTaskFailOverIntegrationTest.class);
+
+    private static final int NUM_BROKERS = 3;
+    private static final Duration RETENTION = Duration.ofMillis(100_000);
+    private static final Duration WINDOW_SIZE = Duration.ofMillis(100);
+    private static final String STORE_NAME = "dedup-store";
+
+    private final String appId = "test-app";
+    private final String inputTopic = "input";
+    private final String keyOne = "key_one";
+    private final String poisonKey = "poison_key";
+    private final int numThreads = 2;
+    private KafkaStreams streamInstanceOne;
+    private KafkaStreams streamInstanceTwo;
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+        NUM_BROKERS,
+        Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "false"))
+    );
+
+    @Parameterized.Parameter
+    public String eosConfig;
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection data() {
+        return Arrays.asList(new String[][] {
+            {StreamsConfig.AT_LEAST_ONCE},
+

[GitHub] [kafka] mateuszjadczykDna commented on a change in pull request #8553: KAFKA-9891: Add integration test to replicate standby task failover

2020-04-28 Thread GitBox


mateuszjadczykDna commented on a change in pull request #8553:
URL: https://github.com/apache/kafka/pull/8553#discussion_r416422911



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskFailOverIntegrationTest.java
##
@@ -0,0 +1,283 @@

[GitHub] [kafka] mateuszjadczykDna commented on a change in pull request #8553: KAFKA-9891: Add integration test to replicate standby task failover

2020-04-28 Thread GitBox


mateuszjadczykDna commented on a change in pull request #8553:
URL: https://github.com/apache/kafka/pull/8553#discussion_r416423660



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskFailOverIntegrationTest.java
##
@@ -0,0 +1,283 @@

[GitHub] [kafka] mateuszjadczykDna commented on a change in pull request #8553: KAFKA-9891: Add integration test to replicate standby task failover

2020-04-28 Thread GitBox


mateuszjadczykDna commented on a change in pull request #8553:
URL: https://github.com/apache/kafka/pull/8553#discussion_r416423943



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskFailOverIntegrationTest.java
##
@@ -0,0 +1,283 @@

[GitHub] [kafka] bseenu commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0

2020-04-28 Thread GitBox


bseenu commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r416436144



##
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##
@@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) {
                 Checkpoint.unwrapGroup(record.sourcePartition()),
                 System.currentTimeMillis() - record.timestamp());
     }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();
+
+        for (String group : consumerGroups) {
+            try {
+                if (consumerGroupsDesc.get(group) == null) {
+                    // if consumerGroupsDesc does not contain this group, it should be the new consumer
+                    // group created at source cluster and its offsets should be sync-ed to target
+                    newConsumerGroup.add(group);
+                    continue;
+                }
+                ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
+                // sync offset to the target cluster only if the state of current consumer group is idle or dead
+                ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
+                if (consumerGroupState.equals(ConsumerGroupState.EMPTY) || consumerGroupState.equals(ConsumerGroupState.DEAD)) {
+                    idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
+                        .partitionsToOffsetAndMetadata().get().entrySet());
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
+            }
+        }
+    }
+
+    Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
+
+        // first, sync offsets for the idle consumers at target
+        for (Map.Entry<String, Set<Map.Entry<TopicPartition, OffsetAndMetadata>>> group : idleConsumerGroupsOffset.entrySet()) {
+            String consumerGroupId = group.getKey();
+            // for each idle consumer at target, read the checkpoints (converted upstream offset)
+            // from the pre-populated map
+            Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId);
+
+            if (convertedUpstreamOffset == null) continue;
+
+            Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
+            for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) {

Review comment:
   @thspinto I think I am running into the same issue you pointed out here.
The source consumer group has a different topic and is active, while the
target consumer group is idle but has a different topic. The code only checks
the topics and partitions matching the target site and adds them only when
the target offset is less than the source offset; it ignores other topics at
the source.
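
   To make the reported behaviour concrete, here is a small self-contained sketch. It is not the PR's code: `OffsetSyncSketch` and both methods are hypothetical, plain strings stand in for `TopicPartition`, and `long` values stand in for `OffsetAndMetadata`. The first method iterates over the target group's partitions, as the loop above does, so partitions that exist only at the source are never considered. The second shows one possible alternative that iterates over the converted upstream offsets instead.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: strings stand in for TopicPartition, longs for OffsetAndMetadata.
final class OffsetSyncSketch {

    /** Behaviour described in the comment: only partitions already present at the target are considered. */
    static Map<String, Long> syncMatchingOnly(Map<String, Long> targetOffsets,
                                              Map<String, Long> convertedUpstream) {
        Map<String, Long> toSync = new HashMap<>();
        for (Map.Entry<String, Long> entry : targetOffsets.entrySet()) {
            Long upstream = convertedUpstream.get(entry.getKey());
            // sync only when the target is behind the converted upstream offset
            if (upstream != null && entry.getValue() < upstream) {
                toSync.put(entry.getKey(), upstream);
            }
        }
        return toSync;
    }

    /** One possible alternative: iterate over the upstream checkpoints so source-only partitions are included. */
    static Map<String, Long> syncIncludingSourceOnly(Map<String, Long> targetOffsets,
                                                     Map<String, Long> convertedUpstream) {
        Map<String, Long> toSync = new HashMap<>();
        for (Map.Entry<String, Long> entry : convertedUpstream.entrySet()) {
            Long target = targetOffsets.get(entry.getKey());
            // a partition unknown at the target, or one that is behind, gets synced
            if (target == null || target < entry.getValue()) {
                toSync.put(entry.getKey(), entry.getValue());
            }
        }
        return toSync;
    }
}
```

   With a target group holding only `topicA-0` and upstream checkpoints for both `topicA-0` and `topicB-0`, the first method never emits `topicB-0`, which matches the situation described above.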









[GitHub] [kafka] leonardge opened a new pull request #8570: Change type to optional in config entry

2020-04-28 Thread GitBox


leonardge opened a new pull request #8570:
URL: https://github.com/apache/kafka/pull/8570









[jira] [Created] (KAFKA-9924) Add RocksDB Memory Consumption to RocksDB Metrics

2020-04-28 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-9924:


             Summary: Add RocksDB Memory Consumption to RocksDB Metrics
                 Key: KAFKA-9924
                 URL: https://issues.apache.org/jira/browse/KAFKA-9924
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Bruno Cadonna


RocksDB's memory consumption should be added to the RocksDB metrics.

RocksDB's memory consumption can be retrieved with the following class:

https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/MemoryUtil.java

The memory consumption metrics should be added on client level and should be 
recorded on INFO level.





[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas

2020-04-28 Thread Mateusz Jadczyk (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094313#comment-17094313
 ] 

Mateusz Jadczyk commented on KAFKA-9891:


Thanks for looking into it. I looked into the test (see comments in the PR),
played with it a bit, enabled some more logging, and I may have more insights.
First of all, are you sure your test uses 2.4 clients? We used 2.4.1 clients
and this broker image confluentinc/cp-zookeeper:5.3.1. I see that you use
StoreQueryParameters in the code, which is not available in Streams 2.4.1, and
the ProcessorStateManager implementation has also changed a lot.

I also revised the logs I included in the ticket and may have a new finding. 
The flow is:
 * NODE 1 T-2 has active task 1_2.
 * NODE 3 *T-1* has standby task 1_2.
 * NODE 1 T-2 crashes.
 * NODE 3 *T-2* takes over, T-1 (which had a standby task) is assigned another
task, standby task 1_2 is revoked.
 * NODE 2 T-1 has standby task 1_2.
 * NODE 3 T-2 crashes.
 * NODE 3 T-1 takes over.
 * NODE 2 T-1 standby task 1_2 is revoked.

 

The crucial takeaway here is that if we focus strictly on NODE 3, we can see
that task 1_2 was not taken over by thread T-1, which had the standby task,
but rather by T-2. I guess that's how this version of TaskAssignor works.
Digging deeper, I checked what exactly happened when the standby task was
revoked on T-1 and the active task was starting on T-2.
So this is T-1 having its standby task revoked:
{noformat}
NODE_3 2020-04-15 21:11:47.024  INFO 1 --- [-StreamThread-1] 
o.a.k.s.p.internals.StreamThread : stream-thread 
[CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
State transition from RUNNING to PARTITIONS_ASSIGNED
NODE_3 2020-04-15 21:11:47.027 DEBUG 1 --- [-StreamThread-1] 
o.a.k.s.p.i.AssignedStandbyTasks : stream-thread 
[CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
Closing revoked standby tasks {1_2=[mnl..command-2, .command-2]}
NODE_3 2020-04-15 21:11:47.027 DEBUG 1 --- [-StreamThread-1] 
o.a.k.s.processor.internals.StandbyTask  : stream-thread 
[CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Closing
NODE_3 2020-04-15 21:11:47.027 TRACE 1 --- [-StreamThread-1] 
o.a.k.s.processor.internals.StandbyTask  : stream-thread 
[CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Committing
NODE_3 2020-04-15 21:11:47.027 DEBUG 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager: stream-thread 
[CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Flushing all stores registered in the state manager
NODE_3 2020-04-15 21:11:47.032 TRACE 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager: stream-thread 
[CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Flushing store COMMAND_ID_STORE
NODE_3 2020-04-15 21:11:47.194 TRACE 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager: stream-thread 
[CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Flushing store _STATE_STORE
NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager: stream-thread 
[CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Checkpointable offsets updated with restored offsets: 
{CommandProcessor-COMMAND_ID_STORE-changelog-2=1, 
CommandProcessor-_STATE_STORE-changelog-2=1}
NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager: stream-thread 
[CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Checkpointable offsets updated with active acked offsets: 
{CommandProcessor-COMMAND_ID_STORE-changelog-2=1, 
CommandProcessor-_STATE_STORE-changelog-2=1}
NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager: stream-thread 
[CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Writing checkpoint: 
{CommandProcessor-COMMAND_ID_STORE-changelog-2=1, 
CommandProcessor-_STATE_STORE-changelog-2=1}
NODE_3 2020-04-15 21:11:47.296 TRACE 1 --- [-StreamThread-1] 
o.a.k.s.processor.internals.StandbyTask  : stream-thread 
[CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Closing state manager
NODE_3 2020-04-15 21:11:47.296 DEBUG 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager: stream-thread 
[CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Closing its state manager and all the registered state stores
NODE_3 2020-04-15 21:11:47.298 DEBUG 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager: stream-thread 

[GitHub] [kafka] tombentley commented on pull request #8312: KAFKA-9432 automated protocol for DescribeConfigs

2020-04-28 Thread GitBox


tombentley commented on pull request #8312:
URL: https://github.com/apache/kafka/pull/8312#issuecomment-620478691


   Rebased for conflict.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #8204: Ensure ConfigProviders are closed

2020-04-28 Thread GitBox


tombentley commented on pull request #8204:
URL: https://github.com/apache/kafka/pull/8204#issuecomment-620481500


   @kkonstantine any chance of merging this?







[GitHub] [kafka] tombentley opened a new pull request #8571: KAFKA-7613: Enable -Xlint:rawtypes for connect, fixing warnings

2020-04-28 Thread GitBox


tombentley opened a new pull request #8571:
URL: https://github.com/apache/kafka/pull/8571


   Fix all existing javac warnings about use of raw types in Kafka Connect and 
add -Xlint:rawtypes to the Connect javac options in build.gradle. This 
addresses part of KAFKA-7613, but further work will be needed for the other 
warnings.
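
For context, here is a minimal sketch of the kind of code that
-Xlint:rawtypes reports, next to the usual fix. The class and method names are
hypothetical and are not taken from the PR or from Kafka Connect.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Kafka Connect.
class RawTypesSketch {

    // Raw types: compiling with -Xlint:rawtypes warns about the bare
    // `List` and `ArrayList` used in this method.
    static List rawNames() {
        List names = new ArrayList();
        names.add("connector-a");
        return names;
    }

    // Parameterized types: no rawtypes warning, and the compiler checks
    // that only Strings are added.
    static List<String> typedNames() {
        List<String> names = new ArrayList<>();
        names.add("connector-a");
        return names;
    }

    public static void main(String[] args) {
        // Both lists hold the same element; only the static typing differs.
        System.out.println(rawNames().equals(typedNames()));
    }
}
```

The behaviour at run time is the same in both methods; the change only affects
what the compiler can verify.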







[GitHub] [kafka] tombentley commented on pull request #8571: KAFKA-7613: Enable -Xlint:rawtypes for connect, fixing warnings

2020-04-28 Thread GitBox


tombentley commented on pull request #8571:
URL: https://github.com/apache/kafka/pull/8571#issuecomment-620496101


   @ijuma and @ewencp could one of you take a look at this? 







[jira] [Updated] (KAFKA-9925) Non-key KTable Joining result in duplicate schema name in confluence schema registry

2020-04-28 Thread Kin Siu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kin Siu updated KAFKA-9925:
---
Affects Version/s: 2.4.1

> Non-key KTable Joining result in duplicate schema name in confluence schema 
> registry
> 
>
> Key: KAFKA-9925
> URL: https://issues.apache.org/jira/browse/KAFKA-9925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Kin Siu
>Priority: Major
>
> The second half of issue Andy Bryant reported in KAFKA-9390 looks like still 
> exist.
> When testing non-key join method without passing in "Named", I noticed that 
> there are schema subjects registered in confluent schema registry without 
> consumer group Id still, 
> e.g. 
> {noformat}
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-pk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-fk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-vh-value",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-pk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-fk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-vh-value"
> {noformat}
> Code in KTableImpl which constructed above naming :
> https://github.com/apache/kafka/blob/2.4.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L959
> When we have multiple topologies using foreignKey join and registered to same 
> schema registry, we can have a name clash, and fail to register schema. 
> In order to clean up these schema subjects, we will need to know the internal 
> naming of a consumer group's topology, which is not straightforward and error 
> prone.





[jira] [Created] (KAFKA-9925) Non-key KTable Joining result in duplicate schema name in confluence schema registry

2020-04-28 Thread Kin Siu (Jira)
Kin Siu created KAFKA-9925:
--

 Summary: Non-key KTable Joining result in duplicate schema name in 
confluence schema registry
 Key: KAFKA-9925
 URL: https://issues.apache.org/jira/browse/KAFKA-9925
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Kin Siu


The second half of the issue Andy Bryant reported in KAFKA-9390 still appears 
to exist.

When testing the non-key join method without passing in "Named", I noticed that 
schema subjects are still registered in the Confluent schema registry without 
the consumer group id, 
e.g. 
{noformat}
"KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-pk-key",
"KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-fk-key",
"KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-vh-value",
"KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-pk-key",
"KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-fk-key",
"KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-vh-value"
{noformat}
Code in KTableImpl that constructs the names above:
https://github.com/apache/kafka/blob/2.4.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L959

When multiple topologies use a foreignKey join and register with the same 
schema registry, the names can clash and schema registration can fail. 

In order to clean up these schema subjects, we need to know the internal 
naming of a consumer group's topology, which is not straightforward and is 
error-prone.
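
The clash described above can be sketched as follows. This is a simplified,
hypothetical stand-in for a schema registry (a map from subject to schema that
rejects a different schema under an existing subject); it is not the Confluent
client API, and the schema names "OrderKey" and "CustomerKey" are made up. Only
the subject string is copied from the report.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical stand-in for a schema registry.
class SubjectClashSketch {
    private final Map<String, String> schemaBySubject = new HashMap<>();

    // Returns true if the subject is new or already holds the same schema,
    // false if a different schema already occupies the subject (a clash).
    boolean register(String subject, String schema) {
        String existing = schemaBySubject.putIfAbsent(subject, schema);
        return existing == null || existing.equals(schema);
    }

    public static void main(String[] args) {
        SubjectClashSketch registry = new SubjectClashSketch();
        // Subject name as listed in the report; it carries no application id,
        // so two unrelated topologies can generate exactly the same string.
        String subject = "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-pk-key";
        System.out.println(registry.register(subject, "OrderKey"));    // first topology
        System.out.println(registry.register(subject, "CustomerKey")); // second topology
    }
}
```

A real registry applies compatibility rules rather than plain equality, but the
underlying problem is the same: the subject name alone does not identify which
application it belongs to.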







[jira] [Updated] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry

2020-04-28 Thread Kin Siu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kin Siu updated KAFKA-9925:
---
Summary: Non-key KTable Joining may result in duplicate schema name in 
confluence schema registry  (was: Non-key KTable Joining result in duplicate 
schema name in confluence schema registry)



[GitHub] [kafka] jeqo opened a new pull request #8572: fix (docs): from p to p-1 numbered partitions

2020-04-28 Thread GitBox


jeqo opened a new pull request #8572:
URL: https://github.com/apache/kafka/pull/8572


   nit. Protocol docs describe partition numbers as follows:
   
   > Topic partitions themselves are just ordered "commit logs" numbered 0, 1, 
..., P.
   
   I assume this is a typo, as partition numbers start from 0 up to P-1.
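
As a quick illustration of the numbering, a topic with P partitions has ids 0
through P-1. The helper below is a hypothetical sketch, not Kafka code.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

// Hypothetical helper; not part of the Kafka code base.
class PartitionNumbersSketch {

    // Valid partition ids for a topic with `partitionCount` partitions:
    // 0, 1, ..., partitionCount - 1 (the upper bound of range() is exclusive).
    static int[] partitionIds(int partitionCount) {
        return IntStream.range(0, partitionCount).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(partitionIds(3)));
    }
}
```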







[GitHub] [kafka] scoopex opened a new pull request #8573: Add possibility to append parameters for tool execution

2020-04-28 Thread GitBox


scoopex opened a new pull request #8573:
URL: https://github.com/apache/kafka/pull/8573


   Provides the possibility to append connection information to all executed commands:
   ```
   $ export KAFKA_OPTS="--bootstrap-server 10.1.1.1:9092,10.1.1.1:9092,10.1.1.1:9092"
   kafka-topics.sh --create --topic eventlog-global-dev-003 -replication-factor 3 --partitions 1
   $ kafka-topics.sh --create --topic eventlog-global-dev-005 -replication-factor 3 --partitions 1
   Adding specified KAFKA_OPTS_APPEND : '--bootstrap-server 10.1.1.1:9092,10.1.1.1:9092,10.1.1.1:9092'
   ```
   
   







[jira] [Updated] (KAFKA-9018) Kafka Connect - throw clearer exceptions on serialisation errors

2020-04-28 Thread Mario Molina (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mario Molina updated KAFKA-9018:

Affects Version/s: 2.5.0
   2.4.1

> Kafka Connect - throw clearer exceptions on serialisation errors
> 
>
> Key: KAFKA-9018
> URL: https://issues.apache.org/jira/browse/KAFKA-9018
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Robin Moffatt
>Assignee: Mario Molina
>Priority: Minor
>
> When Connect fails on a deserialisation error, it doesn't show if that's the 
> *key or value* that's thrown the error, nor does it give the user any 
> indication of the *topic/partition/offset* of the message. Kafka Connect 
> should be improved to return this information.
> Example message that user will get (in this case caused by reading non-Avro 
> data with the Avro converter)
> {code:java}
> Caused by: org.apache.kafka.connect.errors.DataException: Failed to 
> deserialize data for topic sample_topic to Avro:
>  at 
> io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>  ... 13 more
>  Caused by: org.apache.kafka.common.errors.SerializationException: Error 
> deserializing Avro message for id -1
>  Caused by: org.apache.kafka.common.errors.SerializationException: Unknown 
> magic byte!{code}
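
To make the request concrete, here is a rough sketch of an error message that
names the failing part (key or value) and the record coordinates. The class,
enum, and message wording are assumptions for illustration; this is not the
message Kafka Connect produces, nor the change that was eventually merged.

```java
import java.util.Locale;

// Hypothetical sketch; names and wording are not taken from Kafka Connect.
class DeserializationErrorSketch {

    enum Part { KEY, VALUE }

    // Builds a message that says which part of the record failed and where
    // the record lives (topic, partition, offset).
    static String describe(Part part, String topic, int partition, long offset) {
        return String.format(
            "Failed to deserialize record %s for topic %s, partition %d, offset %d",
            part.name().toLowerCase(Locale.ROOT), topic, partition, offset);
    }

    public static void main(String[] args) {
        System.out.println(describe(Part.VALUE, "sample_topic", 0, 42L));
    }
}
```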





[jira] [Updated] (KAFKA-9018) Kafka Connect - throw clearer exceptions on serialisation errors

2020-04-28 Thread Mario Molina (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mario Molina updated KAFKA-9018:

Fix Version/s: 2.6.0



[jira] [Assigned] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry

2020-04-28 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-9925:
---

Assignee: John Roesler



[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry

2020-04-28 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094604#comment-17094604
 ] 

John Roesler commented on KAFKA-9925:
-

Ah, right you are. So sorry I overlooked that part of the bug report when I 
submitted my fix for it.

The issue is that these "pseudo topics" are being created the same way that 
real repartition topics get created in the DSL layer, but for real repartition 
topics, we add them to the InternalTopologyBuilder, which later on invokes 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder#decorateTopic
 to add the applicationId prefix. Of course, this will never happen for the 
pseudo-topics, since we don't add them to the InternalTopologyBuilder.

The complication is that we don't know the applicationId until the application 
is started. Currently, both the DSL builder and the runtime are isolated from 
this because the DSL builder only has to register the topic with the 
InternalTopologyBuilder, and then the runtime code only has to deal with 
pre-configured Serdes, which get the pre-decorated topics injected at startup.

I'll submit a PR shortly to fix it.
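
A minimal sketch of the decoration step described above, assuming that
"decorating" a topic means prefixing it with the applicationId and a hyphen. The
class and method below are hypothetical and only illustrate why decorated names
from two applications no longer collide; they are not the
InternalTopologyBuilder implementation.

```java
// Hypothetical sketch; assumes decoration is "<applicationId>-<topic>".
class TopicDecorationSketch {

    static String decorate(String applicationId, String topic) {
        if (applicationId == null) {
            // Mirrors the complication in the comment: the id is only known
            // once the application has been configured and started.
            throw new IllegalStateException("applicationId is not available yet");
        }
        return applicationId + "-" + topic;
    }

    public static void main(String[] args) {
        String pseudoTopic = "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic";
        // Same pseudo-topic, two applications: the decorated names differ.
        System.out.println(decorate("app-a", pseudoTopic));
        System.out.println(decorate("app-b", pseudoTopic));
    }
}
```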



[GitHub] [kafka] junrao commented on pull request #8566: Fix minor code issue

2020-04-28 Thread GitBox


junrao commented on pull request #8566:
URL: https://github.com/apache/kafka/pull/8566#issuecomment-620701631


   @leonardge : The failed test seems unrelated to the PR. Could you file a 
separate PR to track that?







[GitHub] [kafka] steverod commented on pull request #8543: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-28 Thread GitBox


steverod commented on pull request #8543:
URL: https://github.com/apache/kafka/pull/8543#issuecomment-620710529


   > @steverod : There seems to be compilation errors in JDK 8 test?
   
   Hi @junrao  -- Yes, there are, but they aren't mine (!!). No, really. 
   This was pre-existing in 2.4 and is being fixed by 
https://github.com/apache/kafka/pull/8562 (backport of some test fixes).







[GitHub] [kafka] vvcephei opened a new pull request #8574: KAFKA-9925: decorate pseudo-topics with app id

2020-04-28 Thread GitBox


vvcephei opened a new pull request #8574:
URL: https://github.com/apache/kafka/pull/8574


   * ensure that pseudo-topics get correctly prefixed with the app id at run 
time
   * update test to expect the app id prefix
   
   







[GitHub] [kafka] leonardge commented on pull request #8566: Fix minor code issue

2020-04-28 Thread GitBox


leonardge commented on pull request #8566:
URL: https://github.com/apache/kafka/pull/8566#issuecomment-620716514


   Sure will do.







[GitHub] [kafka] hachikuji commented on a change in pull request #8562: Test compilation fixes for Scala 2.11

2020-04-28 Thread GitBox


hachikuji commented on a change in pull request #8562:
URL: https://github.com/apache/kafka/pull/8562#discussion_r416760519



##
File path: 
core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
##
@@ -243,9 +245,9 @@ class ReplicaAlterLogDirsThreadTest {
   responseCallback = callbackCaptor.capture(),
   isolationLevel = ArgumentMatchers.eq(IsolationLevel.READ_UNCOMMITTED),
   clientMetadata = ArgumentMatchers.eq(None)
-)).thenAnswer(_ => {
-  callbackCaptor.getValue.apply(Seq((topicPartition, responseData)))
-})
+)) thenAnswer new Answer[Unit] {

Review comment:
   nit: inline usage like this is generally reserved for operators like `+`









[GitHub] [kafka] cadonna commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-28 Thread GitBox


cadonna commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416761719



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##
@@ -41,132 +54,107 @@
 import static org.easymock.EasyMock.replay;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.kafka.streams.processor.TaskId;
-import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.easymock.EasyMock;
-import org.junit.Test;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 
 public class HighAvailabilityTaskAssignorTest {
-private long acceptableRecoveryLag = 100L;
-private int balanceFactor = 1;
-private int maxWarmupReplicas = 2;
-private int numStandbyReplicas = 0;
-private long probingRebalanceInterval = 60 * 1000L;
-
-private Map clientStates = new HashMap<>();
-private Set allTasks = new HashSet<>();
-private Set statefulTasks = new HashSet<>();
-
-private ClientState client1;
-private ClientState client2;
-private ClientState client3;
-
-private HighAvailabilityTaskAssignor taskAssignor;
-
-private void createTaskAssignor() {
-final AssignmentConfigs configs = new AssignmentConfigs(
-acceptableRecoveryLag,
-balanceFactor,
-maxWarmupReplicas,
-numStandbyReplicas,
-probingRebalanceInterval
-);
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-configs);
-}
+private final AssignmentConfigs configWithoutStandbys = new 
AssignmentConfigs(
+/*acceptableRecoveryLag*/ 100L,
+/*balanceFactor*/ 1,
+/*maxWarmupReplicas*/ 2,
+/*numStandbyReplicas*/ 0,
+/*probingRebalanceIntervalMs*/ 60 * 1000L
+);
+
+private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
+/*acceptableRecoveryLag*/ 100L,
+/*balanceFactor*/ 1,
+/*maxWarmupReplicas*/ 2,
+/*numStandbyReplicas*/ 1,
+/*probingRebalanceIntervalMs*/ 60 * 1000L
+);
 
-@Test
-public void 
shouldDecidePreviousAssignmentIsInvalidIfThereAreUnassignedActiveTasks() {
-client1 = EasyMock.createNiceMock(ClientState.class);
-expect(client1.prevActiveTasks()).andReturn(singleton(TASK_0_0));
-expect(client1.prevStandbyTasks()).andStubReturn(EMPTY_TASKS);
-replay(client1);
-allTasks =  mkSet(TASK_0_0, TASK_0_1);
-clientStates = singletonMap(UUID_1, client1);
-createTaskAssignor();
 
-assertFalse(taskAssignor.previousAssignmentIsValid());

Review comment:
   req: I think you can now restrict access to 
`previousAssignmentIsValid()` to `private`.









[GitHub] [kafka] junrao commented on a change in pull request #8562: Test compilation fixes for Scala 2.11

2020-04-28 Thread GitBox


junrao commented on a change in pull request #8562:
URL: https://github.com/apache/kafka/pull/8562#discussion_r416760588



##
File path: 
core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
##
@@ -243,9 +245,9 @@ class ReplicaAlterLogDirsThreadTest {
   responseCallback = callbackCaptor.capture(),
   isolationLevel = ArgumentMatchers.eq(IsolationLevel.READ_UNCOMMITTED),
   clientMetadata = ArgumentMatchers.eq(None)
-)).thenAnswer(_ => {
-  callbackCaptor.getValue.apply(Seq((topicPartition, responseData)))
-})
+)) thenAnswer new Answer[Unit] {

Review comment:
   Should we keep the `.` before `thenAnswer` and use `[]`? The existing 
convention is the following.
   
   ```
   when(...)
     .thenAnswer(new Answer[Unit] {
       override def answer(invocation: InvocationOnMock): Unit = {
         ...
       }
     })
   ```









[GitHub] [kafka] vvcephei commented on a change in pull request #8574: KAFKA-9925: decorate pseudo-topics with app id

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8574:
URL: https://github.com/apache/kafka/pull/8574#discussion_r416764158



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##
@@ -76,6 +78,10 @@ public void setIfUnset(final Serializer 
defaultSerializer) {
 throw new UnsupportedVersionException("SubscriptionWrapper 
version is larger than maximum supported 0x7F");
 }
 
+if (primaryKeySerializationPseudoTopic == null) {
+primaryKeySerializationPseudoTopic = 
primaryKeySerializationPseudoTopicSupplier.get();
+}

Review comment:
   This (and below) is a bit awkward.
   
   Our requirement is not to call the supplier until after the app starts, but 
we can call it any time after the app starts.
   
   The natural place would be in `configure`, but unfortunately, that method is 
basically useless for our internal serdes. The reason is that we previously 
decided that `configure` should be called externally to the DSL, but our 
internal serdes are constructed _internal_ to the DSL. Plus, `configure` must 
be called at run time (when the config is available), but by run time, we can 
no longer tell whether our serde is "internal" or not. So, there's no good 
place where we can call `configure` for our internal serdes.
   
   I'm side-stepping the problem here by just invoking the supplier when we 
first need to use it, which is also at run time.
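
The lazy invocation described here can be sketched roughly as follows. This is a minimal illustration in plain Java, not the code from the PR: the class name `LazyTopicName`, its `get` method, and the example topic string are all hypothetical, and the sketch assumes access from a single thread.

```java
import java.util.function.Supplier;

// Hypothetical sketch: defer calling a Supplier until its value is first needed.
final class LazyTopicName {
    private final Supplier<String> supplier; // must not be invoked before the app starts
    private String value;                    // stays null until the first call to get()

    LazyTopicName(final Supplier<String> supplier) {
        this.supplier = supplier;
    }

    // Not thread-safe: assumes a single calling thread.
    String get() {
        if (value == null) {
            value = supplier.get();
        }
        return value;
    }

    public static void main(final String[] args) {
        final int[] calls = {0};
        final LazyTopicName name = new LazyTopicName(() -> {
            calls[0]++;
            return "example-app-pseudo-topic"; // made-up value for illustration
        });
        System.out.println("calls before first use: " + calls[0]);
        System.out.println(name.get());
        System.out.println(name.get());
        System.out.println("calls after two uses: " + calls[0]);
    }
}
```

Constructing the wrapper never touches the supplier, so it is safe to build before the configuration exists; only the first `get()` triggers the call.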

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -972,13 +974,26 @@ boolean sendingOldValueEnabled() {
 //This occurs whenever the extracted foreignKey changes values.
 enableSendingOldValues();
 
+final NamedInternal renamed = new NamedInternal(joinName);
+
+final String subscriptionTopicName = renamed.suffixWithOrElseGet(
+"-subscription-registration",
+builder,
+SUBSCRIPTION_REGISTRATION
+) + TOPIC_SUFFIX;
 
+// the decoration can't be performed until we have the configuration 
available when the app runs,
+// so we pass Suppliers into the components, which they can call at 
run time

Review comment:
   Hopefully, this explains what's going on here.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -1224,6 +1224,10 @@ private static Pattern buildPattern(final 
Collection sourceTopics,
 return decoratedTopics;
 }
 
+public String decoratePseudoTopic(final String topic) {

Review comment:
   I'm adding a new public method for our specific use case here, to 
document that we should _only_ need to invoke this method publicly for "pseudo" 
topics.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
##
@@ -218,19 +220,19 @@ public void shouldUseExpectedTopicsWithSerde() {
 }
 // verifying primarily that no extra pseudo-topics were used, but it's 
nice to also verify the rest of the
 // topics our serdes serialize data for
-assertThat(serdeScope.registeredTopics(), CoreMatchers.is(mkSet(
+assertThat(serdeScope.registeredTopics(), is(mkSet(
 // expected pseudo-topics
-
"KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic-fk--key",
-
"KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic-pk--key",
-
"KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic-vh--value",
+applicationId + 
"-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic-fk--key",
+applicationId + 
"-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic-pk--key",
+applicationId + 
"-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic-vh--value",

Review comment:
   This verifies the fix: the pseudo topics should also be prefixed. I 
should have noticed before that they weren't.
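
As a rough illustration of why the prefix matters, the sketch below decorates the same pseudo-topic name for two applications. The helper is hypothetical and is not the body of `decoratePseudoTopic`; it only assumes, based on the expected strings in the test above, that the application id and the pseudo-topic name are joined with a hyphen.

```java
// Hypothetical helper, not the PR's implementation.
final class PseudoTopicNames {
    // Assumption: application id and pseudo-topic name are joined with a hyphen.
    static String decorate(final String applicationId, final String pseudoTopic) {
        if (applicationId == null || applicationId.isEmpty()) {
            throw new IllegalStateException("application id is not available yet");
        }
        return applicationId + "-" + pseudoTopic;
    }

    public static void main(final String[] args) {
        final String pseudoTopic = "example-subscription-topic-pk--key"; // made-up name
        // Two applications sharing one schema registry no longer produce the same name.
        System.out.println(decorate("app-one", pseudoTopic));
        System.out.println(decorate("app-two", pseudoTopic));
    }
}
```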









[jira] [Created] (KAFKA-9926) Flaky test PlaintextAdminIntegrationTest.testCreatePartitions

2020-04-28 Thread Wang Ge (Jira)
Wang Ge created KAFKA-9926:
--

 Summary: Flaky test 
PlaintextAdminIntegrationTest.testCreatePartitions
 Key: KAFKA-9926
 URL: https://issues.apache.org/jira/browse/KAFKA-9926
 Project: Kafka
  Issue Type: Bug
Reporter: Wang Ge


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6007/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/testCreatePartitions/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9927) Add support for varint types to message generator

2020-04-28 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-9927:
--

 Summary: Add support for varint types to message generator
 Key: KAFKA-9927
 URL: https://issues.apache.org/jira/browse/KAFKA-9927
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


It would be nice to be able to use either a "varint32" or "varint64" type or to 
add a flag to indicate variable length encoding.
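
For context, variable-length integer encoding can be sketched as below. This is the common base-128 scheme with a ZigZag mapping for signed values; the class and method names are illustrative only and are not part of the message generator, and whether the generator would use ZigZag is an assumption.

```java
import java.io.ByteArrayOutputStream;

// Illustrative sketch of a 32-bit varint codec (base-128 groups plus ZigZag).
final class VarintSketch {
    static byte[] encodeVarint32(final int value) {
        // ZigZag: map signed to unsigned so values of small magnitude stay short.
        int v = (value << 1) ^ (value >> 31);
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & 0xFFFFFF80) != 0) {
            out.write((v & 0x7F) | 0x80); // low 7 bits, continuation bit set
            v >>>= 7;
        }
        out.write(v); // final group, continuation bit clear
        return out.toByteArray();
    }

    static int decodeVarint32(final byte[] bytes) {
        int raw = 0;
        int shift = 0;
        for (final byte b : bytes) {
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
        }
        return (raw >>> 1) ^ -(raw & 1); // undo ZigZag
    }

    public static void main(final String[] args) {
        for (final int value : new int[] {0, -1, 300, Integer.MIN_VALUE}) {
            final byte[] encoded = encodeVarint32(value);
            System.out.println(value + " -> " + encoded.length + " byte(s) -> " + decodeVarint32(encoded));
        }
    }
}
```

The trade-off is that small values take one or two bytes while the largest 32-bit values take five, which is why a per-field type or flag is useful rather than a global switch.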





[GitHub] [kafka] leonardge commented on pull request #8566: Fix minor code issue

2020-04-28 Thread GitBox


leonardge commented on pull request #8566:
URL: https://github.com/apache/kafka/pull/8566#issuecomment-620735042


   JIRA [here](https://issues.apache.org/jira/browse/KAFKA-9926).







[jira] [Updated] (KAFKA-9926) Flaky test PlaintextAdminIntegrationTest.testCreatePartitions

2020-04-28 Thread Wang Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Ge updated KAFKA-9926:
---
Description: 
Flaky test: kafka.api.PlaintextAdminIntegrationTest.testCreatePartitions

[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6007/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/testCreatePartitions/]

  
was:[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6007/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/testCreatePartitions/]


> Flaky test PlaintextAdminIntegrationTest.testCreatePartitions
> -
>
> Key: KAFKA-9926
> URL: https://issues.apache.org/jira/browse/KAFKA-9926
> Project: Kafka
>  Issue Type: Bug
>Reporter: Wang Ge
>Priority: Major
>
> Flaky test: kafka.api.PlaintextAdminIntegrationTest.testCreatePartitions
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6007/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/testCreatePartitions/]





[GitHub] [kafka] pan3793 opened a new pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null

2020-04-28 Thread GitBox


pan3793 opened a new pull request #8575:
URL: https://github.com/apache/kafka/pull/8575


   https://issues.apache.org/jira/browse/KAFKA-8713
   
   
https://cwiki.apache.org/confluence/display/KAFKA/KIP-581:+Value+of+optional+null+field+which+has+default+value
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] pan3793 commented on pull request #7112: KAFKA-8713: JsonConverter NULL Values are replaced by default values even in NULLABLE fields

2020-04-28 Thread GitBox


pan3793 commented on pull request #7112:
URL: https://github.com/apache/kafka/pull/7112#issuecomment-620746182


   I have opened a new implementation at https://github.com/apache/kafka/pull/8575 that follows 
[KIP-581](https://cwiki.apache.org/confluence/display/KAFKA/KIP-581:+Value+of+optional+null+field+which+has+default+value).
 This PR is superseded, so I will close it soon.







[GitHub] [kafka] hachikuji commented on pull request #8562: Test compilation fixes for Scala 2.11

2020-04-28 Thread GitBox


hachikuji commented on pull request #8562:
URL: https://github.com/apache/kafka/pull/8562#issuecomment-620749823


   retest this please







[GitHub] [kafka] hachikuji commented on pull request #8562: Test compilation fixes for Scala 2.11

2020-04-28 Thread GitBox


hachikuji commented on pull request #8562:
URL: https://github.com/apache/kafka/pull/8562#issuecomment-620749686


   ok to test







[GitHub] [kafka] hachikuji commented on pull request #8562: Test compilation fixes for Scala 2.11

2020-04-28 Thread GitBox


hachikuji commented on pull request #8562:
URL: https://github.com/apache/kafka/pull/8562#issuecomment-620749977


   retest this please







[jira] [Updated] (KAFKA-9923) Join window store duplicates can be compacted in changelog

2020-04-28 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9923:
---
Fix Version/s: 2.6.0

> Join window store duplicates can be compacted in changelog 
> ---
>
> Key: KAFKA-9923
> URL: https://issues.apache.org/jira/browse/KAFKA-9923
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Critical
> Fix For: 2.6.0
>
>
> Stream-stream joins use the regular `WindowStore` implementation but with 
> `retainDuplicates` set to true. To allow for duplicates while using the same 
> unique-key underlying stores we just wrap the key with an incrementing 
> sequence number before inserting it.
> This wrapping occurs at the innermost layer of the store hierarchy, which 
> means the duplicates must first pass through the changelogging layer. At this 
> point the keys are still identical. So, we end up sending the records to the 
> changelog without distinct keys and therefore may lose the older of the 
> duplicates during compaction.
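
The effect can be sketched with a toy model of compaction. This is plain Java, not Kafka Streams code: `compact` simply keeps the latest value per key, and the `#seqnum` string suffix is a made-up stand-in for however the real store wraps keys.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model: why identical changelog keys lose duplicates under compaction.
final class CompactionSketch {
    // Models log compaction: only the latest value per key survives.
    static Map<String, String> compact(final List<String[]> records) {
        final Map<String, String> latest = new LinkedHashMap<>();
        for (final String[] record : records) {
            latest.put(record[0], record[1]);
        }
        return latest;
    }

    // Hypothetical wrapping: make each key distinct with an incrementing sequence number.
    static List<String[]> withSequenceNumbers(final List<String[]> records) {
        final List<String[]> wrapped = new ArrayList<>();
        int seqnum = 0;
        for (final String[] record : records) {
            wrapped.add(new String[] {record[0] + "#" + seqnum, record[1]});
            seqnum++;
        }
        return wrapped;
    }

    public static void main(final String[] args) {
        final List<String[]> records = new ArrayList<>();
        records.add(new String[] {"k", "first"});
        records.add(new String[] {"k", "second"});
        System.out.println("surviving records, plain keys: " + compact(records).size());
        System.out.println("surviving records, wrapped keys: " + compact(withSequenceNumbers(records)).size());
    }
}
```

In this model the older duplicate survives only if the key is wrapped before the record reaches the changelog, which is the ordering problem the ticket describes.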





[jira] [Updated] (KAFKA-9923) Join window store duplicates can be compacted in changelog

2020-04-28 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9923:
---
Priority: Blocker  (was: Critical)

> Join window store duplicates can be compacted in changelog 
> ---
>
> Key: KAFKA-9923
> URL: https://issues.apache.org/jira/browse/KAFKA-9923
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.6.0
>
>





[GitHub] [kafka] ableegoldman commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


ableegoldman commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416809469



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
##
@@ -163,8 +159,10 @@ public void shouldApplyUpdatesToStandbyStore() throws 
Exception {
 // Assert that all messages in the second batch were processed in a 
timely manner
 assertThat(semaphore.tryAcquire(batch2NumMessages, 60, 
TimeUnit.SECONDS), is(equalTo(true)));

Review comment:
   `OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore` still 
failed on [one of the 
builds](https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/149/testReport/junit/org.apache.kafka.streams.integration/OptimizedKTableIntegrationTest/shouldApplyUpdatesToStandbyStore/)
 at this line :/
   But, at least we got farther into the test before it failed so I'd say this 
is still an improvement 😄 









[GitHub] [kafka] ableegoldman commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-28 Thread GitBox


ableegoldman commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416810713



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##
@@ -41,132 +54,107 @@
 import static org.easymock.EasyMock.replay;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.kafka.streams.processor.TaskId;
-import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.easymock.EasyMock;
-import org.junit.Test;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 
 public class HighAvailabilityTaskAssignorTest {
-private long acceptableRecoveryLag = 100L;
-private int balanceFactor = 1;
-private int maxWarmupReplicas = 2;
-private int numStandbyReplicas = 0;
-private long probingRebalanceInterval = 60 * 1000L;
-
-private Map clientStates = new HashMap<>();
-private Set allTasks = new HashSet<>();
-private Set statefulTasks = new HashSet<>();
-
-private ClientState client1;
-private ClientState client2;
-private ClientState client3;
-
-private HighAvailabilityTaskAssignor taskAssignor;
-
-private void createTaskAssignor() {
-final AssignmentConfigs configs = new AssignmentConfigs(
-acceptableRecoveryLag,
-balanceFactor,
-maxWarmupReplicas,
-numStandbyReplicas,
-probingRebalanceInterval
-);
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-configs);
-}
+private final AssignmentConfigs configWithoutStandbys = new 
AssignmentConfigs(
+/*acceptableRecoveryLag*/ 100L,
+/*balanceFactor*/ 1,
+/*maxWarmupReplicas*/ 2,
+/*numStandbyReplicas*/ 0,
+/*probingRebalanceIntervalMs*/ 60 * 1000L
+);
+
+private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
+/*acceptableRecoveryLag*/ 100L,
+/*balanceFactor*/ 1,
+/*maxWarmupReplicas*/ 2,
+/*numStandbyReplicas*/ 1,
+/*probingRebalanceIntervalMs*/ 60 * 1000L
+);
 
-@Test
-public void 
shouldDecidePreviousAssignmentIsInvalidIfThereAreUnassignedActiveTasks() {
-client1 = EasyMock.createNiceMock(ClientState.class);
-expect(client1.prevActiveTasks()).andReturn(singleton(TASK_0_0));
-expect(client1.prevStandbyTasks()).andStubReturn(EMPTY_TASKS);
-replay(client1);
-allTasks =  mkSet(TASK_0_0, TASK_0_1);
-clientStates = singletonMap(UUID_1, client1);
-createTaskAssignor();
 
-assertFalse(taskAssignor.previousAssignmentIsValid());

Review comment:
   Or just remove it completely 😉 









[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry

2020-04-28 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094729#comment-17094729
 ] 

John Roesler commented on KAFKA-9925:
-

Ok, I've opened [https://github.com/apache/kafka/pull/8574] . If you have the 
time, a review would help speed things along. Thanks for the report!

> Non-key KTable Joining may result in duplicate schema name in confluence 
> schema registry
> 
>
> Key: KAFKA-9925
> URL: https://issues.apache.org/jira/browse/KAFKA-9925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Kin Siu
>Assignee: John Roesler
>Priority: Major
>
> The second half of the issue Andy Bryant reported in KAFKA-9390 still appears 
> to exist.
> When testing the non-key join method without passing in "Named", I noticed that 
> schema subjects are still registered in Confluent Schema Registry without the 
> consumer group id, 
> e.g. 
> {noformat}
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-pk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-fk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-vh-value",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-pk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-fk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-vh-value"
> {noformat}
> Code in KTableImpl that constructs the names above:
> https://github.com/apache/kafka/blob/2.4.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L959
> When multiple topologies use a foreign-key join and register to the same 
> schema registry, the names can clash and schema registration can fail. 
> To clean up these schema subjects, we need to know the internal naming of a 
> consumer group's topology, which is not straightforward and is error-prone.





[GitHub] [kafka] ableegoldman commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


ableegoldman commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416814163



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
##
@@ -158,8 +158,9 @@ public void shouldKStreamGlobalKTableLeftJoin() throws 
Exception {
 
 produceGlobalTableValues();
 
-final ReadOnlyKeyValueStore replicatedStore =
-
kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, 
QueryableStoreTypes.keyValueStore()));
+final ReadOnlyKeyValueStore replicatedStore = 
IntegrationTestUtils
+.getStore(globalStore, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
+assertNotNull(replicatedStore);

Review comment:
   Why do we have to check for null now?









[GitHub] [kafka] guozhangwang commented on pull request #8564: KAFKA-9921: disable caching on stores configured to retain duplicates

2020-04-28 Thread GitBox


guozhangwang commented on pull request #8564:
URL: https://github.com/apache/kafka/pull/8564#issuecomment-620783397


   Also cherry-picked to 2.5.







[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry

2020-04-28 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094772#comment-17094772
 ] 

Guozhang Wang commented on KAFKA-9925:
--

[~vvcephei] Thanks for taking a look at this issue.

I'm wondering if now is a good time to deprecate the `StreamsBuilder#build()` 
function, as a tiny KIP, so that users call `build(final Properties props)` 
instead. There's a risk, of course, that the props passed to `build` are not the 
same as the ones passed to the `KafkaStreams` constructor. I think we can 
remember the reference to the props when building the topology, and then at 
construction, if we find they are not the same (by reference), we can log a 
warning such as "found the topology is built with some StreamsConfig already, 
which is not the same as the config passed in the constructor".
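
A minimal sketch of that reference check, in plain Java, might look like the following. The class `ConfigReferenceCheck` and its methods are hypothetical and stand in for the builder and the `KafkaStreams` constructor; returning the message instead of logging it is only a simplification to keep the example self-contained.

```java
import java.util.Properties;

// Hypothetical sketch: remember the Properties seen at build time and
// compare by reference when the client is constructed.
final class ConfigReferenceCheck {
    private Properties buildTimeProps; // null if the topology was built without props

    void build(final Properties props) {
        this.buildTimeProps = props;
    }

    // Returns a warning message, or null when there is nothing to warn about.
    String checkAtConstruction(final Properties constructorProps) {
        // Reference comparison on purpose: equal contents in a different object still warn.
        if (buildTimeProps != null && buildTimeProps != constructorProps) {
            return "found the topology is built with some StreamsConfig already, "
                + "which is not the same as the config passed in the constructor";
        }
        return null;
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        final ConfigReferenceCheck check = new ConfigReferenceCheck();
        check.build(props);
        System.out.println("same instance: " + check.checkAtConstruction(props));
        System.out.println("other instance: " + check.checkAtConstruction(new Properties()));
    }
}
```

Comparing by reference is cheap and avoids deciding what "equal configs" means, at the cost of warning when a user passes a separate copy with identical contents.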

> Non-key KTable Joining may result in duplicate schema name in confluence 
> schema registry
> 
>
> Key: KAFKA-9925
> URL: https://issues.apache.org/jira/browse/KAFKA-9925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Kin Siu
>Assignee: John Roesler
>Priority: Major
>





[GitHub] [kafka] guozhangwang commented on pull request #8574: KAFKA-9925: decorate pseudo-topics with app id

2020-04-28 Thread GitBox


guozhangwang commented on pull request #8574:
URL: https://github.com/apache/kafka/pull/8574#issuecomment-620787049


   cc @abbccdda @mjsax to take a look?







[jira] [Commented] (KAFKA-9916) Materialize Table-Table Join Result to Avoid Performing Same Join Twice

2020-04-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094777#comment-17094777
 ] 

Matthias J. Sax commented on KAFKA-9916:


The original example was slightly different:
{code:java}
KStream stream = ...
stream.filter((k,v) -> { v.setA("a"); return true; });
stream.filter((k,v) -> ...);{code}
For this case, the filters are not chained but executed in parallel, which is 
basically a broadcast pattern, i.e., each record of `stream` is piped into 
both filters; conceptually, we would need to duplicate the input record, 
however, as an optimization, we don't copy but only pass the same object twice.
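
The consequence of passing the same object to both branches can be shown with a small plain-Java model. Nothing here is Kafka Streams API: `broadcast`, `Value`, and the two lambdas are made-up stand-ins for the stream and its two filters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of a broadcast without copying: every branch gets the same reference.
final class BroadcastSketch {
    static final class Value {
        String a = "original";
    }

    // Passes the same object to every branch, in order, without copying it.
    static void broadcast(final Value record, final List<Consumer<Value>> branches) {
        for (final Consumer<Value> branch : branches) {
            branch.accept(record);
        }
    }

    // Returns what the second branch observes after the first branch mutated the record.
    static String seenBySecondBranch() {
        final List<String> seen = new ArrayList<>();
        final List<Consumer<Value>> branches = new ArrayList<>();
        branches.add(v -> v.a = "a");     // first "filter" mutates its input
        branches.add(v -> seen.add(v.a)); // second "filter" only reads
        broadcast(new Value(), branches);
        return seen.get(0);
    }

    public static void main(final String[] args) {
        System.out.println("second branch sees: " + seenBySecondBranch());
    }
}
```

Because no copy is made, the mutation in the first branch is visible to the second, which is why mutating inputs inside a predicate is unsafe under this optimization.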

> Materialize Table-Table Join Result to Avoid Performing Same Join Twice
> ---
>
> Key: KAFKA-9916
> URL: https://issues.apache.org/jira/browse/KAFKA-9916
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Priority: Major
>
> If a table-table join processor performs a join and the join needs to forward 
> downstream the old join result (e.g. due to an aggregation operation 
> downstream), it performs the same join (i.e. calls the {{ValueJoiner}}) twice.
> Given a left value {{L1}}, a right value {{R1}}, and a new right value {{R2}} 
> with the same keys and input into the join operation in this order, the join 
> processor at some point will join {{L1}} with {{R1}}. When the new right 
> value {{R2}} triggers the join, it will join {{L1}} with {{R2}} and again 
> {{L1}} with {{R1}}.
> We could avoid calling the {{ValueJoiner}} twice by materializing the join 
> result. We would trade a call to the {{ValueJoiner}} with a lookup into a 
> state store. Depending on the logic in the {{ValueJoiner}} this may or may 
> not improve the performance. However, calling the {{ValueJoiner}} once will 
> only access the input values of the {{ValueJoiner}} once, which avoids the 
> need to copy the input values each time the {{ValueJoiner}} is called. For 
> example, consider the following {{ValueJoiner}}:
> {code:java}
> private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
>     leftValue.setSomeValue(rightValue);
>     return leftValue;
> }
> {code}
> With this {{ValueJoiner}}, {{setSomeValue(rightValue)}} will be called twice 
> when {{R2}} triggers the join, the first time with {{R2}} and the second time 
> with {{R1}}. That means {{R2}} will be overwritten by {{R1}}, which is 
> probably not what users want. To get the correct result, the 
> {{ValueJoiner}} should be implemented as follows:
>   
> {code:java}
> private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
>     ComplexValue copy = copy(leftValue);
>     copy.setSomeValue(rightValue);
>     return copy;
> }
> {code}
> Copying values during joins could be avoided if the join result were 
> materialized. 





[jira] [Commented] (KAFKA-7317) Use collections subscription for main consumer to reduce metadata

2020-04-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094781#comment-17094781
 ] 

Matthias J. Sax commented on KAFKA-7317:


As reported on SO, when setting the number of threads to zero, the client state 
never goes to RUNNING. Sounds like another bug?

> Use collections subscription for main consumer to reduce metadata
> -
>
> Key: KAFKA-7317
> URL: https://issues.apache.org/jira/browse/KAFKA-7317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.5.0
>
>
> In KAFKA-4633 we switched from "collection subscription" to "pattern 
> subscription" for `Consumer#subscribe()` to avoid triggering auto topic 
> creation on the broker. In KAFKA-5291, the metadata request was extended to 
> overwrite the broker config within the request itself. However, this feature 
> is only used in `KafkaAdminClient`. KAFKA-7320 adds this feature for the 
> consumer client, too.
> This ticket proposes to use the new feature within Kafka Streams to allow the 
> usage of collection-based subscription in consumer and admin clients to 
> reduce the metadata response size, which can be very large for a large number 
> of partitions in the cluster.
> Note that Streams needs to be able to distinguish whether it connects to older 
> brokers that do not support the new metadata request, and still use pattern 
> subscription in that case.





[jira] [Comment Edited] (KAFKA-9127) Needless group coordination overhead for GlobalKTables

2020-04-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092826#comment-17092826
 ] 

Matthias J. Sax edited comment on KAFKA-9127 at 4/28/20, 6:54 PM:
--

[~ableegoldman] Seems we introduced a regression in 2.5.0 via KAFKA-7317 that 
would be fixed with this ticket (cf. 
[https://stackoverflow.com/questions/61342530/kafka-streams-2-5-0-requires-input-topic]).
 If you agree, we should cherry-pick the fix to 2.5 branch. And also add a 
corresponding test.

Thoughts?


was (Author: mjsax):
[~ableegoldman] Seems we introduced a regression in 2.5.0 via KAFKA-7317 that 
would be fixed with this ticket (cf. 
[https://stackoverflow.com/questions/61342530/kafka-streams-2-5-0-requires-input-topic]).
 If you agree, we should cherry-pick the fix ti 2.5 branch. And also add a 
corresponding test.

Thoughts?

> Needless group coordination overhead for GlobalKTables
> --
>
> Key: KAFKA-9127
> URL: https://issues.apache.org/jira/browse/KAFKA-9127
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Chris Toomey
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0
>
>
> When creating a simple stream topology to just populate a GlobalKTable, I 
> noticed from logging that the stream consumer was doing group coordination 
> requests (JoinGroup, SyncGroup, ...) to the server, which it had no reason to 
> do since the global consumer thread populating the table fetches from all 
> partitions and thus doesn't use the group requests. So this adds needless 
> overhead on the client, network, and server.
> I tracked this down to the stream thread consumer, which is created 
> regardless of whether it's needed based solely on NUM_STREAM_THREADS_CONFIG 
> which defaults to 1 I guess.
> I found that setting NUM_STREAM_THREADS_CONFIG to 0 will prevent this from 
> happening, but it'd be a worthwhile improvement to be able to override this 
> setting in cases of topologies like this that don't have any need for stream 
> threads. Hence this ticket.
> I originally asked about this on the users mailing list where Bruno suggested 
> I file it as an improvement request.
> Here's the Scala code that I'm using that exhibits this:
> {code:scala}
> val builder: StreamsBuilder = new StreamsBuilder()
> val gTable = builder.globalTable[K, V](...)
> val stream = new KafkaStreams(builder.build(), props)
> stream.start(){code}
>  Not shown is the state store that I'm populating/using.





[jira] [Commented] (KAFKA-9127) Needless group coordination overhead for GlobalKTables

2020-04-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094791#comment-17094791
 ] 

Matthias J. Sax commented on KAFKA-9127:


{quote}Does it qualify as a regression when the workaround is the same as the 
fix?
{quote}
IMHO, it does, because if you don't change any code/configs and upgrade to 2.5 
it breaks.

Btw: setting the number of threads to zero exposes a different bug: the client 
does not transition to state RUNNING.

> Needless group coordination overhead for GlobalKTables
> --
>
> Key: KAFKA-9127
> URL: https://issues.apache.org/jira/browse/KAFKA-9127
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Chris Toomey
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0
>
>
> When creating a simple stream topology to just populate a GlobalKTable, I 
> noticed from logging that the stream consumer was doing group coordination 
> requests (JoinGroup, SyncGroup, ...) to the server, which it had no reason to 
> do since the global consumer thread populating the table fetches from all 
> partitions and thus doesn't use the group requests. So this adds needless 
> overhead on the client, network, and server.
> I tracked this down to the stream thread consumer, which is created 
> regardless of whether it's needed based solely on NUM_STREAM_THREADS_CONFIG 
> which defaults to 1 I guess.
> I found that setting NUM_STREAM_THREADS_CONFIG to 0 will prevent this from 
> happening, but it'd be a worthwhile improvement to be able to override this 
> setting in cases of topologies like this that don't have any need for stream 
> threads. Hence this ticket.
> I originally asked about this on the users mailing list where Bruno suggested 
> I file it as an improvement request.
> Here's the Scala code that I'm using that exhibits this:
> {code:scala}
> val builder: StreamsBuilder = new StreamsBuilder()
> val gTable = builder.globalTable[K, V](...)
> val stream = new KafkaStreams(builder.build(), props)
> stream.start(){code}
>  Not shown is the state store that I'm populating/using.
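
The workaround mentioned in the quoted report (zero stream threads for a topology that only populates a GlobalKTable) could be configured roughly as follows. This is a minimal sketch using only `java.util.Properties`; the class and method names are hypothetical, and the string keys are the config names behind the `StreamsConfig` constants. Note the comment above: in 2.5, zero threads reportedly keeps the client from reaching RUNNING.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Kafka.
class GlobalOnlyPropsSketch {

    // Builds Streams properties for a global-table-only topology.
    // "num.stream.threads" is the key behind NUM_STREAM_THREADS_CONFIG.
    static Properties globalOnlyProps(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Workaround from the report: no stream threads, so no stream-thread
        // consumer is created and no group coordination requests are sent.
        props.put("num.stream.threads", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = globalOnlyProps("global-only-app", "localhost:9092");
        System.out.println(props.getProperty("num.stream.threads"));
    }
}
```

The resulting `Properties` object would be passed as `props` to the `KafkaStreams` constructor shown in the report.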



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7317) Use collections subscription for main consumer to reduce metadata

2020-04-28 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094797#comment-17094797
 ] 

Sophie Blee-Goldman commented on KAFKA-7317:


Also fixed via [https://github.com/apache/kafka/pull/8540]

> Use collections subscription for main consumer to reduce metadata
> -
>
> Key: KAFKA-7317
> URL: https://issues.apache.org/jira/browse/KAFKA-7317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.5.0
>
>
> In KAFKA-4633 we switched from "collection subscription" to "pattern 
> subscription" for `Consumer#subscribe()` to avoid triggering auto topic 
> creation on the broker. In KAFKA-5291, the metadata request was extended to 
> overwrite the broker config within the request itself. However, this feature 
> is only used in `KafkaAdminClient`. KAFKA-7320 adds this feature for the 
> consumer client, too.
> This ticket proposes to use the new feature within Kafka Streams to allow the 
> usage of collection-based subscription in consumer and admin clients to 
> reduce the metadata response size, which can be very large for a large number 
> of partitions in the cluster.
> Note that Streams needs to be able to distinguish whether it connects to older 
> brokers that do not support the new metadata request, and still use pattern 
> subscription for this case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on pull request #8204: Ensure ConfigProviders are closed

2020-04-28 Thread GitBox


kkonstantine commented on pull request #8204:
URL: https://github.com/apache/kafka/pull/8204#issuecomment-620795402


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry

2020-04-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094799#comment-17094799
 ] 

Matthias J. Sax commented on KAFKA-9925:


{quote}I'm wondering if now is a good time to deprecate the 
`StreamsBuilder#build()` function to let users use `build(final Properties 
props)` instead as a tiny KIP.
{quote}
Just FYI: this is already proposed in KIP-591.

However, IMHO, we should fix it for older versions, too?

> Non-key KTable Joining may result in duplicate schema name in confluence 
> schema registry
> 
>
> Key: KAFKA-9925
> URL: https://issues.apache.org/jira/browse/KAFKA-9925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Kin Siu
>Assignee: John Roesler
>Priority: Major
>
> The second half of the issue Andy Bryant reported in KAFKA-9390 still appears 
> to exist.
> When testing the non-key join method without passing in "Named", I noticed that 
> there are still schema subjects registered in the Confluent Schema Registry 
> without the consumer group ID, 
> e.g. 
> {noformat}
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-pk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-fk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-vh-value",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-pk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-fk-key",
> "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-vh-value"
> {noformat}
> Code in KTableImpl that constructs the names above:
> https://github.com/apache/kafka/blob/2.4.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L959
> When we have multiple topologies using a foreign-key join and registered to the 
> same schema registry, we can have a name clash and fail to register the schema. 
> In order to clean up these schema subjects, we will need to know the internal 
> naming of a consumer group's topology, which is not straightforward and is 
> error-prone.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7317) Use collections subscription for main consumer to reduce metadata

2020-04-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094800#comment-17094800
 ] 

Matthias J. Sax commented on KAFKA-7317:


Sweet!




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9127) Needless group coordination overhead for GlobalKTables

2020-04-28 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094801#comment-17094801
 ] 

Sophie Blee-Goldman commented on KAFKA-9127:


Yep, if you can kick off tests on that PR and give it another pass, it should 
fix both issues and we can backport it to 2.5.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry

2020-04-28 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094805#comment-17094805
 ] 

Guozhang Wang commented on KAFKA-9925:
--

Ah yes!! Hope we can get KIP-591 by 2.6 :)




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on pull request #8204: KAFKA-9633: Ensure ConfigProviders are closed

2020-04-28 Thread GitBox


kkonstantine commented on pull request #8204:
URL: https://github.com/apache/kafka/pull/8204#issuecomment-620803829


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #8204: KAFKA-9633: Ensure ConfigProviders are closed

2020-04-28 Thread GitBox


kkonstantine commented on a change in pull request #8204:
URL: https://github.com/apache/kafka/pull/8204#discussion_r416866064



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##
@@ -220,6 +220,8 @@ public void stop() {
 
 workerMetricsGroup.close();
 connectorStatusMetricsGroup.close();
+
+workerConfigTransformer.close();

Review comment:
   Looking at the initialization of `workerConfigTransformer` I see it 
should be made final. 
   
   And then I notice that this is the case for 
`connectorClientConfigOverridePolicy` and all the class members of 
`ConnectorStatusMetricsGroup`. @tombentley do you mind tightening these types 
as well?

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
##
@@ -98,4 +101,8 @@ public void onCompletion(Throwable error, Void result) {
 HerderRequest request = worker.herder().restartConnector(ttl, 
connectorName, cb);
 connectorRequests.put(path, request);
 }
+
+public void close() {

Review comment:
   should we also change this class to implement `AutoCloseable`? 
   This can't be used immediately in a try-with-resources clause, but probably 
better to signal the existence of this method at the class level. 
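
As a rough illustration of the suggestion above, a class that owns closeable resources can advertise its `close()` method by implementing `AutoCloseable`. The class below is a hypothetical stand-in, not the real `WorkerConfigTransformer`, and the decision to swallow exceptions from individual providers is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a transformer that owns config providers.
class ClosableTransformerSketch implements AutoCloseable {
    private final List<AutoCloseable> providers;
    private boolean closed = false;

    ClosableTransformerSketch(List<AutoCloseable> providers) {
        this.providers = new ArrayList<>(providers);
    }

    boolean isClosed() {
        return closed;
    }

    // Narrowing the signature (no checked exception) lets callers use
    // try-with-resources without a catch block.
    @Override
    public void close() {
        for (AutoCloseable provider : providers) {
            try {
                provider.close();
            } catch (Exception e) {
                // Assumption: keep closing the remaining providers on failure.
            }
        }
        closed = true;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<AutoCloseable> providers = new ArrayList<>();
        providers.add(() -> log.add("file provider closed"));
        providers.add(() -> log.add("vault provider closed"));
        try (ClosableTransformerSketch transformer = new ClosableTransformerSketch(providers)) {
            log.add("transformer in use");
        }
        System.out.println(log);
    }
}
```

A long-lived field, as in `Worker`, would still call `close()` explicitly from `stop()`; the interface mainly documents the contract at the class level.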





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test

2020-04-28 Thread GitBox


abbccdda commented on a change in pull request #8518:
URL: https://github.com/apache/kafka/pull/8518#discussion_r416879514



##
File path: tests/kafkatest/tests/core/downgrade_test.py
##
@@ -67,11 +67,18 @@ def setup_services(self, kafka_version, compression_types, 
security_protocol):
  version=kafka_version)
 self.producer.start()
 
+static_membership = kafka_version == DEV_BRANCH or kafka_version >= 
LATEST_2_3

Review comment:
   I see, makes sense.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates

2020-04-28 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094835#comment-17094835
 ] 

Sophie Blee-Goldman commented on KAFKA-9921:


> For 2 puts I would expect 2 entries regardless if they accidentally match

Fair enough. I guess for that reason caching and retaining duplicates are 
inherently incompatible, right?

Regarding putting _null_ values, I think the behavior with _retainDuplicates_ 
is as expected. The Streams library uses window stores with duplicates for 
stream-stream joins, for which a null value produces no output and isn't 
considered a tombstone (see [semantics of stream-stream 
joins|https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#kstream-kstream-join]
 section).

I'm starting to get a better sense of what you're trying to do here, but it 
sounds like the semantics you want might differ slightly from what Streams 
would consider a stream-stream join. Do you explicitly want a windowed join, or 
are you just using the window store because the retention policy will keep 
state from growing without bound? Does your use case require _null_ values to 
be treated as deletes?

By the way, if the built-in stores don't match your requirements exactly you 
can always plug in a custom store. You could even just wrap one of the built-in 
stores to reuse the pieces that work for you, and skip the ones that don't. The 
rocksdb WindowStore is actually just built out of segments of the rocksdb 
KeyValueStore, for example.

> Caching is not working properly with WindowStateStore when rataining 
> duplicates
> ---
>
> Key: KAFKA-9921
> URL: https://issues.apache.org/jira/browse/KAFKA-9921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Georgi Petkov
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0
>
>
> I'm using the current latest version 2.5.0 but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
> order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.
> The observed behavior is due to having the last value in the cache (and it 
> can have only one since it's not aware of the retain duplicates option) and 
> it is read first (while skipping the first from the RocksDB iterator even 
> though the values are different). This can be observed (for version 2.5.0) in 
> _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 
> values are read from the RocksDB iterator so they are as expected.
> As I said, the whole feature is not considering the _retainDuplicates_ option 
> so there are other examples of incorrect behavior like in 
> _AbstractMergedSortedCacheStoreIterator__#peekNextKey()_ - for each call, you 
> would skip one duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without 
> increasing the complexity to linear for a single event (which would be the 
> case if I was always reading the current list appending one value and writing 
> it back). So I go for _List<KeyValuePair<K, V>>_ instead of _KeyValuePair<K, 
> List<V>>_. The whole use case is more complex than that so I use 
> _#transformValues_ and state stores.
> So as an impact I can't use caching on my state stores. For others - they'll 
> have incorrect behavior that may take a lot of time to be discovered and even 
> more time to fix the results.
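
The reported read order (4, 2, 3, 4) can be reproduced with a deliberately simplified model: a cache that keeps one value per key, merged with a store that keeps every duplicate. This is only a sketch of the mechanism described in the report, not Kafka's `AbstractMergedSortedCacheStoreIterator`; all names below are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model only; it does not use or reproduce Kafka's classes.
class MergedIteratorSketch {

    // A cache that is unaware of duplicates keeps only the latest value per key.
    static Map<String, Integer> cacheAfterPuts(List<Map.Entry<String, Integer>> puts) {
        Map<String, Integer> cache = new TreeMap<>();
        for (Map.Entry<String, Integer> put : puts) {
            cache.put(put.getKey(), put.getValue());
        }
        return cache;
    }

    // Merge rule: on equal keys, emit the cached value and advance BOTH
    // sides, which silently drops one duplicate from the store.
    static List<Integer> readMerged(Map<String, Integer> cache,
                                    List<Map.Entry<String, Integer>> store) {
        List<Integer> out = new ArrayList<>();
        Iterator<Map.Entry<String, Integer>> cacheIt = cache.entrySet().iterator();
        Map.Entry<String, Integer> cached = cacheIt.hasNext() ? cacheIt.next() : null;
        int pos = 0;
        while (cached != null || pos < store.size()) {
            if (cached == null) {
                out.add(store.get(pos++).getValue());
                continue;
            }
            int cmp = pos < store.size()
                    ? cached.getKey().compareTo(store.get(pos).getKey())
                    : -1;
            if (cmp > 0) {
                out.add(store.get(pos++).getValue());
            } else {
                out.add(cached.getValue());
                if (cmp == 0) {
                    pos++; // the store's entry for this key is skipped
                }
                cached = cacheIt.hasNext() ? cacheIt.next() : null;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> puts = new ArrayList<>();
        for (int v = 1; v <= 4; v++) {
            puts.add(new SimpleEntry<>("k", v));
        }
        // The store retains all four duplicates; the cache holds only value 4.
        System.out.println(readMerged(cacheAfterPuts(puts), puts)); // prints [4, 2, 3, 4]
    }
}
```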



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates

2020-04-28 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094839#comment-17094839
 ] 

Sophie Blee-Goldman commented on KAFKA-9921:


I take it you're using rocksdb, by the way? If you are using (or can use) the 
in-memory stores then storing a list and appending should be pretty fast. On 
that note, I'm actually not sure storing the entire list would be slower than 
storing individual duplicate records even with rocksdb. I actually have a 
suspicion that it might even be faster to store as a list, assuming the number 
and size of duplicates isn't incredibly large (relative to the memtable and 
block size scale).




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates

2020-04-28 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094844#comment-17094844
 ] 

Sophie Blee-Goldman commented on KAFKA-9921:


I'm resolving the ticket because the PR to disable caching + duplicates and 
note this in the javadocs was just merged. If you have the chance to take a 
quick look and let me know if there's anything I missed clarifying in the docs, 
I can submit a quick followup PR or review one from you if you have something 
specific in mind.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] gwenshap commented on pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test

2020-04-28 Thread GitBox


gwenshap commented on pull request #8518:
URL: https://github.com/apache/kafka/pull/8518#issuecomment-620826970


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates

2020-04-28 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9921:
---
Fix Version/s: 2.5.1

> Caching is not working properly with WindowStateStore when rataining 
> duplicates
> ---
>
> Key: KAFKA-9921
> URL: https://issues.apache.org/jira/browse/KAFKA-9921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Georgi Petkov
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9928) Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]

2020-04-28 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-9928:


 Summary: Flaky 
GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]
 Key: KAFKA-9928
 URL: https://issues.apache.org/jira/browse/KAFKA-9928
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang


{code}
Stacktrace
java.lang.AssertionError: Condition not met within timeout 3. waiting for 
final values
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:380)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
at 
org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableEOSIntegrationTest.java:178)
{code}

I looked at the below examples:

https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/149/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_/

https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFailsWithState_exactly_once_beta__2/

https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta__2/

I also reproduced the flakiness locally after about 180 runs, and the failed 
run did not have any obviously different traces compared with the successful ones.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416910725



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
##
@@ -158,8 +158,9 @@ public void shouldKStreamGlobalKTableLeftJoin() throws 
Exception {
 
 produceGlobalTableValues();
 
-final ReadOnlyKeyValueStore replicatedStore =
-
kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, 
QueryableStoreTypes.keyValueStore()));
+final ReadOnlyKeyValueStore replicatedStore = 
IntegrationTestUtils
+.getStore(globalStore, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
+assertNotNull(replicatedStore);

Review comment:
   Previously the un-wrapped call would just throw the exception, so asserting 
here that the store is not null is equivalent to making sure that the store is 
indeed returned.
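
For readers unfamiliar with the pattern behind this PR, a retrying lookup might look roughly like the sketch below. It is a hypothetical stdlib-only helper, not the actual `IntegrationTestUtils.getStore`; `IllegalStateException` stands in for whatever "store not ready yet" exception the real call throws, and the attempt count and backoff are illustrative parameters.

```java
import java.util.function.Supplier;

// Hypothetical helper for illustration; not Kafka's test utilities.
class RetrySketch {

    // Calls lookup until it succeeds or maxAttempts is exhausted, sleeping
    // backoffMs between attempts. Rethrows the last failure if all attempts fail.
    static <T> T retryUntilAvailable(Supplier<T> lookup, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IllegalStateException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (IllegalStateException e) {
                last = e; // stand-in for a "store not queryable yet" failure
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", ie);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String store = retryUntilAvailable(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("store not ready");
            }
            return "replicatedStore";
        }, 5, 10L);
        System.out.println(store + " after " + calls[0] + " attempts");
    }
}
```

With a wrapper like this, a non-null return value is the signal that the lookup eventually succeeded, which is what the assertion in the diff checks.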





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-28 Thread GitBox


vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620846265











[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-28 Thread GitBox


vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620846462











[GitHub] [kafka] vvcephei commented on pull request #8254: KIP-557: Add Emit On Change Support

2020-04-28 Thread GitBox


vvcephei commented on pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-620847005


   Whew! System tests passed: 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-04-28--001.1588064884--ConcurrencyPractitioner--EMIT-ON-CHANGE--ddbf2cf/report.html







[GitHub] [kafka] guozhangwang commented on pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#issuecomment-620847306


   I looked at the three failed tests:
   
   * `OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore` is 
actually due to the issue that https://github.com/apache/kafka/pull/8548 tried 
to fix. Waiting for @vvcephei to review 8548
   * 
`EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]`
 is being looked at by @mjsax as KAFKA-9831
   * 
`GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]`
 is a new issue, I created KAFKA-9928 for this, and my gut feeling is that it 
has the same root cause as KAFKA-9831. (also cc @mjsax )
   
   So I think this PR is good to be merged.







[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416916295



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##
@@ -41,132 +54,107 @@
 import static org.easymock.EasyMock.replay;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.kafka.streams.processor.TaskId;
-import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.easymock.EasyMock;
-import org.junit.Test;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 
 public class HighAvailabilityTaskAssignorTest {
-private long acceptableRecoveryLag = 100L;
-private int balanceFactor = 1;
-private int maxWarmupReplicas = 2;
-private int numStandbyReplicas = 0;
-private long probingRebalanceInterval = 60 * 1000L;
-
-private Map clientStates = new HashMap<>();
-private Set allTasks = new HashSet<>();
-private Set statefulTasks = new HashSet<>();
-
-private ClientState client1;
-private ClientState client2;
-private ClientState client3;
-
-private HighAvailabilityTaskAssignor taskAssignor;
-
-private void createTaskAssignor() {
-final AssignmentConfigs configs = new AssignmentConfigs(
-acceptableRecoveryLag,
-balanceFactor,
-maxWarmupReplicas,
-numStandbyReplicas,
-probingRebalanceInterval
-);
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-configs);
-}
+private final AssignmentConfigs configWithoutStandbys = new 
AssignmentConfigs(
+/*acceptableRecoveryLag*/ 100L,
+/*balanceFactor*/ 1,
+/*maxWarmupReplicas*/ 2,
+/*numStandbyReplicas*/ 0,
+/*probingRebalanceIntervalMs*/ 60 * 1000L
+);
+
+private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
+/*acceptableRecoveryLag*/ 100L,
+/*balanceFactor*/ 1,
+/*maxWarmupReplicas*/ 2,
+/*numStandbyReplicas*/ 1,
+/*probingRebalanceIntervalMs*/ 60 * 1000L
+);
 
-@Test
-public void 
shouldDecidePreviousAssignmentIsInvalidIfThereAreUnassignedActiveTasks() {
-client1 = EasyMock.createNiceMock(ClientState.class);
-expect(client1.prevActiveTasks()).andReturn(singleton(TASK_0_0));
-expect(client1.prevStandbyTasks()).andStubReturn(EMPTY_TASKS);
-replay(client1);
-allTasks =  mkSet(TASK_0_0, TASK_0_1);
-clientStates = singletonMap(UUID_1, client1);
-createTaskAssignor();
 
-assertFalse(taskAssignor.previousAssignmentIsValid());

Review comment:
   Since you have a follow-on PR that touches this method, I'll leave it 
alone and just proceed to merge. We should consider both of these options in 
the follow-on.
   
   Thanks!









[GitHub] [kafka] fantayeneh opened a new pull request #8576: format with correct syntax

2020-04-28 Thread GitBox


fantayeneh opened a new pull request #8576:
URL: https://github.com/apache/kafka/pull/8576


   Small change: fix string formatting.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Assigned] (KAFKA-9928) Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]

2020-04-28 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-9928:
--

Assignee: Matthias J. Sax

> Flaky 
> GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]
> -
>
> Key: KAFKA-9928
> URL: https://issues.apache.org/jira/browse/KAFKA-9928
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>Priority: Major
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. waiting for 
> final values
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:380)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
>   at 
> org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableEOSIntegrationTest.java:178)
> {code}
> I looked at the below examples:
> https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/149/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_/
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFailsWithState_exactly_once_beta__2/
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta__2/
> I also reproduced the flakiness locally after about 180 runs, and the 
> failed run did not have any obviously different traces compared with the 
> successful ones.





[jira] [Updated] (KAFKA-9928) Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]

2020-04-28 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9928:
---
Component/s: unit tests, streams






[GitHub] [kafka] vvcephei commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416921971



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##
@@ -810,21 +808,9 @@ private void writeInputData(final List> records) throws Exc
 }
 
 private void verifyStateStore(final KafkaStreams streams,
-  final Set> 
expectedStoreContent) {
-ReadOnlyKeyValueStore store = null;
-
-final long maxWaitingTime = System.currentTimeMillis() + 30L;
-while (System.currentTimeMillis() < maxWaitingTime) {
-try {
-store = 
streams.store(StoreQueryParameters.fromNameAndType(storeName, 
QueryableStoreTypes.keyValueStore()));
-break;
-} catch (final InvalidStateStoreException okJustRetry) {
-try {
-Thread.sleep(5000L);
-} catch (final Exception ignore) { }
-}
-}
-
+  final Set> 
expectedStoreContent) throws InterruptedException {
+final ReadOnlyKeyValueStore store = IntegrationTestUtils
+.getStore(30L, storeName, streams, 
QueryableStoreTypes.keyValueStore());

Review comment:
   ```suggestion
   .getStore(300_000L, storeName, streams, 
QueryableStoreTypes.keyValueStore());
   ```

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
##
@@ -337,8 +336,11 @@ public void 
shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws
 TestUtils.waitForCondition(
 () -> {
 try {
-final ReadOnlyKeyValueStore store =
-
kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, 
QueryableStoreTypes.keyValueStore()));
+final ReadOnlyKeyValueStore store = 
IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
+
+if (store == null)
+return false;

Review comment:
   not a huge deal, but technically, these should have brackets.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -599,13 +595,6 @@ public static void waitForCompletion(final KafkaStreams 
streams,
 return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, 
expectedRecords, waitTime, false);
 }
 
-public static  List> 
waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig,

Review comment:
   thanks for the cleanup

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final 
EmbeddedKafkaCluster cluster, final
  * @param  Key type of the data records
  * @param  Value type of the data records
  */
-@SuppressWarnings("WeakerAccess")
 public static  void produceKeyValuesSynchronouslyWithTimestamp(final 
String topic,
  final 
Collection> records,
  final 
Properties producerConfig,
  final 
Headers headers,
  final 
Long timestamp,
- final 
boolean enableTransactions)
-throws ExecutionException, InterruptedException {
+ final 
boolean enableTransactions) {
 
 try (final Producer producer = new 
KafkaProducer<>(producerConfig)) {
 if (enableTransactions) {
 producer.initTransactions();
 producer.beginTransaction();
 }
 for (final KeyValue record : records) {
-final Future f = producer.send(
-new ProducerRecord<>(topic, null, timestamp, record.key, 
record.value, headers));
-f.get();
+producer.send(new ProducerRecord<>(topic, null, timestamp, 
record.key, record.value, headers));

Review comment:
   I guess the flush at the end makes it synchronous anyway?









[jira] [Assigned] (KAFKA-9875) Flaky Test SuppressionDurabilityIntegrationTest.shouldRecoverBufferAfterShutdown[exactly_once]

2020-04-28 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-9875:
---

Assignee: John Roesler

> Flaky Test 
> SuppressionDurabilityIntegrationTest.shouldRecoverBufferAfterShutdown[exactly_once]
> --
>
> Key: KAFKA-9875
> URL: https://issues.apache.org/jira/browse/KAFKA-9875
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Assignee: John Roesler
>Priority: Major
>  Labels: flaky-test, unit-test
>
> h3. Stacktrace
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The request timed out. at 
> org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:211)
>  at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteAllTopicsAndWait(EmbeddedKafkaCluster.java:300)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest(IntegrationTestUtils.java:148)
>  at 
> org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest.shouldRecoverBufferAfterShutdown(SuppressionDurabilityIntegrationTest.java:246)





[GitHub] [kafka] fantayeneh opened a new pull request #8577: use appropriate fn for readability. (maybe)

2020-04-28 Thread GitBox


fantayeneh opened a new pull request #8577:
URL: https://github.com/apache/kafka/pull/8577


   Using min and max might make the code a little easier to read.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416942992



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##
@@ -810,21 +808,9 @@ private void writeInputData(final List> records) throws Exc
 }
 
 private void verifyStateStore(final KafkaStreams streams,
-  final Set> 
expectedStoreContent) {
-ReadOnlyKeyValueStore store = null;
-
-final long maxWaitingTime = System.currentTimeMillis() + 30L;
-while (System.currentTimeMillis() < maxWaitingTime) {
-try {
-store = 
streams.store(StoreQueryParameters.fromNameAndType(storeName, 
QueryableStoreTypes.keyValueStore()));
-break;
-} catch (final InvalidStateStoreException okJustRetry) {
-try {
-Thread.sleep(5000L);
-} catch (final Exception ignore) { }
-}
-}
-
+  final Set> 
expectedStoreContent) throws InterruptedException {
+final ReadOnlyKeyValueStore store = IntegrationTestUtils
+.getStore(30L, storeName, streams, 
QueryableStoreTypes.keyValueStore());

Review comment:
   Ack.









[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416943907



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
##
@@ -337,8 +336,11 @@ public void 
shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws
 TestUtils.waitForCondition(
 () -> {
 try {
-final ReadOnlyKeyValueStore store =
-
kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, 
QueryableStoreTypes.keyValueStore()));
+final ReadOnlyKeyValueStore store = 
IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
+
+if (store == null)
+return false;

Review comment:
   ack.









[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416944392



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final 
EmbeddedKafkaCluster cluster, final
  * @param  Key type of the data records
  * @param  Value type of the data records
  */
-@SuppressWarnings("WeakerAccess")
 public static  void produceKeyValuesSynchronouslyWithTimestamp(final 
String topic,
  final 
Collection> records,
  final 
Properties producerConfig,
  final 
Headers headers,
  final 
Long timestamp,
- final 
boolean enableTransactions)
-throws ExecutionException, InterruptedException {
+ final 
boolean enableTransactions) {
 
 try (final Producer producer = new 
KafkaProducer<>(producerConfig)) {
 if (enableTransactions) {
 producer.initTransactions();
 producer.beginTransaction();
 }
 for (final KeyValue record : records) {
-final Future f = producer.send(
-new ProducerRecord<>(topic, null, timestamp, record.key, 
record.value, headers));
-f.get();
+producer.send(new ProducerRecord<>(topic, null, timestamp, 
record.key, record.value, headers));

Review comment:
   Previously we waited after sending each record; here we only wait once 
after sending all records, so it is more efficient.









[GitHub] [kafka] vvcephei commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416947800



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final 
EmbeddedKafkaCluster cluster, final
  * @param  Key type of the data records
  * @param  Value type of the data records
  */
-@SuppressWarnings("WeakerAccess")
 public static  void produceKeyValuesSynchronouslyWithTimestamp(final 
String topic,
  final 
Collection> records,
  final 
Properties producerConfig,
  final 
Headers headers,
  final 
Long timestamp,
- final 
boolean enableTransactions)
-throws ExecutionException, InterruptedException {
+ final 
boolean enableTransactions) {
 
 try (final Producer producer = new 
KafkaProducer<>(producerConfig)) {
 if (enableTransactions) {
 producer.initTransactions();
 producer.beginTransaction();
 }
 for (final KeyValue record : records) {
-final Future f = producer.send(
-new ProducerRecord<>(topic, null, timestamp, record.key, 
record.value, headers));
-f.get();
+producer.send(new ProducerRecord<>(topic, null, timestamp, 
record.key, record.value, headers));

Review comment:
   Thanks. That's what I was asking for confirmation on. I realize now the 
structure of my sentence was ambiguous.
   
   I agree that the method contract is that the batch should be synchronously 
produced, not that each record should be synchronously produced, so this change 
looks good to me.
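
The difference between waiting per record and waiting once per batch can be sketched with plain `java.util.concurrent` futures. This is an illustration only and does not use the Kafka producer API: `sendAsync` is a hypothetical stand-in for `producer.send(...)`, and the single `allOf(...).join()` plays the role of the flush at the end of the method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class BatchSyncSketch {

    /** Hypothetical asynchronous send; stands in for producer.send(...) in this sketch. */
    static CompletableFuture<Integer> sendAsync(final int record) {
        return CompletableFuture.supplyAsync(() -> record);
    }

    /** Old shape: block on every record before sending the next one. */
    static List<Integer> sendEachSynchronously(final List<Integer> records) {
        final List<Integer> acked = new ArrayList<>();
        for (final int record : records) {
            acked.add(sendAsync(record).join());
        }
        return acked;
    }

    /** New shape: hand over all records first, then wait once for the whole batch. */
    static List<Integer> sendBatchThenWait(final List<Integer> records) {
        final List<CompletableFuture<Integer>> inFlight = new ArrayList<>();
        for (final int record : records) {
            inFlight.add(sendAsync(record));
        }
        // Single wait point, analogous to one flush after all sends.
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        final List<Integer> acked = new ArrayList<>();
        for (final CompletableFuture<Integer> future : inFlight) {
            acked.add(future.join());
        }
        return acked;
    }

    public static void main(final String[] args) {
        final List<Integer> records = Arrays.asList(1, 2, 3);
        System.out.println(sendEachSynchronously(records));
        System.out.println(sendBatchThenWait(records));
    }
}
```

Both methods return only after every record has been acknowledged, so the batch as a whole is still produced synchronously; the second simply avoids one blocking round trip per record.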









[GitHub] [kafka] junrao commented on pull request #8543: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-28 Thread GitBox


junrao commented on pull request #8543:
URL: https://github.com/apache/kafka/pull/8543#issuecomment-620879490


   retest this please







[GitHub] [kafka] guozhangwang commented on pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

2020-04-28 Thread GitBox


guozhangwang commented on pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#issuecomment-620879654


   Merged to trunk.







[GitHub] [kafka] junrao commented on pull request #8542: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-28 Thread GitBox


junrao commented on pull request #8542:
URL: https://github.com/apache/kafka/pull/8542#issuecomment-620880304


   @steverod : Do the JDK 8 and Scala 2.12 tests pass for you locally? Not 
sure why the Jenkins test failed.







[GitHub] [kafka] cmccabe edited a comment on pull request #8569: KIP-551: Expose disk read and write metrics

2020-04-28 Thread GitBox


cmccabe edited a comment on pull request #8569:
URL: https://github.com/apache/kafka/pull/8569#issuecomment-620884289


   > In addition to block-level read/write, would there be a benefit to expose 
file system read/write metrics?
   
   It's better to have that discussion on the mailing list.  This PR is just 
about KIP-551.










[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when retaining duplicates

2020-04-28 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094900#comment-17094900
 ] 

Georgi Petkov commented on KAFKA-9921:
--

[~ableegoldman]

Yeah, I agree that probably not much can be done in terms of caching (compared 
to the options without _retainDuplicates_).

I totally agree that many of the features like the null value behavior are 
correct and make perfect sense from the point of view of the features implemented 
with it. Still, it's strange from the perspective where you use it standalone. 
*1-2 sentences clarifying the behavior with null values in the 
_WindowStateStore_ documentation could definitely help.* In addition, as I said, 
if this is the desired behavior *you can easily skip calling RocksDB for null 
values (when using _retainDuplicates_). This would make both the intention 
clearer and obviously avoid unnecessary calls.*

I do need exactly a stream-stream join but without the repartition part. I want 
to get matches when there are new events in either stream and I also use 
_WindowStateStore_ only for the retention policy. In fact, due to the lack of 
many examples, I was looking at the stream-stream join implementation to find 
out how to correctly use the _WindowStateStores_. I'm building a library for 
some common yet non-trivial operations on streams that you may need, like 
topological sorting. Therefore I don't know if the user will provide null 
values or not. I was curious about the behavior with null values so I know what 
I'm providing to the user. I've tested it and that's how I found out what the 
exact behavior is.

*I'm not sure that an in-memory or any custom state store will make it.* Yes, 
in-memory will help with the efficient append because it avoids any expensive 
calls and serializations/deserializations. Nevertheless, *you will always have 
the serializations/deserializations somewhere, namely in the changelog topic, 
and there you also pay in bandwidth* (not just precious processing time). Even if 
the list is fixed to, let's say, only 5 items you will still have 15 (1 + 2 + 3 + 
4 + 5) events recorded instead of 5. Obviously the size grows pretty fast - 
O(n^2). Combined with the fact that I want to provide a library to many 
different users (and duplicates count may vary a lot between usages) *to me 
it's best to implement just as in the stream-stream join - with duplicates*. 
Still, it was a great discussion and made me more confident in my decisions. 
Thank you for your assistance.
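
The arithmetic behind the O(n^2) remark can be checked with a small sketch. It only counts items written and does not touch any Kafka Streams API; the class and method names are hypothetical. It assumes that a list-valued store rewrites the whole list to the changelog on each append, while a store that retains duplicates writes one entry per value.

```java
final class ChangelogGrowthSketch {

    /** Items written when each append rewrites the whole list: 1 + 2 + ... + n. */
    static long itemsWrittenWithListValue(final int n) {
        long total = 0;
        for (int size = 1; size <= n; size++) {
            total += size;
        }
        return total;
    }

    /** Items written when each value is stored as its own duplicate entry. */
    static long itemsWrittenWithDuplicates(final int n) {
        return n;
    }

    public static void main(final String[] args) {
        for (final int n : new int[] {5, 100}) {
            System.out.println("n=" + n
                + " list-valued=" + itemsWrittenWithListValue(n)
                + " duplicates=" + itemsWrittenWithDuplicates(n));
        }
    }
}
```

For n = 5 this gives 15 versus 5, matching the figures above; the list-valued total is n(n + 1)/2, so the gap widens quadratically as the number of values per key grows.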

*Regarding the PR - it adds the same code to both _WindowStoreBuilder.java_ and 
_TimestampedWindowStoreBuilder.java_ but adds a test for only one of them.*

> Caching is not working properly with WindowStateStore when retaining 
> duplicates
> ---
>
> Key: KAFKA-9921
> URL: https://issues.apache.org/jira/browse/KAFKA-9921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Georgi Petkov
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>
> I'm using the current latest version 2.5.0 but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for 
> the _retainDuplicates_ parameter):
>  _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, 
> retentionPeriod, windowSize, *true*), keySerde, 
> valueSerde)*.withCachingEnabled()*)_
> If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that 
> order when reading them through the iterator I'll get the values *4, 2, 3, 4*.
>  I've done a bit of investigation myself and the problem is that *the whole 
> caching feature is written without consideration of the case where duplicates 
> are retained*.
> The observed behavior is due to having the last value in the cache (and it 
> can have only one since it's not aware of the retain duplicates option) and 
> it is read first (while skipping the first from the RocksDB iterator even 
> though the values are different). This can be observed (for version 2.5.0) in 
> _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 
> values are read from the RocksDB iterator so they are as expected.
> As I said, the whole feature is not considering the _retainDuplicates_ option 
> so there are other examples of incorrect behavior like in 
> _AbstractMergedSortedCacheStoreIterator__#peekNextKey()_ - for each call, you 
> would skip one duplicate entry in the RocksDB iterator for the given key.
> In my use case, I want to persist a list of values for a given key without 
> increasing the complexity to linear for a single event (which would be the 
> case if I was always reading the current list appending one value and writing 
> it ba

[jira] [Commented] (KAFKA-9928) Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]

2020-04-28 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094902#comment-17094902
 ] 

Guozhang Wang commented on KAFKA-9928:
--

I found that for the failed run, around the time when the producer of 
{{produceTopicValues(streamTopic);}} around line 172 is being closed, the 
following entries are printed (whereas succeeded runs do not have those), cc 
[~mjsax]:

{code}
[2020-04-28 15:10:58,458] INFO [Consumer 
clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer,
 
groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_]
 Fetch offset 9 is out of range for partition 
stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0, resetting offset 
(org.apache.kafka.clients.consumer.internals.Fetcher:1261)
[2020-04-28 15:10:58,458] INFO [Consumer 
clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer,
 
groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_]
 Resetting offset for partition 
stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0 to offset 0. 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:383)
[2020-04-28 15:10:58,459] INFO [Consumer 
clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer,
 
groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_]
 Fetch offset 9 is out of range for partition 
stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0, resetting offset 
(org.apache.kafka.clients.consumer.internals.Fetcher:1261)
[2020-04-28 15:10:58,460] INFO [Consumer 
clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer,
 
groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_]
 Resetting offset for partition 
stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0 to offset 0. 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:383)
[2020-04-28 15:10:58,461] INFO [Consumer 
clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer,
 
groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_]
 Fetch offset 9 is out of range for partition 
stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0, resetting offset 
(org.apache.kafka.clients.consumer.internals.Fetcher:1261)
[2020-04-28 15:10:58,461] INFO [Consumer 
clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer,
 
groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_]
 Resetting offset for partition 
stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0 to offset 0. 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:383)
[2020-04-28 15:10:58,566] INFO [Producer 
clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-producer,
 
transactionalId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-1]
 Discovered group coordinator localhost:54279 (id: 0 rack: null) 
(org.apache.kafka.clients.producer.internals.TransactionManager:1525)
[2020-04-28 15:11:00,740] INFO [Controller id=0] Processing automatic preferred 
replica leader election (kafka.controller.KafkaController:66)
{code}

Note that this cluster has only one broker.

> Flaky 
> GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]
> -
>
> Key: KAFKA-9928
> URL: https://issues.apache.org/jira/browse/KAFKA-9928
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>Priority: Major
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. waiting for 
> final values
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:380)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:

[jira] [Comment Edited] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates

2020-04-28 Thread Georgi Petkov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094900#comment-17094900
 ] 

Georgi Petkov edited comment on KAFKA-9921 at 4/28/20, 10:23 PM:
-

[~ableegoldman]

Yeah, I agree that probably not much can be done in terms of caching (compared 
to the options without _retainDuplicates_).

I totally agree that many of the features like the null value behavior are 
correct and make perfect sense from the point of view of the features implemented 
with it. Still, it's strange from the perspective of using it standalone. 
*1-2 sentences clarifying the behavior with null values in the 
_WindowStateStore_ documentation could definitely help.* In addition, as I said 
if this is the desired behavior *you can easily skip calling RocksDB for null 
values (when using _retainDuplicates)_. This would both make the intention 
clearer and obviously avoid unnecessary calls.*

I do need exactly stream-stream join but without the repartition part. I want 
to get matches when there are new events in whichever stream, support duplicate 
keys in the stream and I also use _WindowStateStore_ only for the retention 
policy. In fact, due to the lack of many examples, I was looking at the 
stream-stream join implementation to find out how to correctly use the 
_WindowStateStores_. I'm building a library for some common yet not trivial at 
all operations on streams that you may need like topological sorting. Therefore 
I don't know if the user will provide null values or not. I was curious about 
the behavior with null values so I know what I'm providing to the user. I've 
tested it and that's how I found out what is the exact behavior.

*I'm not sure that an in-memory or any custom state store will make it.* Yes, 
in-memory will help with the efficient append because it avoids any expensive 
call and serializations/deserializations. Nevertheless, *you will always have 
the serializations/deserializations somewhere and this is the changelog topic 
and there you have also bandwidth* (not just precious processing time). Even if 
the list is fixed to let's say only 5 items you will still have 15 (1 + 2 + 3 + 
4 + 5) events recorded instead of 5. Obviously the size grows pretty fast - 
O(n^2). Combined with the fact that I want to provide a library to many 
different users (and duplicates count may vary a lot between usages) *to me 
it's best to implement just as in the stream-stream join - with duplicates*. 
Still, it was a great discussion and made me more confident in my decisions. 
Thank you for your assistance.
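
The changelog arithmetic above (15 elements written instead of 5) can be checked with a small sketch. It assumes the whole list is re-serialized on every append; the class and method names are made up for illustration and are not part of Kafka Streams.

```java
// Hypothetical illustration of the append cost discussed above.
final class ChangelogCost {
    // Total list elements serialized when a list grows from 1 to n items
    // and the full list is rewritten on every append.
    static long elementsWhenRewritingList(int n) {
        long total = 0;
        for (int size = 1; size <= n; size++) {
            total += size; // the i-th append writes a list of i elements
        }
        return total; // equals n * (n + 1) / 2, i.e. O(n^2)
    }

    // With one record per duplicate, each append writes exactly one element.
    static long elementsWithDuplicates(int n) {
        return n;
    }
}
```

For n = 5 the first method gives 15 and the second gives 5, which matches the numbers in the comment.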

*Regarding the PR - it adds the same code to both _WindowStoreBuilder.java_ and 
_TimestampedWindowStoreBuilder.java_ but adds a test for only one of them.*



[GitHub] [kafka] cmccabe commented on a change in pull request #8569: KIP-551: Expose disk read and write metrics

2020-04-28 Thread GitBox


cmccabe commented on a change in pull request #8569:
URL: https://github.com/apache/kafka/pull/8569#discussion_r416961523



##
File path: core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
##
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio.file.{Files, Paths}
+
+import org.apache.kafka.common.utils.Time
+import org.slf4j.Logger
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Retrieves Linux /proc/self/io metrics.
+ */
+class LinuxIoMetricsCollector(val procPath: String, val time: Time, val logger: Logger) {
+  import LinuxIoMetricsCollector._
+  var lastUpdateMs = -1L
+  var cachedReadBytes = 0L
+  var cachedWriteBytes = 0L
+
+  def readBytes(): Long = this.synchronized {
+    val curMs = time.milliseconds()
+    if (curMs != lastUpdateMs) {
+      updateValues(curMs)
+    }
+    cachedReadBytes
+  }
+
+  def writeBytes(): Long = this.synchronized {
+    val curMs = time.milliseconds()
+    if (curMs != lastUpdateMs) {
+      updateValues(curMs)
+    }
+    cachedWriteBytes
+  }
+
+  /**
+   * Read /proc/self/io.
+   *
+   * Generally, each line in this file contains a prefix followed by a colon and a number.
+   *
+   * For example, it might contain this:
+   * rchar: 4052
+   * wchar: 0
+   * syscr: 13
+   * syscw: 0
+   * read_bytes: 0
+   * write_bytes: 0
+   * cancelled_write_bytes: 0
+   */
+  def updateValues(now: Long): Boolean = this.synchronized {

Review comment:
   Unless we choose to read this file in a background thread, there isn't a 
reason to avoid using a lock here.
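
For illustration only, here is a minimal Java sketch of the pattern under review, in which one lock guards both the refresh of the file and the cached counters. The PR's collector is written in Scala; the class name `ProcIoSketch`, the `parse` helper, and passing the clock value in as `nowMs` are assumptions made for this sketch.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the code from the PR.
final class ProcIoSketch {
    private final String procPath; // e.g. "/proc/self/io"
    private long lastUpdateMs = -1L;
    private long cachedReadBytes = 0L;
    private long cachedWriteBytes = 0L;

    ProcIoSketch(String procPath) {
        this.procPath = procPath;
    }

    // Parses "prefix: number" lines; lines that do not fit are skipped.
    static Map<String, Long> parse(List<String> lines) {
        Map<String, Long> values = new HashMap<>();
        for (String line : lines) {
            int colon = line.indexOf(':');
            if (colon < 0) {
                continue;
            }
            try {
                values.put(line.substring(0, colon).trim(),
                           Long.parseLong(line.substring(colon + 1).trim()));
            } catch (NumberFormatException e) {
                // Skip a malformed number instead of failing the whole read.
            }
        }
        return values;
    }

    // Both accessors take the same lock, so a refresh is never interleaved.
    synchronized long readBytes(long nowMs) throws IOException {
        refreshIfStale(nowMs);
        return cachedReadBytes;
    }

    synchronized long writeBytes(long nowMs) throws IOException {
        refreshIfStale(nowMs);
        return cachedWriteBytes;
    }

    // Called only while holding the lock; re-reads at most once per millisecond.
    private void refreshIfStale(long nowMs) throws IOException {
        if (nowMs == lastUpdateMs) {
            return;
        }
        Map<String, Long> v = parse(Files.readAllLines(Paths.get(procPath)));
        cachedReadBytes = v.getOrDefault("read_bytes", 0L);
        cachedWriteBytes = v.getOrDefault("write_bytes", 0L);
        lastUpdateMs = nowMs;
    }
}
```

Because the file is read on the calling thread, holding the lock across the read keeps the two cached values consistent with each other.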





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8567: KAFKA-9652: Fix throttle metric in RequestChannel and request log due to KIP-219

2020-04-28 Thread GitBox


ijuma commented on pull request #8567:
URL: https://github.com/apache/kafka/pull/8567#issuecomment-620887867


   2 jobs passed, 1 unrelated flaky test failed:
   
   > org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
   







[GitHub] [kafka] steverod commented on pull request #8542: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-28 Thread GitBox


steverod commented on pull request #8542:
URL: https://github.com/apache/kafka/pull/8542#issuecomment-620893016


   retest this please
   







[GitHub] [kafka] vvcephei opened a new pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


vvcephei opened a new pull request #8578:
URL: https://github.com/apache/kafka/pull/8578


   The ticket is for a flaky test that failed to clean up topics _after_ the test,
   which isn't strictly necessary for test success.
   
   * alter the "clean up after test" method to never throw an exception
     (after verifying it's always the last invocation inside a finally block,
     so it won't break any test semantics)
   * consolidate the naming of all integration tests' app ids, topics, etc.,
     by introducing a new test utility to generate safe, unique, descriptive names.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8578:
URL: https://github.com/apache/kafka/pull/8578#discussion_r416969713



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
##
@@ -101,8 +102,8 @@ public void before() throws Exception {
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "global-thread-shutdown-test" + testName.getMethodName();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);

Review comment:
   I've standardized all the usages to be just "app", followed by the 
generated name, since the generated name contains the same information that we 
previously hand-wrote into the prefix or suffix. All we really need to do is 
ensure that the app id won't collide with a group name that we might use in a 
verification consumer, for example. For that reason, I've never used the 
generated name "plain", but always scoped it to the usage (app id, group id, 
input topic, etc.).
   
   It's not super important to apply these ideas universally, but I felt it 
would make it easier to write more tests like it in the future if I just made a 
full pass on all the tests to make them all look the same.
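
The naming idea described above can be sketched roughly as follows. This is not the `safeUniqueTestName` utility from the PR; the class `TestNames`, the method `safeName`, and the choice of allowed characters are assumptions for illustration.

```java
// Hypothetical sketch of a "safe, scoped test name" helper.
final class TestNames {
    // Combines class and method name, then replaces every character outside
    // a conservative set (letters, digits, '_' and '-') with '_'.
    // The result is only as unique as the class/method pair itself.
    static String safeName(String className, String methodName) {
        return (className + "-" + methodName).replaceAll("[^A-Za-z0-9_-]", "_");
    }

    // Scoping the generated name by usage keeps an app id from colliding
    // with, say, a consumer group id derived from the same test.
    static String scoped(String usage, String safeName) {
        return usage + "-" + safeName;
    }
}
```

For example, `TestNames.scoped("app", name)` and `TestNames.scoped("group", name)` differ even though both derive from the same test name.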

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
##
@@ -88,16 +89,17 @@
     private String stateStoreName;
 
     @Rule
-    public TestName name = new TestName();
+    public TestName testName = new TestName();
 
     @Before
     public void before() {
-        inputTopicName = "input-topic-" + name.getMethodName();
-        outputTopicName = "output-topic-" + name.getMethodName();
-        stateStoreName = "lagfetch-test-store" + name.getMethodName();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        inputTopicName = "input-topic-" + safeTestName;
+        outputTopicName = "output-topic-" + safeTestName;
+        stateStoreName = "lagfetch-test-store" + safeTestName;
 
         streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "lag-fetch-" + name.getMethodName());
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "lag-fetch-" + safeTestName);

Review comment:
   ```suggestion
           streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
   ```

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -145,12 +162,12 @@ public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, fina
         }
     }
 
-    public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
-        driver.cleanUp();
+    public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
         try {
+            driver.cleanUp();
             cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
-        } catch (final InterruptedException e) {
-            throw new RuntimeException(e);
+        } catch (final RuntimeException | InterruptedException e) {
+            LOG.warn("Ignoring failure to clean test state", e);
         }

Review comment:
   This is really the fix for KAFKA-9875. The other change just hopefully 
reduces the probability that ignoring the exceptions could cause subsequent 
failures (e.g., if the topics don't get deleted before the next test, at least 
the next one will have different topic names).
   
   I've verified that all usages of this method are ok to ignore potential 
exceptions. Namely, as long as the test logic itself doesn't want to ensure 
that any topics got deleted, and as long as this method is the last line in the 
method, then it should be fine just to ignore failures here.
   
   I also considered just deleting the method, but if it does succeed, then it 
leaves less garbage around for subsequent tests, so it feels better to at least 
attempt a cleanup.
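
As a rough sketch of the "attempt the cleanup, but never fail the test because of it" idea, assuming a made-up `Step` interface in place of the real driver and cluster calls (none of these names come from the PR):

```java
// Hypothetical sketch of best-effort cleanup; not the PR's implementation.
final class QuietCleanup {
    // Stand-in for calls such as driver.cleanUp() or topic deletion.
    interface Step {
        void run() throws Exception;
    }

    // Runs one cleanup step and swallows any failure.
    // Returns true if the step succeeded, false if it was ignored.
    static boolean quietly(String description, Step step) {
        try {
            step.run();
            return true;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Extra choice made in this sketch: keep the interrupt flag set.
                Thread.currentThread().interrupt();
            }
            System.err.println("Ignoring failure to " + description + ": " + e);
            return false;
        }
    }
}
```

This is only safe under the condition stated above: the call is the last statement of the test (typically in a finally block) and the test does not itself assert that the topics were deleted.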

##
File path: streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
##
@@ -106,7 +106,9 @@ public static void startKafkaStreamsAndWaitForRunningState(final KafkaStreams ka
         kafkaStreams.start();
         assertThat(
             "KafkaStreams did not transit to RUNNING state within " + timeoutMs + " milli seconds.",
-            countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS), equalTo(true));
+            countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS),
+            equalTo(true)
+        );

Review comment:
   just fixing the formatting.




--

[GitHub] [kafka] ableegoldman commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-28 Thread GitBox


ableegoldman commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620909428


   One unrelated failure: 
`MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete`







[jira] [Created] (KAFKA-9929) Support reverse iterator on WindowStore

2020-04-28 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-9929:
---

 Summary: Support reverse iterator on WindowStore
 Key: KAFKA-9929
 URL: https://issues.apache.org/jira/browse/KAFKA-9929
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Jorge Esteban Quilcate Otoya


Currently, WindowStore fetch operations return an iterator sorted from earliest 
to latest result:

```
* For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest
* available window to the newest/latest window.
```

 

We have a use case where traces are stored in a WindowStore and Kafka Streams 
is used to create a materialized view of the traces. A query request comes with 
a time range (e.g. now-1h, now) and we want to return the most recent results, 
i.e. fetch from this period of time, iterate and pattern match the latest/most 
recent traces, and, if there are enough results, reply without moving further 
along the iterator.

The same store is used to search for previous traces. In this case, it searches 
for a key over the last day; if traces are found, we would also like to iterate 
from the most recent.

RocksDB seems to support iterating backward and forward: 
[https://github.com/facebook/rocksdb/wiki/Iterator#iterating-upper-bound-and-lower-bound]

 

For reference, this partly builds on a previous issue, 
https://issues.apache.org/jira/browse/KAFKA-4212:

 

> The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp. But 
> this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.

 

I would like to know whether there is any impediment in RocksDB or WindowStore 
to supporting this.

Adding a direction argument to the current fetch methods would be great:

```

WindowStore.fetch(from,to,Direction.BACKWARD|FORWARD)

```
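
The access pattern being requested (newest first, stop as soon as enough results are found) can be illustrated with a plain `TreeMap` standing in for the store. This is not the WindowStore API, and `Direction.BACKWARD` above is only the proposed shape; the class and method names below are made up for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration using java.util collections, not Kafka Streams.
final class ReverseFetchSketch {
    // Returns up to `limit` values with timestamps in [from, to], newest first.
    // Requires from <= to.
    static List<String> fetchLatest(NavigableMap<Long, String> byTimestamp,
                                    long from, long to, int limit) {
        List<String> out = new ArrayList<>();
        NavigableMap<Long, String> newestFirst =
            byTimestamp.subMap(from, true, to, true).descendingMap();
        for (Map.Entry<Long, String> e : newestFirst.entrySet()) {
            if (out.size() >= limit) {
                break; // enough results: stop without scanning older entries
            }
            out.add(e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> traces = new TreeMap<>();
        traces.put(10L, "a");
        traces.put(20L, "b");
        traces.put(30L, "c");
        traces.put(40L, "d");
        System.out.println(fetchLatest(traces, 15L, 40L, 2));
    }
}
```

With a forward-only iterator the same query would have to walk the whole range before it could return the latest entries, which is the cost the issue is trying to avoid.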



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

