[GitHub] [kafka] Hangleton commented on a change in pull request #8569: KIP-551: Expose disk read and write metrics
Hangleton commented on a change in pull request #8569:
URL: https://github.com/apache/kafka/pull/8569#discussion_r416391699

## File path: core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
## @@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio.file.{Files, Paths}
+
+import org.apache.kafka.common.utils.Time
+import org.slf4j.Logger
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Retrieves Linux /proc/self/io metrics.
+ */
+class LinuxIoMetricsCollector(val procPath: String, val time: Time, val logger: Logger) {
+  import LinuxIoMetricsCollector._
+  var lastUpdateMs = -1L
+  var cachedReadBytes = 0L
+  var cachedWriteBytes = 0L
+
+  def readBytes(): Long = this.synchronized {
+    val curMs = time.milliseconds()
+    if (curMs != lastUpdateMs) {
+      updateValues(curMs)
+    }
+    cachedReadBytes
+  }
+
+  def writeBytes(): Long = this.synchronized {
+    val curMs = time.milliseconds()
+    if (curMs != lastUpdateMs) {
+      updateValues(curMs)
+    }
+    cachedWriteBytes
+  }
+
+  /**
+   * Read /proc/self/io.
+   *
+   * Generally, each line in this file contains a prefix followed by a colon and a number.
+   *
+   * For example, it might contain this:
+   * rchar: 4052
+   * wchar: 0
+   * syscr: 13
+   * syscw: 0
+   * read_bytes: 0
+   * write_bytes: 0
+   * cancelled_write_bytes: 0
+   */
+  def updateValues(now: Long): Boolean = this.synchronized {

Review comment:
Nit: do you think the lock should be held while reading `/proc`, or restricted to the update of `lastUpdateMs`, `cachedReadBytes` and `cachedWriteBytes`?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
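The reviewer's alternative is to read and parse the file without holding the monitor and lock only while publishing the new values. Below is a hypothetical Java sketch of that alternative, not the code in PR #8569. The class name `ProcIoSnapshot`, the explicit `nowMs` parameters and the `Supplier<String>` are all assumptions. The supplier stands in for reading `/proc/self/io`, so the example runs on any OS.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch (not the code in PR #8569): the file contents are parsed
// without holding the monitor, and the lock only guards the cached fields.
public class ProcIoSnapshot {
    private final Supplier<String> procReader; // assumption: stands in for reading /proc/self/io
    private long lastUpdateMs = -1L;
    private long cachedReadBytes = 0L;
    private long cachedWriteBytes = 0L;

    public ProcIoSnapshot(Supplier<String> procReader) {
        this.procReader = procReader;
    }

    // Parses "prefix: number" lines; lines without a colon or a numeric value are skipped.
    static Map<String, Long> parse(String content) {
        Map<String, Long> values = new HashMap<>();
        for (String line : content.split("\n")) {
            int colon = line.indexOf(':');
            if (colon < 0) {
                continue;
            }
            try {
                values.put(line.substring(0, colon).trim(),
                           Long.parseLong(line.substring(colon + 1).trim()));
            } catch (NumberFormatException e) {
                // not a number: ignore this line
            }
        }
        return values;
    }

    // Reads and parses outside the lock; returns false if the expected keys are missing.
    public boolean updateValues(long nowMs) {
        Map<String, Long> values = parse(procReader.get()); // potentially slow I/O, no lock held
        Long read = values.get("read_bytes");
        Long write = values.get("write_bytes");
        if (read == null || write == null) {
            return false;
        }
        synchronized (this) {
            // Never let a call for an older timestamp overwrite a newer snapshot.
            if (nowMs >= lastUpdateMs) {
                lastUpdateMs = nowMs;
                cachedReadBytes = read;
                cachedWriteBytes = write;
            }
        }
        return true;
    }

    private synchronized boolean isStale(long nowMs) {
        return nowMs != lastUpdateMs;
    }

    public long readBytes(long nowMs) {
        if (isStale(nowMs)) {
            updateValues(nowMs);
        }
        synchronized (this) {
            return cachedReadBytes;
        }
    }

    public long writeBytes(long nowMs) {
        if (isStale(nowMs)) {
            updateValues(nowMs);
        }
        synchronized (this) {
            return cachedWriteBytes;
        }
    }
}
```

This has a trade-off. If two threads both see a stale cache in the same millisecond, both may read the file. The version under review avoids that duplicate read but holds the monitor during I/O.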
[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094266#comment-17094266 ]

David Jacot commented on KAFKA-7965:

I've fixed a few issues related to this flaky test:
* https://issues.apache.org/jira/browse/KAFKA-9844
* https://issues.apache.org/jira/browse/KAFKA-9885

I am not sure that those two have resolved all the issues related to this one, but they should drastically improve the situation. Let's see...

> Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer, unit tests
> Affects Versions: 1.1.1, 2.2.0, 2.3.0
> Reporter: Matthias J. Sax
> Assignee: David Jacot
> Priority: Critical
> Labels: flaky-test
> Fix For: 2.3.0
>
> To get stable nightly builds for `2.2` release, I create tickets for all
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.assertTrue(Assert.java:41)
> at kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557)
> at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
> at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [kafka] leonardge commented on pull request #8566: Fix minor code issue
leonardge commented on pull request #8566:
URL: https://github.com/apache/kafka/pull/8566#issuecomment-620445718

Is this due to flaky tests? The failed test is: kafka.api.PlaintextAdminIntegrationTest.testCreatePartitions
[GitHub] [kafka] mateuszjadczykDna commented on a change in pull request #8553: KAFKA-9891: Add integration test to replicate standby task failover
mateuszjadczykDna commented on a change in pull request #8553:
URL: https://github.com/apache/kafka/pull/8553#discussion_r416421730

## File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskFailOverIntegrationTest.java
## @@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.fail;
+
+/**
+ * Test the standby task fail over scenario. Never call commit but process a poison key that causes primary task failed.
+ * For at least once, the poison record will be replicated to the standby task state.
+ * In EOS, we should not hit the duplicate processing exception on the poison key, as the
+ * restore consumer is also read committed.
+ */
+@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
+public class StandbyTaskFailOverIntegrationTest {
+
+    private final Logger log = LoggerFactory.getLogger(StandbyTaskFailOverIntegrationTest.class);
+
+    private static final int NUM_BROKERS = 3;
+    private static final Duration RETENTION = Duration.ofMillis(100_000);
+    private static final Duration WINDOW_SIZE = Duration.ofMillis(100);
+    private static final String STORE_NAME = "dedup-store";
+
+    private final String appId = "test-app";
+    private final String inputTopic = "input";
+    private final String keyOne = "key_one";
+    private final String poisonKey = "poison_key";
+    private final int numThreads = 2;
+    private KafkaStreams streamInstanceOne;
+    private KafkaStreams streamInstanceTwo;
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+        NUM_BROKERS,
+        Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "false"))
+    );
+
+    @Parameterized.Parameter
+    public String eosConfig;
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection data() {
+        return Arrays.asList(new String[][] {
+            {StreamsConfig.AT_LEAST_ONCE},
+
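The test's Javadoc depends on read-committed isolation. Under EOS the restore consumer is read committed, so it should not restore the poison record that the failed primary never committed. Below is a toy Java model of that filtering. It is a hypothetical illustration, not the Kafka client API: `IsolationLevelModel`, `TxnRecord` and `TxnState` are invented names. The model assumes read-committed reads skip aborted records and stop at the first record of a still-open transaction (the last stable offset).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical types, not the Kafka client API) of how a consumer's
// isolation level changes what a standby restores from a changelog.
public class IsolationLevelModel {
    enum TxnState { NON_TRANSACTIONAL, COMMITTED, ABORTED, OPEN }

    static final class TxnRecord {
        final String key;
        final TxnState state;

        TxnRecord(String key, TxnState state) {
            this.key = key;
            this.state = state;
        }
    }

    // read_uncommitted: every record in the log is returned.
    static List<String> readUncommitted(List<TxnRecord> log) {
        List<String> keys = new ArrayList<>();
        for (TxnRecord r : log) {
            keys.add(r.key);
        }
        return keys;
    }

    // read_committed: aborted records are filtered out, and nothing at or after the
    // first record of a still-open transaction (the last stable offset) is returned.
    static List<String> readCommitted(List<TxnRecord> log) {
        List<String> keys = new ArrayList<>();
        for (TxnRecord r : log) {
            if (r.state == TxnState.OPEN) {
                break;
            }
            if (r.state != TxnState.ABORTED) {
                keys.add(r.key);
            }
        }
        return keys;
    }
}
```

In the scenario the Javadoc describes, the primary fails before committing. Under EOS, the poison record therefore stays in an open (and later aborted) transaction that `readCommitted` does not return. At-least-once writes are not transactional, so the model replays them like any other record, matching the Javadoc's statement that the poison record reaches the standby state in that mode.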
[GitHub] [kafka] mateuszjadczykDna commented on a change in pull request #8553: KAFKA-9891: Add integration test to replicate standby task failover
mateuszjadczykDna commented on a change in pull request #8553:
URL: https://github.com/apache/kafka/pull/8553#discussion_r416422911

## File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskFailOverIntegrationTest.java
## @@ -0,0 +1,283 @@
[GitHub] [kafka] mateuszjadczykDna commented on a change in pull request #8553: KAFKA-9891: Add integration test to replicate standby task failover
mateuszjadczykDna commented on a change in pull request #8553:
URL: https://github.com/apache/kafka/pull/8553#discussion_r416423660

## File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskFailOverIntegrationTest.java
## @@ -0,0 +1,283 @@
[GitHub] [kafka] mateuszjadczykDna commented on a change in pull request #8553: KAFKA-9891: Add integration test to replicate standby task failover
mateuszjadczykDna commented on a change in pull request #8553:
URL: https://github.com/apache/kafka/pull/8553#discussion_r416423943

## File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskFailOverIntegrationTest.java
## @@ -0,0 +1,283 @@
[GitHub] [kafka] bseenu commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0
bseenu commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r416436144

## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
## @@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) {
                 Checkpoint.unwrapGroup(record.sourcePartition()),
                 System.currentTimeMillis() - record.timestamp());
     }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map> consumerGroupsDesc = targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();
+
+        for (String group : consumerGroups) {
+            try {
+                if (consumerGroupsDesc.get(group) == null) {
+                    // if consumerGroupsDesc does not contain this group, it should be the new consumer
+                    // group created at source cluster and its offsets should be sync-ed to target
+                    newConsumerGroup.add(group);
+                    continue;
+                }
+                ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
+                // sync offset to the target cluster only if the state of current consumer group is idle or dead
+                ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
+                if (consumerGroupState.equals(ConsumerGroupState.EMPTY) || consumerGroupState.equals(ConsumerGroupState.DEAD)) {
+                    idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
+                        .partitionsToOffsetAndMetadata().get().entrySet());
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
+            }
+        }
+    }
+
+    Map> syncGroupOffset() {
+        Map> offsetToSyncAll = new HashMap<>();
+
+        // first, sync offsets for the idle consumers at target
+        for (Map.Entry>> group : idleConsumerGroupsOffset.entrySet()) {
+            String consumerGroupId = group.getKey();
+            // for each idle consumer at target, read the checkpoints (converted upstream offset)
+            // from the pre-populated map
+            Map convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId);
+
+            if (convertedUpstreamOffset == null) continue;
+
+            Map offsetToSync = new HashMap<>();
+            for (Entry entry : group.getValue()) {

Review comment:
@thspinto I think I am running into the same issue you pointed out here. The source consumer group is active and consumes a different topic, while the target consumer group is idle and has a different topic. The code only checks the topics and partitions that already exist at the target site, and it syncs them only when the target offset is lower than the source offset, so it ignores the other topics at the source.
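bseenu describes a gap: the loop only walks partitions the idle target group already has offsets for, so checkpointed partitions that exist only on the source side are never synced. Below is a minimal sketch of the alternative, not the PR's code. It is written in Java and assumes offsets are plain `Map<String, Long>` values keyed by a hypothetical `"topic-partition"` string instead of the real `TopicPartition`/`OffsetAndMetadata` types. It iterates over the converted upstream offsets and still never moves an existing target offset backwards.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the code in PR #7577: offsets are modelled as
// "topic-partition" -> offset maps instead of TopicPartition/OffsetAndMetadata.
public class OffsetSyncSketch {

    // Returns the offsets to write to an idle target group.
    // convertedUpstream: checkpointed (translated) source offsets for the group.
    // currentTarget: offsets the target group has already committed.
    static Map<String, Long> offsetsToSync(Map<String, Long> convertedUpstream,
                                           Map<String, Long> currentTarget) {
        Map<String, Long> toSync = new HashMap<>();
        // Iterate over the source-side checkpoints, not the target's partitions,
        // so partitions the target group has never committed are included too.
        for (Map.Entry<String, Long> entry : convertedUpstream.entrySet()) {
            Long targetOffset = currentTarget.get(entry.getKey());
            // Sync if the target has no offset yet or is behind the translated offset;
            // never move an existing target offset backwards.
            if (targetOffset == null || targetOffset < entry.getValue()) {
                toSync.put(entry.getKey(), entry.getValue());
            }
        }
        return toSync;
    }
}
```

Whether offsets should be synced for topics the target group has never consumed is a design decision for the PR. The sketch only shows where the current loop drops them.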
[GitHub] [kafka] leonardge opened a new pull request #8570: Change type to optional in config entry
leonardge opened a new pull request #8570:
URL: https://github.com/apache/kafka/pull/8570

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-9924) Add RocksDB Memory Consumption to RocksDB Metrics
Bruno Cadonna created KAFKA-9924:

Summary: Add RocksDB Memory Consumption to RocksDB Metrics
Key: KAFKA-9924
URL: https://issues.apache.org/jira/browse/KAFKA-9924
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Bruno Cadonna

RocksDB's memory consumption should be added to the RocksDB metrics. RocksDB's memory consumption can be retrieved with the following class: https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/MemoryUtil.java

The memory consumption metrics should be added at the client level and should be recorded at INFO level.
[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas
[ https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094313#comment-17094313 ]

Mateusz Jadczyk commented on KAFKA-9891:

Thanks for looking into it. I looked into the test (see my comments in the PR), played with it a bit, enabled some more logging, and I may have more insights.

First of all, are you sure your test uses 2.4 clients? We used 2.4.1 clients and the broker image confluentinc/cp-zookeeper:5.3.1. I see that you use StoreQueryParameters in the code, which is not available in Streams 2.4.1, and the ProcessorStateManager implementation has also changed a lot.

I also revised the logs I included in the ticket and may have a new finding. The flow is:
* NODE 1 T-2 has active task 1_2.
* NODE 3 *T-1* has standby task 1_2.
* NODE 1 T-2 crashes.
* NODE 3 *T-2* takes over; T-1 (which had the standby task) is assigned another task, and standby task 1_2 is revoked.
* NODE 2 T-1 has standby task 1_2.
* NODE 3 T-2 crashes.
* NODE 3 T-1 takes over.
* NODE 2 T-1 standby task 1_2 is revoked.

The crucial takeaway here is that, if we focus strictly on NODE 3, task 1_2 was not taken over by thread T-1, which had the standby task, but by T-2. I guess that's how this version of TaskAssignor works. Digging deeper, I checked what exactly happened when the standby task was revoked on T-1 and the active task was starting on T-2.

So this is T-1 having its standby task revoked:
{noformat}
NODE_3 2020-04-15 21:11:47.024 INFO 1 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] State transition from RUNNING to PARTITIONS_ASSIGNED
NODE_3 2020-04-15 21:11:47.027 DEBUG 1 --- [-StreamThread-1] o.a.k.s.p.i.AssignedStandbyTasks : stream-thread [CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] Closing revoked standby tasks {1_2=[mnl..command-2, .command-2]}
NODE_3 2020-04-15 21:11:47.027 DEBUG 1 --- [-StreamThread-1] o.a.k.s.processor.internals.StandbyTask : stream-thread [CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Closing
NODE_3 2020-04-15 21:11:47.027 TRACE 1 --- [-StreamThread-1] o.a.k.s.processor.internals.StandbyTask : stream-thread [CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Committing
NODE_3 2020-04-15 21:11:47.027 DEBUG 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager: stream-thread [CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Flushing all stores registered in the state manager
NODE_3 2020-04-15 21:11:47.032 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager: stream-thread [CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Flushing store COMMAND_ID_STORE
NODE_3 2020-04-15 21:11:47.194 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager: stream-thread [CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Flushing store _STATE_STORE
NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager: stream-thread [CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Checkpointable offsets updated with restored offsets: {CommandProcessor-COMMAND_ID_STORE-changelog-2=1, CommandProcessor-_STATE_STORE-changelog-2=1}
NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager: stream-thread [CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Checkpointable offsets updated with active acked offsets: {CommandProcessor-COMMAND_ID_STORE-changelog-2=1, CommandProcessor-_STATE_STORE-changelog-2=1}
NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager: stream-thread [CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Writing checkpoint: {CommandProcessor-COMMAND_ID_STORE-changelog-2=1, CommandProcessor-_STATE_STORE-changelog-2=1}
NODE_3 2020-04-15 21:11:47.296 TRACE 1 --- [-StreamThread-1] o.a.k.s.processor.internals.StandbyTask : stream-thread [CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Closing state manager
NODE_3 2020-04-15 21:11:47.296 DEBUG 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager: stream-thread [CommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Closing its state manager and all the registered state stores
NODE_3 2020-04-15 21:11:47.298 DEBUG 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager: stream-thread
[GitHub] [kafka] tombentley commented on pull request #8312: KAFKA-9432 automated protocol for DescribeConfigs
tombentley commented on pull request #8312: URL: https://github.com/apache/kafka/pull/8312#issuecomment-620478691 Rebased for conflict. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on pull request #8204: Ensure ConfigProviders are closed
tombentley commented on pull request #8204: URL: https://github.com/apache/kafka/pull/8204#issuecomment-620481500 @kkonstantine any chance of merging this?
[GitHub] [kafka] tombentley opened a new pull request #8571: KAFKA-7613: Enable -Xlint:rawtypes for connect, fixing warnings
tombentley opened a new pull request #8571: URL: https://github.com/apache/kafka/pull/8571 Fix all existing javac warnings about the use of raw types in Kafka Connect, and add -Xlint:rawtypes to the javac options for connect in build.gradle. This addresses part of KAFKA-7613, but further work will be needed for the other warnings.
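For readers unfamiliar with this warning class: `-Xlint:rawtypes` makes javac report any use of a generic type without type arguments. A minimal sketch of the kind of change involved, using a hypothetical `PluginRegistry` class rather than actual Connect code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical example; not taken from the Kafka Connect code base.
public class PluginRegistry {
    // Before: a raw type, which javac reports under -Xlint:rawtypes:
    //     private final List plugins = new ArrayList();
    // After: a parameterized type, so the compiler checks element types.
    private final List<Class<?>> plugins = new ArrayList<>();

    public void register(Class<?> pluginClass) {
        plugins.add(pluginClass);
    }

    public List<Class<?>> plugins() {
        return plugins;
    }

    public static void main(String[] args) {
        PluginRegistry registry = new PluginRegistry();
        registry.register(String.class);
        System.out.println(registry.plugins().size()); // prints 1
    }
}
```

Wildcards such as `Class<?>` are usually the least invasive fix when the concrete type parameter is unknown at the use site.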
[GitHub] [kafka] tombentley commented on pull request #8571: KAFKA-7613: Enable -Xlint:rawtypes for connect, fixing warnings
tombentley commented on pull request #8571: URL: https://github.com/apache/kafka/pull/8571#issuecomment-620496101 @ijuma and @ewencp could one of you take a look at this?
[jira] [Updated] (KAFKA-9925) Non-key KTable Joining result in duplicate schema name in confluence schema registry
[ https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kin Siu updated KAFKA-9925: --- Affects Version/s: 2.4.1 > Non-key KTable Joining result in duplicate schema name in confluence schema > registry > > > Key: KAFKA-9925 > URL: https://issues.apache.org/jira/browse/KAFKA-9925 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.1 >Reporter: Kin Siu >Priority: Major > > The second half of the issue Andy Bryant reported in KAFKA-9390 still seems to > exist. > When testing the non-key join method without passing in "Named", I noticed that > schema subjects are still registered in Confluent Schema Registry without the > consumer group ID, > e.g. > {noformat} > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-pk-key", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-fk-key", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-vh-value", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-pk-key", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-fk-key", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-vh-value" > {noformat} > Code in KTableImpl that constructs the above names: > https://github.com/apache/kafka/blob/2.4.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L959 > When multiple topologies use foreign-key joins and are registered to the same > schema registry, their names can clash and schema registration can fail. > To clean up these schema subjects, we need to know the internal > naming of a consumer group's topology, which is not straightforward and is error-prone. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9925) Non-key KTable Joining result in duplicate schema name in confluence schema registry
Kin Siu created KAFKA-9925: -- Summary: Non-key KTable Joining result in duplicate schema name in confluence schema registry Key: KAFKA-9925 URL: https://issues.apache.org/jira/browse/KAFKA-9925 Project: Kafka Issue Type: Bug Components: streams Reporter: Kin Siu The second half of the issue Andy Bryant reported in KAFKA-9390 still seems to exist. When testing the non-key join method without passing in "Named", I noticed that schema subjects are still registered in Confluent Schema Registry without the consumer group ID, e.g. {noformat} "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-pk-key", "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-fk-key", "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-vh-value", "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-pk-key", "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-fk-key", "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-vh-value" {noformat} Code in KTableImpl that constructs the above names: https://github.com/apache/kafka/blob/2.4.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L959 When multiple topologies use foreign-key joins and are registered to the same schema registry, their names can clash and schema registration can fail. To clean up these schema subjects, we need to know the internal naming of a consumer group's topology, which is not straightforward and is error-prone.
[jira] [Updated] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry
[ https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kin Siu updated KAFKA-9925: --- Summary: Non-key KTable Joining may result in duplicate schema name in confluence schema registry (was: Non-key KTable Joining result in duplicate schema name in confluence schema registry)
[GitHub] [kafka] jeqo opened a new pull request #8572: fix (docs): from p to p-1 numbered partitions
jeqo opened a new pull request #8572: URL: https://github.com/apache/kafka/pull/8572 nit: The protocol docs describe partition numbers as follows: > Topic partitions themselves are just ordered "commit logs" numbered 0, 1, ..., P. I assume this is a typo, as partitions are numbered from 0 up to P-1.
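To make the off-by-one concrete: with P partitions, any scheme that maps a key onto a partition by taking a value modulo P can only produce ids 0 through P-1. The sketch below uses a simple `hashCode()`-modulo choice purely for illustration; it is an assumption for the example and is NOT Kafka's default partitioner, which hashes the serialized key with murmur2.

```java
// Illustrative only: a simple hash-mod partition choice, not Kafka's default partitioner.
public class PartitionRange {
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int p = 3;
        // Valid partition ids are 0, 1, ..., p - 1; there is no partition p.
        for (String key : new String[] {"a", "b", "c", "d"}) {
            System.out.println(key + " -> " + partitionFor(key, p));
        }
    }
}
```

With `p = 3` this prints `a -> 1`, `b -> 2`, `c -> 0`, `d -> 1`; no key ever lands on partition 3.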
[GitHub] [kafka] scoopex opened a new pull request #8573: Add possibility to append parameters for tool execution
scoopex opened a new pull request #8573: URL: https://github.com/apache/kafka/pull/8573 Provides the possibility to append connection info to all executed commands:
```
$ export KAFKA_OPTS_APPEND="--bootstrap-server 10.1.1.1:9092,10.1.1.1:9092,10.1.1.1:9092"
$ kafka-topics.sh --create --topic eventlog-global-dev-003 --replication-factor 3 --partitions 1
$ kafka-topics.sh --create --topic eventlog-global-dev-005 --replication-factor 3 --partitions 1
Adding specified KAFKA_OPTS_APPEND : '--bootstrap-server 10.1.1.1:9092,10.1.1.1:9092,10.1.1.1:9092'
```
### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-9018) Kafka Connect - throw clearer exceptions on serialisation errors
[ https://issues.apache.org/jira/browse/KAFKA-9018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mario Molina updated KAFKA-9018: Affects Version/s: 2.5.0 2.4.1 > Kafka Connect - throw clearer exceptions on serialisation errors > > > Key: KAFKA-9018 > URL: https://issues.apache.org/jira/browse/KAFKA-9018 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.5.0, 2.4.1 >Reporter: Robin Moffatt >Assignee: Mario Molina >Priority: Minor > > When Connect fails on a deserialisation error, it doesn't show if that's the > *key or value* that's thrown the error, nor does it give the user any > indication of the *topic/partition/offset* of the message. Kafka Connect > should be improved to return this information. > Example message that user will get (in this case caused by reading non-Avro > data with the Avro converter) > {code:java} > Caused by: org.apache.kafka.connect.errors.DataException: Failed to > deserialize data for topic sample_topic to Avro: > at > io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) > at > org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) > ... 13 more > Caused by: org.apache.kafka.common.errors.SerializationException: Error > deserializing Avro message for id -1 > Caused by: org.apache.kafka.common.errors.SerializationException: Unknown > magic byte!{code}
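As a rough illustration of what the ticket asks for, the sketch below wraps a conversion call and rethrows with the key/value flag plus topic, partition and offset in the message. Everything here is hypothetical: `ContextualDeserializer`, `ConversionException` and the toy converter are stand-ins (a `java.util.function.Function` replaces a real Connect `Converter`, and a local exception replaces Connect's `DataException`), not the eventual fix.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical helper, not part of Kafka Connect.
public class ContextualDeserializer {

    /** Stand-in for Connect's DataException in this sketch. */
    public static class ConversionException extends RuntimeException {
        public ConversionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    public static <T> T convert(Function<byte[], T> converter, boolean isKey,
                                String topic, int partition, long offset, byte[] payload) {
        try {
            return converter.apply(payload);
        } catch (RuntimeException e) {
            // Keep the original exception as the cause, but say where the bad record came from.
            String message = String.format(
                "Failed to deserialize %s for topic %s, partition %d, offset %d",
                isKey ? "key" : "value", topic, partition, offset);
            throw new ConversionException(message, e);
        }
    }

    public static void main(String[] args) {
        // Toy converter that rejects payloads without a leading 0 byte,
        // loosely mimicking the "Unknown magic byte!" failure quoted above.
        Function<byte[], String> toyConverter = bytes -> {
            if (bytes.length == 0 || bytes[0] != 0) {
                throw new IllegalStateException("Unknown magic byte!");
            }
            return new String(bytes, 1, bytes.length - 1, StandardCharsets.UTF_8);
        };
        try {
            convert(toyConverter, false, "sample_topic", 0, 42L,
                    "not-avro".getBytes(StandardCharsets.UTF_8));
        } catch (ConversionException e) {
            // prints: Failed to deserialize value for topic sample_topic, partition 0, offset 42
            System.out.println(e.getMessage());
        }
    }
}
```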
[jira] [Updated] (KAFKA-9018) Kafka Connect - throw clearer exceptions on serialisation errors
[ https://issues.apache.org/jira/browse/KAFKA-9018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mario Molina updated KAFKA-9018: Fix Version/s: 2.6.0
[jira] [Assigned] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry
[ https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-9925: --- Assignee: John Roesler
[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry
[ https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094604#comment-17094604 ] John Roesler commented on KAFKA-9925: - Ah, right you are. So sorry I overlooked that part of the bug report when I submitted my fix for it. The issue is that these "pseudo topics" are being created the same way that real repartition topics get created in the DSL layer, but for real repartition topics, we add them to the InternalTopologyBuilder, which later on invokes org.apache.kafka.streams.processor.internals.InternalTopologyBuilder#decorateTopic to add the applicationId prefix. Of course, this will never happen for the pseudo-topics, since we don't add them to the InternalTopologyBuilder. The complication is that we don't know the applicationId until the application is started. Currently, both the DSL builder and the runtime are isolated from this because the DSL builder only has to register the topic with the InternalTopologyBuilder, and then the runtime code only has to deal with pre-configured Serdes, which get the pre-decorated topics injected at startup. I'll submit a PR shortly to fix it.
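To see why the missing prefix causes clashes, here is a small sketch. It assumes two things for illustration: that decoration prepends `<applicationId>-` to the topic name (the form the later PR's test expects for the pseudo-topics), and that subjects are derived as `<topic>-key`, matching the subject names listed in the ticket. `PseudoTopicNaming`, `decorate` and `keySubject` are hypothetical names, not Kafka Streams or Schema Registry APIs.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch; not InternalTopologyBuilder or Schema Registry code.
public class PseudoTopicNaming {

    // Assumed decoration form: "<applicationId>-<topic>"
    static String decorate(String applicationId, String topic) {
        return applicationId + "-" + topic;
    }

    // Assumed subject naming: "<topic>-key"
    static String keySubject(String topic) {
        return topic + "-key";
    }

    public static void main(String[] args) {
        String pseudoTopic = "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-pk";

        Set<String> undecorated = new TreeSet<>();
        undecorated.add(keySubject(pseudoTopic)); // registered by app-a
        undecorated.add(keySubject(pseudoTopic)); // registered by app-b: same subject
        System.out.println(undecorated.size());   // prints 1: the two apps clash

        Set<String> decorated = new TreeSet<>();
        decorated.add(keySubject(decorate("app-a", pseudoTopic)));
        decorated.add(keySubject(decorate("app-b", pseudoTopic)));
        System.out.println(decorated.size());     // prints 2: one subject per app
    }
}
```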
[GitHub] [kafka] junrao commented on pull request #8566: Fix minor code issue
junrao commented on pull request #8566: URL: https://github.com/apache/kafka/pull/8566#issuecomment-620701631 @leonardge : The failed test seems unrelated to the PR. Could you file a separate PR to track that?
[GitHub] [kafka] steverod commented on pull request #8543: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…
steverod commented on pull request #8543: URL: https://github.com/apache/kafka/pull/8543#issuecomment-620710529 > @steverod : There seems to be compilation errors in JDK 8 test? Hi @junrao -- Yes, there are, but they aren't mine (!!). No, really. This was pre-existing in 2.4 and is being fixed by https://github.com/apache/kafka/pull/8562 (backport of some test fixes).
[GitHub] [kafka] vvcephei opened a new pull request #8574: KAFKA-9925: decorate pseudo-topics with app id
vvcephei opened a new pull request #8574: URL: https://github.com/apache/kafka/pull/8574 * ensure that pseudo-topics get correctly prefixed with the app id at run time * update test to expect the app id prefix ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] leonardge commented on pull request #8566: Fix minor code issue
leonardge commented on pull request #8566: URL: https://github.com/apache/kafka/pull/8566#issuecomment-620716514 Sure will do.
[GitHub] [kafka] hachikuji commented on a change in pull request #8562: Test compilation fixes for Scala 2.11
hachikuji commented on a change in pull request #8562: URL: https://github.com/apache/kafka/pull/8562#discussion_r416760519 ## File path: core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala ## @@ -243,9 +245,9 @@ class ReplicaAlterLogDirsThreadTest { responseCallback = callbackCaptor.capture(), isolationLevel = ArgumentMatchers.eq(IsolationLevel.READ_UNCOMMITTED), clientMetadata = ArgumentMatchers.eq(None) -)).thenAnswer(_ => { - callbackCaptor.getValue.apply(Seq((topicPartition, responseData))) -}) +)) thenAnswer new Answer[Unit] { Review comment: nit: inline usage like this is generally reserved for operators like `+`
[GitHub] [kafka] cadonna commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
cadonna commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r416761719 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java ## @@ -41,132 +54,107 @@ import static org.easymock.EasyMock.replay; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; -import org.easymock.EasyMock; -import org.junit.Test; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; public class HighAvailabilityTaskAssignorTest { -private long acceptableRecoveryLag = 100L; -private int balanceFactor = 1; -private int maxWarmupReplicas = 2; -private int numStandbyReplicas = 0; -private long probingRebalanceInterval = 60 * 1000L; - -private Map clientStates = new HashMap<>(); -private Set allTasks = new HashSet<>(); -private Set statefulTasks = new HashSet<>(); - -private ClientState client1; -private ClientState client2; -private ClientState client3; - -private HighAvailabilityTaskAssignor taskAssignor; - -private void createTaskAssignor() { -final AssignmentConfigs configs = new AssignmentConfigs( -acceptableRecoveryLag, -balanceFactor, -maxWarmupReplicas, -numStandbyReplicas, -probingRebalanceInterval -); -taskAssignor = new HighAvailabilityTaskAssignor( -clientStates, -allTasks, -statefulTasks, -configs); -} +private final AssignmentConfigs configWithoutStandbys = new AssignmentConfigs( +/*acceptableRecoveryLag*/ 100L, +/*balanceFactor*/ 1, +/*maxWarmupReplicas*/ 2, 
+/*numStandbyReplicas*/ 0, +/*probingRebalanceIntervalMs*/ 60 * 1000L +); + +private final AssignmentConfigs configWithStandbys = new AssignmentConfigs( +/*acceptableRecoveryLag*/ 100L, +/*balanceFactor*/ 1, +/*maxWarmupReplicas*/ 2, +/*numStandbyReplicas*/ 1, +/*probingRebalanceIntervalMs*/ 60 * 1000L +); -@Test -public void shouldDecidePreviousAssignmentIsInvalidIfThereAreUnassignedActiveTasks() { -client1 = EasyMock.createNiceMock(ClientState.class); -expect(client1.prevActiveTasks()).andReturn(singleton(TASK_0_0)); -expect(client1.prevStandbyTasks()).andStubReturn(EMPTY_TASKS); -replay(client1); -allTasks = mkSet(TASK_0_0, TASK_0_1); -clientStates = singletonMap(UUID_1, client1); -createTaskAssignor(); -assertFalse(taskAssignor.previousAssignmentIsValid()); Review comment: req: I think you can now restrict `previousAssignmentIsValid()` to `private` access.
[GitHub] [kafka] junrao commented on a change in pull request #8562: Test compilation fixes for Scala 2.11
junrao commented on a change in pull request #8562: URL: https://github.com/apache/kafka/pull/8562#discussion_r416760588 ## File path: core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala ## @@ -243,9 +245,9 @@ class ReplicaAlterLogDirsThreadTest { responseCallback = callbackCaptor.capture(), isolationLevel = ArgumentMatchers.eq(IsolationLevel.READ_UNCOMMITTED), clientMetadata = ArgumentMatchers.eq(None) -)).thenAnswer(_ => { - callbackCaptor.getValue.apply(Seq((topicPartition, responseData))) -}) +)) thenAnswer new Answer[Unit] { Review comment: Should we keep the `.` before `thenAnswer` and use `[]`? The existing convention is the following.
```
when(...)
  .thenAnswer(new Answer[Unit] {
    override def answer(invocation: InvocationOnMock): Unit = {
      ...
    }
  })
```
[GitHub] [kafka] vvcephei commented on a change in pull request #8574: KAFKA-9925: decorate pseudo-topics with app id
vvcephei commented on a change in pull request #8574: URL: https://github.com/apache/kafka/pull/8574#discussion_r416764158 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java ## @@ -76,6 +78,10 @@ public void setIfUnset(final Serializer defaultSerializer) { throw new UnsupportedVersionException("SubscriptionWrapper version is larger than maximum supported 0x7F"); } +if (primaryKeySerializationPseudoTopic == null) { +primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get(); +} Review comment: This (and below) is a bit awkward. Our requirement is not to call the supplier until after the app starts, but we can call it any time after the app starts. The natural place would be in `configure`, but unfortunately, that method is basically useless for our internal serdes. The reason is that we previously decided that `configure` should be called externally to the DSL, but our internal serdes are constructed _internal_ to the DSL. Plus, `configure` must be called at run time (when the config is available), but by run time, we can no longer tell whether our serde is "internal" or not. So, there's no good place where we can call `configure` for our internal serdes. I'm side-stepping the problem here by just invoking the supplier when we first need to use it, which is also at run time. ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ## @@ -972,13 +974,26 @@ boolean sendingOldValueEnabled() { //This occurs whenever the extracted foreignKey changes values. 
enableSendingOldValues(); +final NamedInternal renamed = new NamedInternal(joinName); + +final String subscriptionTopicName = renamed.suffixWithOrElseGet( +"-subscription-registration", +builder, +SUBSCRIPTION_REGISTRATION +) + TOPIC_SUFFIX; +// the decoration can't be performed until we have the configuration available when the app runs, +// so we pass Suppliers into the components, which they can call at run time Review comment: Hopefully, this explains what's going on here. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -1224,6 +1224,10 @@ private static Pattern buildPattern(final Collection sourceTopics, return decoratedTopics; } +public String decoratePseudoTopic(final String topic) { Review comment: I'm adding a new public method for our specific use case here, to document that we should _only_ need to invoke this method publicly for "pseudo" topics. ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java ## @@ -218,19 +220,19 @@ public void shouldUseExpectedTopicsWithSerde() { } // verifying primarily that no extra pseudo-topics were used, but it's nice to also verify the rest of the // topics our serdes serialize data for -assertThat(serdeScope.registeredTopics(), CoreMatchers.is(mkSet( +assertThat(serdeScope.registeredTopics(), is(mkSet( // expected pseudo-topics - "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic-fk--key", - "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic-pk--key", - "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic-vh--value", +applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic-fk--key", +applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic-pk--key", +applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic-vh--value", Review comment: This verifies the fix: the pseudo topics should also be prefixed. I should have noticed before that they weren't. 
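The "call the supplier when we first need it" approach described in these review comments can be sketched generically. The class below is hypothetical (it is not `SubscriptionWrapperSerde`) and simply tags the payload with the resolved topic instead of delegating to a real serializer; it also assumes single-threaded use, since the lazy field is not synchronized.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of lazily resolving a pseudo-topic name at run time.
public class LazyTopicSerializer {
    private final Supplier<String> pseudoTopicSupplier;
    private String pseudoTopic; // resolved on first use; not thread-safe

    public LazyTopicSerializer(Supplier<String> pseudoTopicSupplier) {
        this.pseudoTopicSupplier = pseudoTopicSupplier;
    }

    public byte[] serialize(String value) {
        if (pseudoTopic == null) {
            // First use happens at run time, when the application id is known.
            pseudoTopic = pseudoTopicSupplier.get();
        }
        // A real serde would pass pseudoTopic to an inner serializer;
        // here the payload is just tagged so the effect is visible.
        return (pseudoTopic + ":" + value).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        String[] applicationId = {null}; // unknown while the topology is being built
        LazyTopicSerializer serializer = new LazyTopicSerializer(() -> {
            calls.incrementAndGet();
            return applicationId[0] + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-pk";
        });

        applicationId[0] = "my-app"; // becomes known once the app starts
        serializer.serialize("x");
        serializer.serialize("y");
        System.out.println(calls.get()); // prints 1: the supplier is invoked only once
    }
}
```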
[jira] [Created] (KAFKA-9926) Flaky test PlaintextAdminIntegrationTest.testCreatePartitions
Wang Ge created KAFKA-9926: -- Summary: Flaky test PlaintextAdminIntegrationTest.testCreatePartitions Key: KAFKA-9926 URL: https://issues.apache.org/jira/browse/KAFKA-9926 Project: Kafka Issue Type: Bug Reporter: Wang Ge [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6007/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/testCreatePartitions/]
[jira] [Created] (KAFKA-9927) Add support for varint types to message generator
Jason Gustafson created KAFKA-9927: -- Summary: Add support for varint types to message generator Key: KAFKA-9927 URL: https://issues.apache.org/jira/browse/KAFKA-9927 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson It would be nice to be able to use either a "varint32" or "varint64" type or to add a flag to indicate variable length encoding.
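The issue asks for a type but does not specify a wire format. The sketch below shows one way a 32-bit variable-length field could be encoded. It assumes the zig-zag plus base-128 scheme used by Protocol Buffers' `sint32`, which Kafka's `ByteUtils` varint helpers for the v2 record format also use. `Varints` is a hypothetical helper and is not the message generator's code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

/** Hypothetical zig-zag varint codec for 32-bit values (1 to 5 bytes on the wire). */
final class Varints {
    private Varints() {}

    // Zig-zag maps signed to unsigned so small magnitudes stay short: 0->0, -1->1, 1->2, -2->3.
    static int zigZagEncode(final int value) {
        return (value << 1) ^ (value >> 31);
    }

    static int zigZagDecode(final int encoded) {
        return (encoded >>> 1) ^ -(encoded & 1);
    }

    // Emits 7 bits per byte, low bits first, setting the high bit while more bytes follow.
    static byte[] writeVarint(final int value) {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        int v = zigZagEncode(value);
        while ((v & 0xFFFFFF80) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
        return out.toByteArray();
    }

    // Reads bytes until one without the continuation bit, rejecting encodings longer than 5 bytes.
    static int readVarint(final ByteBuffer buffer) {
        int result = 0;
        int shift = 0;
        byte b;
        do {
            if (shift > 28) {
                throw new IllegalArgumentException("varint is too long for a 32-bit value");
            }
            b = buffer.get();
            result |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return zigZagDecode(result);
    }
}
```

The trade-off is size against predictability. Values near zero (including small negatives) take a single byte, while `Integer.MIN_VALUE` takes five bytes, one more than a fixed `int32`.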
[GitHub] [kafka] leonardge commented on pull request #8566: Fix minor code issue
leonardge commented on pull request #8566: URL: https://github.com/apache/kafka/pull/8566#issuecomment-620735042 JIRA [here](https://issues.apache.org/jira/browse/KAFKA-9926).
[jira] [Updated] (KAFKA-9926) Flaky test PlaintextAdminIntegrationTest.testCreatePartitions
[ https://issues.apache.org/jira/browse/KAFKA-9926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang Ge updated KAFKA-9926: --- Description: Flaky test: kafka.api.PlaintextAdminIntegrationTest.testCreatePartitions [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6007/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/testCreatePartitions/] was:[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6007/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/testCreatePartitions/] > Flaky test PlaintextAdminIntegrationTest.testCreatePartitions > - > > Key: KAFKA-9926 > URL: https://issues.apache.org/jira/browse/KAFKA-9926 > Project: Kafka > Issue Type: Bug >Reporter: Wang Ge >Priority: Major > > Flaky test: kafka.api.PlaintextAdminIntegrationTest.testCreatePartitions > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6007/testReport/junit/kafka.api/PlaintextAdminIntegrationTest/testCreatePartitions/]
[GitHub] [kafka] pan3793 opened a new pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null
pan3793 opened a new pull request #8575: URL: https://github.com/apache/kafka/pull/8575 https://issues.apache.org/jira/browse/KAFKA-8713 https://cwiki.apache.org/confluence/display/KAFKA/KIP-581:+Value+of+optional+null+field+which+has+default+value ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] pan3793 commented on pull request #7112: KAFKA-8713: JsonConverter NULL Values are replaced by default values even in NULLABLE fields
pan3793 commented on pull request #7112: URL: https://github.com/apache/kafka/pull/7112#issuecomment-620746182 I've done a new implementation at https://github.com/apache/kafka/pull/8575 following [KIP-581](https://cwiki.apache.org/confluence/display/KAFKA/KIP-581:+Value+of+optional+null+field+which+has+default+value); this PR is deprecated and I will close it soon.
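KAFKA-8713 reports that a null value in an optional (nullable) field is replaced by the schema default. KIP-581 is about letting that null survive. The sketch below models the decision as one function. The `acceptOptionalNull` flag is named after the `accept.optional.null` setting in the PR title. `NullFieldPolicy` and `resolve` are hypothetical names that model the behavior described here, not `JsonConverter`'s actual code.

```java
/**
 * Hypothetical model of the choice KIP-581 is about: what a converter should emit
 * when a field value is null but the schema declares a default.
 */
final class NullFieldPolicy {
    private NullFieldPolicy() {}

    static Object resolve(final Object value, final boolean optional, final Object defaultValue,
                          final boolean acceptOptionalNull) {
        if (value != null) {
            return value;
        }
        if (optional && acceptOptionalNull) {
            return null; // proposed behavior: an explicit null in an optional field stays null
        }
        if (defaultValue != null) {
            return defaultValue; // behavior reported in KAFKA-8713: null is replaced by the default
        }
        if (optional) {
            return null;
        }
        throw new IllegalArgumentException("null value for a required field without a default");
    }
}
```

Required fields are unaffected by the flag in this model. Only optional fields with a default change behavior, which keeps the existing semantics as the default when the flag is off.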
[GitHub] [kafka] hachikuji commented on pull request #8562: Test compilation fixes for Scala 2.11
hachikuji commented on pull request #8562: URL: https://github.com/apache/kafka/pull/8562#issuecomment-620749823 retest this please
[GitHub] [kafka] hachikuji commented on pull request #8562: Test compilation fixes for Scala 2.11
hachikuji commented on pull request #8562: URL: https://github.com/apache/kafka/pull/8562#issuecomment-620749686 ok to test
[GitHub] [kafka] hachikuji commented on pull request #8562: Test compilation fixes for Scala 2.11
hachikuji commented on pull request #8562: URL: https://github.com/apache/kafka/pull/8562#issuecomment-620749977 retest this please
[jira] [Updated] (KAFKA-9923) Join window store duplicates can be compacted in changelog
[ https://issues.apache.org/jira/browse/KAFKA-9923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-9923: --- Fix Version/s: 2.6.0 > Join window store duplicates can be compacted in changelog > --- > > Key: KAFKA-9923 > URL: https://issues.apache.org/jira/browse/KAFKA-9923 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Critical > Fix For: 2.6.0 > > > Stream-stream joins use the regular `WindowStore` implementation but with > `retainDuplicates` set to true. To allow for duplicates while using the same > unique-key underlying stores we just wrap the key with an incrementing > sequence number before inserting it. > This wrapping occurs at the innermost layer of the store hierarchy, which > means the duplicates must first pass through the changelogging layer. At this > point the keys are still identical. So, we end up sending the records to the > changelog without distinct keys and therefore may lose the older of the > duplicates during compaction.
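The loss can be reproduced with a toy model of a compacted topic that keeps only the latest value per key. The sketch compares logging the duplicates before the sequence number is added, as the issue describes, with logging them after. The `key#seqnum` string is a simplification and not the store's real serialized key layout. `CompactionDemo` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy model of a compacted changelog receiving duplicate-key records. */
final class CompactionDemo {
    private CompactionDemo() {}

    // Compaction keeps only the most recent value for each key.
    static Map<String, String> compact(final List<String[]> records) {
        final Map<String, String> latest = new LinkedHashMap<>();
        for (final String[] record : records) {
            latest.put(record[0], record[1]); // later records overwrite earlier ones
        }
        return latest;
    }

    // Current order: the changelogging layer sees the raw key, so duplicates share it.
    static List<String[]> logBeforeWrapping(final String key, final List<String> values) {
        final List<String[]> log = new ArrayList<>();
        for (final String v : values) {
            log.add(new String[] {key, v});
        }
        return log;
    }

    // Alternative order: the sequence number is added first, so each duplicate is distinct.
    static List<String[]> logAfterWrapping(final String key, final List<String> values) {
        final List<String[]> log = new ArrayList<>();
        int seqnum = 0;
        for (final String v : values) {
            log.add(new String[] {key + "#" + seqnum++, v});
        }
        return log;
    }
}
```

With two duplicates `v1` and `v2` under key `k`, compacting the first log leaves only `k -> v2`. Compacting the second keeps both records.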
[jira] [Updated] (KAFKA-9923) Join window store duplicates can be compacted in changelog
[ https://issues.apache.org/jira/browse/KAFKA-9923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-9923: --- Priority: Blocker (was: Critical)
[GitHub] [kafka] ableegoldman commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
ableegoldman commented on a change in pull request #8568: URL: https://github.com/apache/kafka/pull/8568#discussion_r416809469 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java ## @@ -163,8 +159,10 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception { // Assert that all messages in the second batch were processed in a timely manner assertThat(semaphore.tryAcquire(batch2NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); Review comment: `OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore` still failed on [one of the builds](https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/149/testReport/junit/org.apache.kafka.streams.integration/OptimizedKTableIntegrationTest/shouldApplyUpdatesToStandbyStore/) at this line :/ But, at least we got farther into the test before it failed so I'd say this is still an improvement 😄
[GitHub] [kafka] ableegoldman commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
ableegoldman commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r416810713 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java ## @@ -41,132 +54,107 @@ import static org.easymock.EasyMock.replay; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; -import org.easymock.EasyMock; -import org.junit.Test; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; public class HighAvailabilityTaskAssignorTest { -private long acceptableRecoveryLag = 100L; -private int balanceFactor = 1; -private int maxWarmupReplicas = 2; -private int numStandbyReplicas = 0; -private long probingRebalanceInterval = 60 * 1000L; - -private Map clientStates = new HashMap<>(); -private Set allTasks = new HashSet<>(); -private Set statefulTasks = new HashSet<>(); - -private ClientState client1; -private ClientState client2; -private ClientState client3; - -private HighAvailabilityTaskAssignor taskAssignor; - -private void createTaskAssignor() { -final AssignmentConfigs configs = new AssignmentConfigs( -acceptableRecoveryLag, -balanceFactor, -maxWarmupReplicas, -numStandbyReplicas, -probingRebalanceInterval -); -taskAssignor = new HighAvailabilityTaskAssignor( -clientStates, -allTasks, -statefulTasks, -configs); -} +private final AssignmentConfigs configWithoutStandbys = new AssignmentConfigs( +/*acceptableRecoveryLag*/ 100L, +/*balanceFactor*/ 1, +/*maxWarmupReplicas*/ 2, 
+/*numStandbyReplicas*/ 0, +/*probingRebalanceIntervalMs*/ 60 * 1000L +); + +private final AssignmentConfigs configWithStandbys = new AssignmentConfigs( +/*acceptableRecoveryLag*/ 100L, +/*balanceFactor*/ 1, +/*maxWarmupReplicas*/ 2, +/*numStandbyReplicas*/ 1, +/*probingRebalanceIntervalMs*/ 60 * 1000L +); -@Test -public void shouldDecidePreviousAssignmentIsInvalidIfThereAreUnassignedActiveTasks() { -client1 = EasyMock.createNiceMock(ClientState.class); -expect(client1.prevActiveTasks()).andReturn(singleton(TASK_0_0)); -expect(client1.prevStandbyTasks()).andStubReturn(EMPTY_TASKS); -replay(client1); -allTasks = mkSet(TASK_0_0, TASK_0_1); -clientStates = singletonMap(UUID_1, client1); -createTaskAssignor(); -assertFalse(taskAssignor.previousAssignmentIsValid()); Review comment: Or just remove it completely 😉
[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry
[ https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094729#comment-17094729 ] John Roesler commented on KAFKA-9925: - Ok, I've opened [https://github.com/apache/kafka/pull/8574] . If you have the time, a review would help speed things along. Thanks for the report! > Non-key KTable Joining may result in duplicate schema name in confluence > schema registry > > > Key: KAFKA-9925 > URL: https://issues.apache.org/jira/browse/KAFKA-9925 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.1 >Reporter: Kin Siu >Assignee: John Roesler >Priority: Major > > The second half of the issue Andy Bryant reported in KAFKA-9390 still seems to > exist. > When testing the non-key join method without passing in "Named", I noticed that > schema subjects are still registered in the Confluent schema registry without > the consumer group id, > e.g. > {noformat} > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-pk-key", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-fk-key", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05-topic-vh-value", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-pk-key", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-fk-key", > "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-25-topic-vh-value" > {noformat} > Code in KTableImpl that constructs the above names: > https://github.com/apache/kafka/blob/2.4.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L959 > When multiple topologies using a foreign-key join are registered to the same > schema registry, we can get a name clash and fail to register the schema. > In order to clean up these schema subjects, we need to know the internal > naming of a consumer group's topology, which is not straightforward and is error > prone.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
ableegoldman commented on a change in pull request #8568: URL: https://github.com/apache/kafka/pull/8568#discussion_r416814163 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java ## @@ -158,8 +158,9 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { produceGlobalTableValues(); -final ReadOnlyKeyValueStore replicatedStore = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); +final ReadOnlyKeyValueStore replicatedStore = IntegrationTestUtils +.getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); +assertNotNull(replicatedStore); Review comment: Why do we have to check for null now?
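The PR's approach is to retry the store lookup instead of calling `kafkaStreams.store(...)` once. The sketch below shows one plausible shape for such a loop. It assumes the lookup signals "not ready yet" by throwing, and that the helper returns `null` once a deadline passes. Under those assumptions, a caller would need a null check. `StoreRetry.retryUntilAvailable` is hypothetical and is not the signature or behavior of `IntegrationTestUtils.getStore`. Real code would catch `InvalidStateStoreException` specifically rather than any `RuntimeException`.

```java
import java.util.function.Supplier;

/**
 * Hypothetical retry helper: keep calling the lookup until it returns a
 * non-null value without throwing, or until the timeout elapses.
 */
final class StoreRetry {
    private StoreRetry() {}

    static <T> T retryUntilAvailable(final Supplier<T> lookup, final long timeoutMs, final long backoffMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                final T result = lookup.get();
                if (result != null) {
                    return result;
                }
            } catch (final RuntimeException notReadyYet) {
                // Treat the failure as "store not queryable yet" (e.g. during a rebalance) and retry.
            }
            if (System.currentTimeMillis() >= deadline) {
                return null; // gave up: the caller has to handle a null result
            }
            try {
                Thread.sleep(backoffMs);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return null;
            }
        }
    }
}
```

An alternative design is to throw on timeout instead of returning `null`. That design would make a separate `assertNotNull` at the call site unnecessary.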
[GitHub] [kafka] guozhangwang commented on pull request #8564: KAFKA-9921: disable caching on stores configured to retain duplicates
guozhangwang commented on pull request #8564: URL: https://github.com/apache/kafka/pull/8564#issuecomment-620783397 Also cherry-picked to 2.5.
[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry
[ https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094772#comment-17094772 ] Guozhang Wang commented on KAFKA-9925: -- [~vvcephei] Thanks for taking a look at this issue. I'm wondering if now is a good time to deprecate the `StreamsBuilder#build()` function to let users use `build(final Properties props)` instead, as a tiny KIP. There's a risk, of course, that the props passed in `build` are not the same as the ones passed into the `KafkaStreams` constructor. I think we can remember the reference of the Props when building the topology, and then at construction, if we find they are not the same (by reference), we can log a warning such as "found the topology is built with some StreamsConfig already, which is not the same as the config passed in the constructor".
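The check proposed in the comment above can be sketched in a few lines: keep the `Properties` reference passed to `build(props)` and compare it by identity when the `KafkaStreams` constructor runs. `BuiltTopology` and `warningFor` are hypothetical names and not part of the Kafka Streams API. The warning text is taken from the comment.

```java
import java.util.Properties;

/**
 * Hypothetical sketch of the proposed check: remember the Properties object
 * passed to build(props) and report a warning if the constructor later
 * receives a different object. Not an actual Kafka Streams API.
 */
final class BuiltTopology {
    private final Properties buildProps;

    BuiltTopology(final Properties buildProps) {
        this.buildProps = buildProps; // keep the reference itself, not a copy
    }

    /** Returns a warning message if the references differ, or null if they match. */
    String warningFor(final Properties constructorProps) {
        if (buildProps != null && buildProps != constructorProps) {
            return "found the topology is built with some StreamsConfig already, "
                + "which is not the same as the config passed in the constructor";
        }
        return null;
    }
}
```

A reference comparison is cheap but also flags a copy with identical contents. Comparing with `Properties.equals`, which compares entries, would be the looser alternative.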
[GitHub] [kafka] guozhangwang commented on pull request #8574: KAFKA-9925: decorate pseudo-topics with app id
guozhangwang commented on pull request #8574: URL: https://github.com/apache/kafka/pull/8574#issuecomment-620787049 cc @abbccdda @mjsax to take a look?
[jira] [Commented] (KAFKA-9916) Materialize Table-Table Join Result to Avoid Performing Same Join Twice
[ https://issues.apache.org/jira/browse/KAFKA-9916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094777#comment-17094777 ] Matthias J. Sax commented on KAFKA-9916: The original example was slightly different: {code:java} KStream stream = ... stream.filter((k,v) -> { v.setA("a"); return true; }); stream.filter((k,v) -> ...);{code} For this case, the filters are not chained but executed in parallel, which is basically a broadcast pattern, i.e., each record of `stream` is piped into both filters; conceptually, we would need to duplicate the input record, however, as an optimization, we don't copy but only pass the same object twice. > Materialize Table-Table Join Result to Avoid Performing Same Join Twice > --- > > Key: KAFKA-9916 > URL: https://issues.apache.org/jira/browse/KAFKA-9916 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Priority: Major > > If a table-table join processor performs a join and the join needs to forward > downstream the old join result (e.g. due to an aggregation operation > downstream), it performs the same join (i.e. calls the {{ValueJoiner}}) twice. > Given a left value {{L1}}, a right value {{R1}}, and a new right value {{R2}} > with the same keys and input into the join operation in this order, the join > processor at some point will join {{L1}} with {{R1}}. When the new right > value {{R2}} triggers the join, it will join {{L1}} with {{R2}} and again > {{L1}} with {{R1}}. > We could avoid calling the {{ValueJoiner}} twice by materializing the join > result. We would trade a call to the {{ValueJoiner}} with a lookup into a > state store. Depending on the logic in the {{ValueJoiner}} this may or may > not improve the performance. However, calling the {{ValueJoiner}} once will > only access the input values of the {{ValueJoiner}} once, which avoids the > need to copy the input values each time the {{ValueJoiner}} is called.
For > example, consider the following {{ValueJoiner}}: > {code:java} > private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) { > leftValue.setSomeValue(rightValue); > return leftValue; > } > {code} > With this {{ValueJoiner}}, {{setSomeValue(rightValue)}} will be called twice > when {{R2}} triggers the join, the first time with {{R2}} and the second time > with {{R1}}. That means, {{R2}} will be overwritten by {{R1}}, which is > probably not what the users want. To get the correct result, the > {{ValueJoiner}} should be implemented as follows: > > {code:java} > private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) { > ComplexValue copy = copy(leftValue); > copy.setSomeValue(rightValue); > return copy; > } > {code} > Copying values during joins could be avoided if the join result were > materialized.
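The overwrite can be reproduced outside Streams by calling both joiner variants in the order the issue describes: first `(L1, R2)` for the new result, then `(L1, R1)` for the old one. The sketch below does that. `ComplexValue` is a minimal stand-in for the type in the issue, and `onRightUpdate` is a hypothetical simulation of the processor, not Streams code.

```java
import java.util.function.BiFunction;

/** Hypothetical simulation of a table-table join re-running the joiner for the old result. */
final class JoinerMutationDemo {
    private JoinerMutationDemo() {}

    // Minimal stand-in for the ComplexValue type from the issue.
    static final class ComplexValue {
        Long someValue;

        ComplexValue() {}

        ComplexValue(final ComplexValue other) {
            this.someValue = other.someValue;
        }

        void setSomeValue(final Long value) {
            this.someValue = value;
        }
    }

    // Mutates and returns the left input: unsafe when called twice on the same left value.
    static ComplexValue mutatingJoin(final ComplexValue left, final Long right) {
        left.setSomeValue(right);
        return left;
    }

    // Copies the left input first, so each call yields an independent result.
    static ComplexValue copyingJoin(final ComplexValue left, final Long right) {
        final ComplexValue copy = new ComplexValue(left);
        copy.setSomeValue(right);
        return copy;
    }

    // New right value arrives: compute the new result (L1, R2), then recompute the old one (L1, R1).
    static ComplexValue[] onRightUpdate(final BiFunction<ComplexValue, Long, ComplexValue> joiner,
                                        final ComplexValue left, final Long oldRight, final Long newRight) {
        final ComplexValue newResult = joiner.apply(left, newRight);
        final ComplexValue oldResult = joiner.apply(left, oldRight);
        return new ComplexValue[] {newResult, oldResult};
    }
}
```

With `mutatingJoin`, both results are the same object, and the "new" result ends up holding `R1`. With `copyingJoin`, the new result holds `R2` and the old one holds `R1`, as expected. Materializing the join result, as the issue proposes, would avoid the second call altogether.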
[jira] [Commented] (KAFKA-7317) Use collections subscription for main consumer to reduce metadata
[ https://issues.apache.org/jira/browse/KAFKA-7317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094781#comment-17094781 ] Matthias J. Sax commented on KAFKA-7317: As reported on SO, when setting the number of threads to zero, the client state never goes to RUNNING. Sounds like another bug? > Use collections subscription for main consumer to reduce metadata > - > > Key: KAFKA-7317 > URL: https://issues.apache.org/jira/browse/KAFKA-7317 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.5.0 > > > In KAFKA-4633 we switched from "collection subscription" to "pattern > subscription" for `Consumer#subscribe()` to avoid triggering auto topic > creation on the broker. In KAFKA-5291, the metadata request was extended to > overwrite the broker config within the request itself. However, this feature > is only used in `KafkaAdminClient`. KAFKA-7320 adds this feature for the > consumer client, too. > This ticket proposes to use the new feature within Kafka Streams to allow the > usage of collection-based subscription in consumer and admin clients to > reduce the metadata response size, which can be very large for a large number of > partitions in the cluster. > Note that Streams needs to be able to distinguish if it connects to older > brokers that do not support the new metadata request and still use pattern > subscription for this case.
[jira] [Comment Edited] (KAFKA-9127) Needless group coordination overhead for GlobalKTables
[ https://issues.apache.org/jira/browse/KAFKA-9127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092826#comment-17092826 ] Matthias J. Sax edited comment on KAFKA-9127 at 4/28/20, 6:54 PM: -- [~ableegoldman] Seems we introduced a regression in 2.5.0 via KAFKA-7317 that would be fixed with this ticket (cf. [https://stackoverflow.com/questions/61342530/kafka-streams-2-5-0-requires-input-topic]). If you agree, we should cherry-pick the fix to 2.5 branch. And also add a corresponding test. Thoughts? > Needless group coordination overhead for GlobalKTables > -- > > Key: KAFKA-9127 > URL: https://issues.apache.org/jira/browse/KAFKA-9127 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Chris Toomey >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.6.0 > > > When creating a simple stream topology to just populate a GlobalKTable, I > noticed from logging that the stream consumer was doing group coordination > requests (JoinGroup, SyncGroup, ...) to the server, which it had no reason to > do since the global consumer thread populating the table fetches from all > partitions and thus doesn't use the group requests. So this adds needless > overhead on the client, network, and server. > I tracked this down to the stream thread consumer, which is created > regardless of whether it's needed based solely on NUM_STREAM_THREADS_CONFIG > which defaults to 1 I guess.
> I found that setting NUM_STREAM_THREADS_CONFIG to 0 will prevent this from > happening, but it'd be a worthwhile improvement to be able to override this > setting in cases of topologies like this that don't have any need for stream > threads. Hence this ticket. > I originally asked about this on the users mailing list where Bruno suggested > I file it as an improvement request. > Here's the Scala code that I'm using that exhibits this: > {code:scala} > val builder: StreamsBuilder = new StreamsBuilder() > val gTable = builder.globalTable[K, V](...) > val stream = new KafkaStreams(builder.build(), props) > stream.start(){code} > Not shown is the state store that I'm populating/using.
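The workaround described in the issue is a single config change. The sketch below builds the resulting properties using the literal keys behind `StreamsConfig.APPLICATION_ID_CONFIG`, `BOOTSTRAP_SERVERS_CONFIG` and `NUM_STREAM_THREADS_CONFIG`, so the block has no Kafka dependency. `globalTableOnlyProps` is a hypothetical helper. The comments in this thread report that a zero-thread client never reaches the RUNNING state, so treat this as a workaround to verify, not a fix.

```java
import java.util.Properties;

/** Hypothetical helper building config for a topology that only populates a GlobalKTable. */
final class GlobalOnlyConfig {
    private GlobalOnlyConfig() {}

    static Properties globalTableOnlyProps(final String applicationId, final String bootstrapServers) {
        final Properties props = new Properties();
        props.setProperty("application.id", applicationId);       // StreamsConfig.APPLICATION_ID_CONFIG
        props.setProperty("bootstrap.servers", bootstrapServers); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        // StreamsConfig.NUM_STREAM_THREADS_CONFIG: with no StreamThread there is no main consumer,
        // so no JoinGroup/SyncGroup traffic; only the global thread fetches the table's partitions.
        props.setProperty("num.stream.threads", "0");
        return props;
    }
}
```

These properties would be passed to `new KafkaStreams(builder.build(), props)` in place of the `props` in the reporter's Scala snippet.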
[jira] [Commented] (KAFKA-9127) Needless group coordination overhead for GlobalKTables
[ https://issues.apache.org/jira/browse/KAFKA-9127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094791#comment-17094791 ] Matthias J. Sax commented on KAFKA-9127: {quote}Does it qualify as a regression when the workaround is the same as the fix? {quote} IMHO, it does, because if you don't change any code/configs and upgrade to 2.5 it breaks. Btw: setting the number of threads to zero exposes a different bug: the client does not transition to state RUNNING
[jira] [Commented] (KAFKA-7317) Use collections subscription for main consumer to reduce metadata
[ https://issues.apache.org/jira/browse/KAFKA-7317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094797#comment-17094797 ] Sophie Blee-Goldman commented on KAFKA-7317: Also fixed via [https://github.com/apache/kafka/pull/8540]
[GitHub] [kafka] kkonstantine commented on pull request #8204: Ensure ConfigProviders are closed
kkonstantine commented on pull request #8204: URL: https://github.com/apache/kafka/pull/8204#issuecomment-620795402 retest this please
[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry
[ https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094799#comment-17094799 ] Matthias J. Sax commented on KAFKA-9925: {quote}I'm wondering if now is a good time to deprecate the `StreamsBuilder#build()` function to let users use `build(final Properties props)` instead as a tiny KIP. {quote} Just FYI: this is already proposed in KIP-591. However, IMHO, we should fix it for older versions, too?
-- This message was sent by Atlassian Jira (v8.3.4#803005)
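The subject clash described in KAFKA-9925 can be illustrated with a small, stdlib-only Java sketch. It assumes the registry subject is derived as `<topic>-key` / `<topic>-value` (the Confluent serializer's default topic-name strategy) and that the name handed to the serde is built from the generated processor-node name alone. `SubjectClash`, `unprefixedTopic` and `prefixedTopic` are hypothetical helpers; prefixing with an application id is shown only as one way to make the names unique, not as the fix that was adopted.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of why unprefixed internal names collide in a shared registry.
final class SubjectClash {
    // Assumed subject naming: "<topic>-key" or "<topic>-value".
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // Name derived from the generated node name only (no application id), as in the report.
    static String unprefixedTopic(String nodeName, String suffix) {
        return nodeName + "-topic-" + suffix;
    }

    // Hypothetical alternative: scope the same name by an application id.
    static String prefixedTopic(String applicationId, String nodeName, String suffix) {
        return applicationId + "-" + unprefixedTopic(nodeName, suffix);
    }

    // True if two applications would register at least one identical subject.
    static boolean clash(List<String> subjectsA, List<String> subjectsB) {
        Set<String> seen = new HashSet<>(subjectsA);
        for (String subject : subjectsB) {
            if (seen.contains(subject)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String node = "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-05";
        List<String> appA = List.of(subjectFor(unprefixedTopic(node, "pk"), true));
        List<String> appB = List.of(subjectFor(unprefixedTopic(node, "pk"), true));
        System.out.println(clash(appA, appB)); // true: both apps register the same subject
        List<String> appC = List.of(subjectFor(prefixedTopic("app-c", node, "pk"), true));
        List<String> appD = List.of(subjectFor(prefixedTopic("app-d", node, "pk"), true));
        System.out.println(clash(appC, appD)); // false: the prefix keeps them apart
    }
}
```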
[jira] [Commented] (KAFKA-7317) Use collections subscription for main consumer to reduce metadata
[ https://issues.apache.org/jira/browse/KAFKA-7317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094800#comment-17094800 ] Matthias J. Sax commented on KAFKA-7317: Sweet! > Use collections subscription for main consumer to reduce metadata > - > > Key: KAFKA-7317 > URL: https://issues.apache.org/jira/browse/KAFKA-7317 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.5.0 > > > In KAFKA-4633 we switched from "collection subscription" to "pattern > subscription" for `Consumer#subscribe()` to avoid triggering auto topic > creation on the broker. In KAFKA-5291, the metadata request was extended to > override the broker config within the request itself. However, this feature > is only used in `KafkaAdminClient`. KAFKA-7320 adds this feature for the > consumer client, too. > This ticket proposes to use the new feature within Kafka Streams to allow the > usage of collection-based subscription in the consumer and admin clients to > reduce the metadata response size, which can be very large for a large number of > partitions in the cluster. > Note that Streams needs to be able to detect whether it connects to older > brokers that do not support the new metadata request, and still use pattern > subscription in that case.
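A minimal sketch of the choice the ticket describes, assuming the broker's support for the client-side auto-creation override can be reduced to a single boolean (in reality it depends on the metadata request version the broker understands). `SubscriptionChoice`, `modeFor` and `patternFor` are hypothetical; the pattern is built by quoting each topic name and joining with `|`, one plausible way to express a fixed topic list as a pattern subscription.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical sketch; not the Kafka Streams implementation.
final class SubscriptionChoice {
    enum Mode { COLLECTION, PATTERN }

    // Collection subscription is only safe when the broker honors the client's
    // "do not auto-create topics" override; older brokers fall back to a pattern.
    static Mode modeFor(boolean brokerSupportsAutoCreateFlag) {
        return brokerSupportsAutoCreateFlag ? Mode.COLLECTION : Mode.PATTERN;
    }

    // Pattern that matches exactly the given topic names and nothing else.
    static Pattern patternFor(List<String> topics) {
        String regex = topics.stream()
                .map(Pattern::quote) // treat '.', '-', etc. literally
                .collect(Collectors.joining("|"));
        return Pattern.compile(regex);
    }

    public static void main(String[] args) {
        Pattern p = patternFor(List.of("orders", "orders.v2"));
        System.out.println(p.matcher("orders.v2").matches()); // true
        System.out.println(p.matcher("ordersXv2").matches()); // false: '.' is quoted
        System.out.println(modeFor(false));                   // PATTERN
    }
}
```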
[jira] [Commented] (KAFKA-9127) Needless group coordination overhead for GlobalKTables
[ https://issues.apache.org/jira/browse/KAFKA-9127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094801#comment-17094801 ] Sophie Blee-Goldman commented on KAFKA-9127: Yep, if you can kick off tests on that PR and give it another pass, it should fix both issues and we can backport it to 2.5. > Needless group coordination overhead for GlobalKTables > -- > > Key: KAFKA-9127 > URL: https://issues.apache.org/jira/browse/KAFKA-9127 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Chris Toomey >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.6.0 > > > When creating a simple stream topology to just populate a GlobalKTable, I > noticed from logging that the stream consumer was doing group coordination > requests (JoinGroup, SyncGroup, ...) to the server, which it had no reason to > do since the global consumer thread populating the table fetches from all > partitions and thus doesn't use the group requests. This adds needless > overhead on the client, network, and server. > I tracked this down to the stream thread consumer, which is created > regardless of whether it's needed, based solely on NUM_STREAM_THREADS_CONFIG, > which defaults to 1 I guess. > I found that setting NUM_STREAM_THREADS_CONFIG to 0 will prevent this from > happening, but it'd be a worthwhile improvement to be able to override this > setting for topologies like this that don't need any stream > threads. Hence this ticket. > I originally asked about this on the users mailing list, where Bruno suggested > I file it as an improvement request. > Here's the Scala code that I'm using that exhibits this: > {code:scala} > val builder: StreamsBuilder = new StreamsBuilder() > val gTable = builder.globalTable[K, V](...) > val stream = new KafkaStreams(builder.build(), props) > stream.start(){code} > Not shown is the state store that I'm populating/using.
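The behavior requested in KAFKA-9127 boils down to a thread-count decision. The sketch below is hypothetical (`GlobalOnlyThreads` and `effectiveStreamThreads` are not Kafka APIs); the only real name it uses is the config key `num.stream.threads`, the value of `StreamsConfig.NUM_STREAM_THREADS_CONFIG`, which is the workaround the reporter applied.

```java
import java.util.Properties;

// Hypothetical sketch of the decision behind KAFKA-9127; not the Kafka Streams code.
final class GlobalOnlyThreads {
    // A topology whose only sources feed GlobalKTables has no tasks for StreamThreads,
    // so creating them (and their group-managed consumers) only adds JoinGroup/SyncGroup traffic.
    static int effectiveStreamThreads(int configuredThreads, boolean hasNonGlobalSources) {
        return hasNonGlobalSources ? configuredThreads : 0;
    }

    // The manual workaround from the ticket: copy the base config and set num.stream.threads = 0.
    static Properties workaroundProps(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("num.stream.threads", "0"); // key of StreamsConfig.NUM_STREAM_THREADS_CONFIG
        return props;
    }
}
```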
[jira] [Commented] (KAFKA-9925) Non-key KTable Joining may result in duplicate schema name in confluence schema registry
[ https://issues.apache.org/jira/browse/KAFKA-9925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094805#comment-17094805 ] Guozhang Wang commented on KAFKA-9925: Ah yes!! Hope we can get KIP-591 by 2.6 :)
[GitHub] [kafka] kkonstantine commented on pull request #8204: KAFKA-9633: Ensure ConfigProviders are closed
kkonstantine commented on pull request #8204: URL: https://github.com/apache/kafka/pull/8204#issuecomment-620803829 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on a change in pull request #8204: KAFKA-9633: Ensure ConfigProviders are closed
kkonstantine commented on a change in pull request #8204: URL: https://github.com/apache/kafka/pull/8204#discussion_r416866064 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ## @@ -220,6 +220,8 @@ public void stop() { workerMetricsGroup.close(); connectorStatusMetricsGroup.close(); + +workerConfigTransformer.close(); Review comment: Looking at the initialization of `workerConfigTransformer`, I see it should be made final. I also notice that the same applies to `connectorClientConfigOverridePolicy` and all the class members of `ConnectorStatusMetricsGroup`. @tombentley do you mind tightening these types as well? ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java ## @@ -98,4 +101,8 @@ public void onCompletion(Throwable error, Void result) { HerderRequest request = worker.herder().restartConnector(ttl, connectorName, cb); connectorRequests.put(path, request); } + +public void close() { Review comment: Should we also change this class to implement `AutoCloseable`? It can't be used immediately in a try-with-resources clause, but it's probably better to signal the existence of this method at the class level.
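The reviewer's `AutoCloseable` suggestion can be sketched with stand-in types. `ConfigProviderStub` and `ConfigTransformerSketch` are hypothetical placeholders for the config providers and `WorkerConfigTransformer`; the point is only that declaring `AutoCloseable` documents the `close()` contract at the class level and enables try-with-resources wherever the object's lifecycle allows it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a config provider that holds resources.
final class ConfigProviderStub implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical transformer that owns its providers and advertises close() via AutoCloseable.
final class ConfigTransformerSketch implements AutoCloseable {
    private final List<ConfigProviderStub> providers;

    ConfigTransformerSketch(List<ConfigProviderStub> providers) {
        this.providers = new ArrayList<>(providers);
    }

    @Override
    public void close() {
        // Attempt to close every provider; rethrow the first failure with later ones suppressed.
        RuntimeException first = null;
        for (ConfigProviderStub provider : providers) {
            try {
                provider.close();
            } catch (RuntimeException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) {
        ConfigProviderStub a = new ConfigProviderStub();
        ConfigProviderStub b = new ConfigProviderStub();
        try (ConfigTransformerSketch transformer = new ConfigTransformerSketch(List.of(a, b))) {
            // use the transformer here
        }
        System.out.println(a.closed && b.closed); // true
    }
}
```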
[GitHub] [kafka] abbccdda commented on a change in pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test
abbccdda commented on a change in pull request #8518: URL: https://github.com/apache/kafka/pull/8518#discussion_r416879514 ## File path: tests/kafkatest/tests/core/downgrade_test.py ## @@ -67,11 +67,18 @@ def setup_services(self, kafka_version, compression_types, security_protocol): version=kafka_version) self.producer.start() +static_membership = kafka_version == DEV_BRANCH or kafka_version >= LATEST_2_3 Review comment: I see, makes sense.
[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates
[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094835#comment-17094835 ] Sophie Blee-Goldman commented on KAFKA-9921: > For 2 puts I would expect 2 entries regardless if they accidentally match Fair enough. I guess for that reason caching and duplicates are inherently incompatible, right? Regarding putting _null_ values, I think the behavior with _retainDuplicates_ is as expected. The Streams library uses window stores with duplicates for stream-stream joins, for which a null value produces no output and isn't considered a tombstone (see the [semantics of stream-stream joins|https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#kstream-kstream-join] section). I'm starting to get a better sense of what you're trying to do here, but it sounds like the semantics you want might differ slightly from what Streams would consider a stream-stream join. Do you explicitly want a windowed join, or are you just using the window store because the retention policy will keep state from growing without bound? Does your use case require _null_ values to be treated as deletes? By the way, if the built-in stores don't match your requirements exactly, you can always plug in a custom store. You could even just wrap one of the built-in stores to reuse the pieces that work for you, and skip the ones that don't. The RocksDB WindowStore, for example, is actually just built out of segments of the RocksDB KeyValueStore. > Caching is not working properly with WindowStateStore when rataining > duplicates > --- > > Key: KAFKA-9921 > URL: https://issues.apache.org/jira/browse/KAFKA-9921 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Georgi Petkov >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.6.0 > > > I'm using the current latest version 2.5.0 but this is not something new.
> I have _WindowStateStore_ configured as follows (where _true_ stands for > the _retainDuplicates_ parameter): > _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, > retentionPeriod, windowSize, *true*), keySerde, > valueSerde)*.withCachingEnabled()*)_ > If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that > order, when reading them through the iterator I get the values *4, 2, 3, 4*. > I've done a bit of investigation myself and the problem is that *the whole > caching feature is written without consideration of the case where duplicates > are retained*. > The observed behavior is due to the last value being in the cache (which > can hold only one, since it's not aware of the retain duplicates option), and > it is read first (while the first entry from the RocksDB iterator is skipped even > though the values are different). This can be observed (for version 2.5.0) in > _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 > values are read from the RocksDB iterator, so they are as expected. > As I said, the whole feature does not consider the _retainDuplicates_ option, > so there are other examples of incorrect behavior, such as in > _AbstractMergedSortedCacheStoreIterator__#peekNextKey()_: for each call, you > would skip one duplicate entry in the RocksDB iterator for the given key. > In my use case, I want to persist a list of values for a given key without > increasing the complexity to linear for a single event (which would be the > case if I always read the current list, appended one value, and wrote > it back). So I go for _List>_ instead of _KeyValuePair List>_. The whole use case is more complex than that, so I use > _#transformValues_ and state stores. > As a result, I can't use caching on my state stores. Others will > see incorrect behavior that may take a long time to discover and even > more time to fix.
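The *4, 2, 3, 4* result from the KAFKA-9921 report can be reproduced with a toy model of a merging iterator. The model assumes what the reporter describes: the cache keeps a single (latest) value per key, the underlying store keeps every duplicate in insertion order, and on equal keys the iterator emits the cached value and advances the store iterator by one entry. `DuplicateCacheMerge` is a hypothetical simplification, not `AbstractMergedSortedCacheStoreIterator` itself.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of a cache-over-store merge that ignores retained duplicates.
final class DuplicateCacheMerge {
    static final class Entry {
        final String key;
        final int value;

        Entry(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    // Merges a key-sorted cache (one value per key) with a key-sorted store that keeps duplicates.
    static List<Integer> mergedValues(TreeMap<String, Integer> cache, List<Entry> store) {
        List<Integer> out = new ArrayList<>();
        Iterator<Map.Entry<String, Integer>> c = cache.entrySet().iterator();
        Iterator<Entry> s = store.iterator();
        Map.Entry<String, Integer> nextCache = c.hasNext() ? c.next() : null;
        Entry nextStore = s.hasNext() ? s.next() : null;
        while (nextCache != null || nextStore != null) {
            int cmp = nextCache == null ? 1
                    : nextStore == null ? -1
                    : nextCache.getKey().compareTo(nextStore.key);
            if (cmp < 0) {
                out.add(nextCache.getValue());
                nextCache = c.hasNext() ? c.next() : null;
            } else if (cmp > 0) {
                out.add(nextStore.value);
                nextStore = s.hasNext() ? s.next() : null;
            } else {
                // Same key: emit the cached (latest) value and drop ONE store duplicate.
                out.add(nextCache.getValue());
                nextCache = c.hasNext() ? c.next() : null;
                nextStore = s.hasNext() ? s.next() : null;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Entry> store = new ArrayList<>();
        TreeMap<String, Integer> cache = new TreeMap<>();
        for (int v = 1; v <= 4; v++) {
            store.add(new Entry("k", v)); // the store keeps every duplicate
            cache.put("k", v);            // the cache keeps only the latest value
        }
        System.out.println(mergedValues(cache, store)); // [4, 2, 3, 4]
    }
}
```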
[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates
[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094839#comment-17094839 ] Sophie Blee-Goldman commented on KAFKA-9921: I take it you're using RocksDB, by the way? If you are using (or can use) the in-memory stores, then storing a list and appending to it should be pretty fast. On that note, I'm not sure storing the entire list would be slower than storing individual duplicate records, even with RocksDB. I actually suspect it might even be faster to store it as a list, assuming the number and size of duplicates isn't very large (relative to the memtable and block sizes).
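The list-per-key alternative suggested in that comment is a read-modify-write on a single key. In this hypothetical sketch a `HashMap` stands in for a key-value store (`ListPerKeyStore` and `appendValue` are not Kafka APIs), and each update copies the list to mimic a store that hands back a deserialized copy, so the per-update cost still grows with the list length; whether that beats duplicate entries in RocksDB is exactly the open question in the comment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a KeyValueStore<String, List<Integer>>.
final class ListPerKeyStore {
    private final Map<String, List<Integer>> store = new HashMap<>();

    // Read the current list (a copy, as a serialized store would return), append, write back.
    void appendValue(String key, int value) {
        List<Integer> current = store.get(key);
        List<Integer> updated = current == null ? new ArrayList<>() : new ArrayList<>(current);
        updated.add(value);
        store.put(key, updated);
    }

    // All values appended for the key, in insertion order; empty if none.
    List<Integer> get(String key) {
        return store.getOrDefault(key, List.of());
    }
}
```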
[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates
[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094844#comment-17094844 ] Sophie Blee-Goldman commented on KAFKA-9921: I'm resolving the ticket because the PR that disables caching in combination with duplicates, and notes this in the javadocs, was just merged. If you have the chance, take a quick look and let me know if there's anything I missed clarifying in the docs; I can submit a quick follow-up PR, or review one from you if you have something specific in mind.
[GitHub] [kafka] gwenshap commented on pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test
gwenshap commented on pull request #8518: URL: https://github.com/apache/kafka/pull/8518#issuecomment-620826970 ok to test
[jira] [Updated] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates
[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-9921: --- Fix Version/s: 2.5.1 > Caching is not working properly with WindowStateStore when rataining > duplicates > --- > > Key: KAFKA-9921 > URL: https://issues.apache.org/jira/browse/KAFKA-9921 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Georgi Petkov >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.6.0, 2.5.1
[jira] [Created] (KAFKA-9928) Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]
Guozhang Wang created KAFKA-9928: Summary: Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta] Key: KAFKA-9928 URL: https://issues.apache.org/jira/browse/KAFKA-9928 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang {code} Stacktrace java.lang.AssertionError: Condition not met within timeout 3. waiting for final values at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:380) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) at org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableEOSIntegrationTest.java:178) {code} I looked at the examples below: https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/149/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_/ https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFailsWithState_exactly_once_beta__2/ https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta__2/ I also reproduced the flakiness locally after about 180 runs; the failing run did not show any obviously different traces compared with the successful ones.
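The KAFKA-9928 stack trace shows the test failing inside `TestUtils.waitForCondition`, which polls until the expected final values appear. The helper below is a hypothetical, stdlib-only sketch of that polling pattern (not Kafka's `TestUtils`): it re-evaluates a condition until it holds or a deadline passes, then fails with the supplied message.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper; not org.apache.kafka.test.TestUtils.
final class WaitUtil {
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, long pollMs, String message) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (condition.getAsBoolean()) {
                return; // condition met
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("Condition not met within timeout " + timeoutMs + ". " + message);
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                throw new AssertionError("Interrupted while waiting: " + message, e);
            }
        }
    }
}
```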
[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
guozhangwang commented on a change in pull request #8568: URL: https://github.com/apache/kafka/pull/8568#discussion_r416910725 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java ## @@ -158,8 +158,9 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { produceGlobalTableValues(); -final ReadOnlyKeyValueStore replicatedStore = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); +final ReadOnlyKeyValueStore replicatedStore = IntegrationTestUtils +.getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); +assertNotNull(replicatedStore); Review comment: Previously, the unwrapped call would simply throw the exception; here, asserting that the result is not null is equivalent to making sure that the store is indeed returned.
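The idea behind `IntegrationTestUtils.getStore`, as described in that review comment, is to retry the store lookup instead of letting the first exception escape, which is why the test asserts non-null afterwards. A hypothetical sketch: `StoreNotReadyException` stands in for `InvalidStateStoreException`, and `getWithRetry` returns `null` once the timeout expires.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for InvalidStateStoreException.
final class StoreNotReadyException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    StoreNotReadyException(String message) {
        super(message);
    }
}

// Hypothetical retry helper; not Kafka's IntegrationTestUtils.
final class StoreRetry {
    static <T> T getWithRetry(Supplier<T> lookup, long timeoutMs, long backoffMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                return lookup.get();
            } catch (StoreNotReadyException e) {
                // Store still initializing or migrating: give up only after the deadline.
                if (System.currentTimeMillis() >= deadline) {
                    return null; // the caller is expected to assertNotNull
                }
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }
}
```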
[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
vvcephei commented on pull request #8540: URL: https://github.com/apache/kafka/pull/8540#issuecomment-620846265
[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
vvcephei commented on pull request #8540: URL: https://github.com/apache/kafka/pull/8540#issuecomment-620846462
[GitHub] [kafka] vvcephei commented on pull request #8254: KIP-557: Add Emit On Change Support
vvcephei commented on pull request #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-620847005 Whew! System tests passed: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-04-28--001.1588064884--ConcurrencyPractitioner--EMIT-ON-CHANGE--ddbf2cf/report.html
[GitHub] [kafka] guozhangwang commented on pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
guozhangwang commented on pull request #8568: URL: https://github.com/apache/kafka/pull/8568#issuecomment-620847306 I looked at the three failed tests: * `OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore` is actually due to the issue that https://github.com/apache/kafka/pull/8548 tried to fix; waiting for @vvcephei to review 8548. * `EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]` is being looked at by @mjsax as KAFKA-9831. * `GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]` is a new issue; I created KAFKA-9928 for it, and my gut feeling is that it has the same root cause as KAFKA-9831 (also cc @mjsax). So I think this PR is ready to be merged.
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r416916295 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java ## @@ -41,132 +54,107 @@ import static org.easymock.EasyMock.replay; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; -import org.easymock.EasyMock; -import org.junit.Test; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; public class HighAvailabilityTaskAssignorTest { -private long acceptableRecoveryLag = 100L; -private int balanceFactor = 1; -private int maxWarmupReplicas = 2; -private int numStandbyReplicas = 0; -private long probingRebalanceInterval = 60 * 1000L; - -private Map clientStates = new HashMap<>(); -private Set allTasks = new HashSet<>(); -private Set statefulTasks = new HashSet<>(); - -private ClientState client1; -private ClientState client2; -private ClientState client3; - -private HighAvailabilityTaskAssignor taskAssignor; - -private void createTaskAssignor() { -final AssignmentConfigs configs = new AssignmentConfigs( -acceptableRecoveryLag, -balanceFactor, -maxWarmupReplicas, -numStandbyReplicas, -probingRebalanceInterval -); -taskAssignor = new HighAvailabilityTaskAssignor( -clientStates, -allTasks, -statefulTasks, -configs); -} +private final AssignmentConfigs configWithoutStandbys = new AssignmentConfigs( +/*acceptableRecoveryLag*/ 100L, +/*balanceFactor*/ 1, +/*maxWarmupReplicas*/ 2, 
+/*numStandbyReplicas*/ 0, +/*probingRebalanceIntervalMs*/ 60 * 1000L +); + +private final AssignmentConfigs configWithStandbys = new AssignmentConfigs( +/*acceptableRecoveryLag*/ 100L, +/*balanceFactor*/ 1, +/*maxWarmupReplicas*/ 2, +/*numStandbyReplicas*/ 1, +/*probingRebalanceIntervalMs*/ 60 * 1000L +); -@Test -public void shouldDecidePreviousAssignmentIsInvalidIfThereAreUnassignedActiveTasks() { -client1 = EasyMock.createNiceMock(ClientState.class); -expect(client1.prevActiveTasks()).andReturn(singleton(TASK_0_0)); -expect(client1.prevStandbyTasks()).andStubReturn(EMPTY_TASKS); -replay(client1); -allTasks = mkSet(TASK_0_0, TASK_0_1); -clientStates = singletonMap(UUID_1, client1); -createTaskAssignor(); -assertFalse(taskAssignor.previousAssignmentIsValid()); Review comment: Since you have a follow-on PR that touches this method, I'll leave it alone and just proceed to merge. We should consider both of these options in the follow-on. Thanks!
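The removed test `shouldDecidePreviousAssignmentIsInvalidIfThereAreUnassignedActiveTasks` checks that a previous assignment is treated as invalid when some task was not active on any client. Below is a hypothetical sketch of just that condition, with task ids as strings such as "0_0" standing in for `TaskId`; the real `HighAvailabilityTaskAssignor#previousAssignmentIsValid` may check more than this.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch of one validity condition; not the Kafka Streams assignor.
final class PrevAssignmentCheck {
    // Valid only if every task was an active task on some client in the previous assignment.
    static boolean previousAssignmentIsValid(Set<String> allTasks, Map<UUID, Set<String>> prevActiveByClient) {
        Set<String> covered = new HashSet<>();
        for (Set<String> tasks : prevActiveByClient.values()) {
            covered.addAll(tasks);
        }
        return covered.containsAll(allTasks);
    }
}
```

With one client that previously owned only `0_0` while the topology has tasks `0_0` and `0_1`, the check returns `false`, matching the removed test's `assertFalse`.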
[GitHub] [kafka] fantayeneh opened a new pull request #8576: format with correct syntax
fantayeneh opened a new pull request #8576: URL: https://github.com/apache/kafka/pull/8576 Small change: fix string formatting. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Assigned] (KAFKA-9928) Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]
[ https://issues.apache.org/jira/browse/KAFKA-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-9928: -- Assignee: Matthias J. Sax > Flaky > GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta] > - > > Key: KAFKA-9928 > URL: https://issues.apache.org/jira/browse/KAFKA-9928 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Matthias J. Sax >Priority: Major > > {code} > Stacktrace > java.lang.AssertionError: Condition not met within timeout 3. waiting for > final values > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:380) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) > at > org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableEOSIntegrationTest.java:178) > {code} > I looked at the below examples: > https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/149/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_/ > https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFailsWithState_exactly_once_beta__2/ > https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta__2/ > And also reproduced the flakiness locally after about 180 runs, and the > failed one did not have any obvious 
different traces compared with the > successful ones. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9928) Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]
[ https://issues.apache.org/jira/browse/KAFKA-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9928: --- Component/s: unit tests streams > Flaky > GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta] > - > > Key: KAFKA-9928 > URL: https://issues.apache.org/jira/browse/KAFKA-9928 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Guozhang Wang >Priority: Major > > {code} > Stacktrace > java.lang.AssertionError: Condition not met within timeout 3. waiting for > final values > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:380) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) > at > org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableEOSIntegrationTest.java:178) > {code} > I looked at the below examples: > https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/149/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_/ > https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFailsWithState_exactly_once_beta__2/ > https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6017/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableEOSIntegrationTest/shouldKStreamGlobalKTableLeftJoin_exactly_once_beta__2/ > And also reproduced the flakiness locally after about 180 runs, and the > failed one did not have any obvious different traces compared 
with the > successful ones.
[GitHub] [kafka] vvcephei commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
vvcephei commented on a change in pull request #8568: URL: https://github.com/apache/kafka/pull/8568#discussion_r416921971 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java ## @@ -810,21 +808,9 @@ private void writeInputData(final List> records) throws Exc } private void verifyStateStore(final KafkaStreams streams, - final Set> expectedStoreContent) { -ReadOnlyKeyValueStore store = null; - -final long maxWaitingTime = System.currentTimeMillis() + 30L; -while (System.currentTimeMillis() < maxWaitingTime) { -try { -store = streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())); -break; -} catch (final InvalidStateStoreException okJustRetry) { -try { -Thread.sleep(5000L); -} catch (final Exception ignore) { } -} -} - + final Set> expectedStoreContent) throws InterruptedException { +final ReadOnlyKeyValueStore store = IntegrationTestUtils +.getStore(30L, storeName, streams, QueryableStoreTypes.keyValueStore()); Review comment: ```suggestion .getStore(300_000L, storeName, streams, QueryableStoreTypes.keyValueStore()); ``` ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java ## @@ -337,8 +336,11 @@ public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws TestUtils.waitForCondition( () -> { try { -final ReadOnlyKeyValueStore store = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); +final ReadOnlyKeyValueStore store = IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore()); + +if (store == null) +return false; Review comment: not a huge deal, but technically, these should have brackets. 
## File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ## @@ -599,13 +595,6 @@ public static void waitForCompletion(final KafkaStreams streams, return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, false); } -public static List> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig, Review comment: thanks for the cleanup ## File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ## @@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final * @param Key type of the data records * @param Value type of the data records */ -@SuppressWarnings("WeakerAccess") public static void produceKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection> records, final Properties producerConfig, final Headers headers, final Long timestamp, - final boolean enableTransactions) -throws ExecutionException, InterruptedException { + final boolean enableTransactions) { try (final Producer producer = new KafkaProducer<>(producerConfig)) { if (enableTransactions) { producer.initTransactions(); producer.beginTransaction(); } for (final KeyValue record : records) { -final Future f = producer.send( -new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers)); -f.get(); +producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers)); Review comment: I guess the flush at the end makes it synchronous anyway?
[jira] [Assigned] (KAFKA-9875) Flaky Test SuppressionDurabilityIntegrationTest.shouldRecoverBufferAfterShutdown[exactly_once]
[ https://issues.apache.org/jira/browse/KAFKA-9875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-9875: --- Assignee: John Roesler > Flaky Test > SuppressionDurabilityIntegrationTest.shouldRecoverBufferAfterShutdown[exactly_once] > -- > > Key: KAFKA-9875 > URL: https://issues.apache.org/jira/browse/KAFKA-9875 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.6.0 >Reporter: Sophie Blee-Goldman >Assignee: John Roesler >Priority: Major > Labels: flaky-test, unit-test > > h3. Stacktrace > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: The request timed out. at > org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:211) > at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteAllTopicsAndWait(EmbeddedKafkaCluster.java:300) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest(IntegrationTestUtils.java:148) > at > org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest.shouldRecoverBufferAfterShutdown(SuppressionDurabilityIntegrationTest.java:246)
[GitHub] [kafka] fantayeneh opened a new pull request #8577: use appropriate fn for readability. (maybe)
fantayeneh opened a new pull request #8577: URL: https://github.com/apache/kafka/pull/8577 Using min and max might make the code a little easier to read. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
guozhangwang commented on a change in pull request #8568: URL: https://github.com/apache/kafka/pull/8568#discussion_r416942992 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java ## @@ -810,21 +808,9 @@ private void writeInputData(final List> records) throws Exc } private void verifyStateStore(final KafkaStreams streams, - final Set> expectedStoreContent) { -ReadOnlyKeyValueStore store = null; - -final long maxWaitingTime = System.currentTimeMillis() + 30L; -while (System.currentTimeMillis() < maxWaitingTime) { -try { -store = streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())); -break; -} catch (final InvalidStateStoreException okJustRetry) { -try { -Thread.sleep(5000L); -} catch (final Exception ignore) { } -} -} - + final Set> expectedStoreContent) throws InterruptedException { +final ReadOnlyKeyValueStore store = IntegrationTestUtils +.getStore(30L, storeName, streams, QueryableStoreTypes.keyValueStore()); Review comment: Ack.
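The removed loop computes its deadline as `System.currentTimeMillis() + 30L`, which is 30 ms if the value is read as milliseconds, yet it sleeps 5000 ms after a failed attempt; in practice it gets one attempt and can fall out of the loop with `store` still `null`. That is why the review suggests `300_000L` for the new call. The following is a minimal, generic sketch of a retry-until-deadline lookup; `retryUntil` is a hypothetical helper (not `IntegrationTestUtils.getStore`), and `IllegalStateException` stands in for the retriable `InvalidStateStoreException`.

```java
import java.util.function.Supplier;

public class RetryStoreLookupSketch {

    // Hypothetical helper: keeps calling `lookup` until it returns without throwing,
    // or until `timeoutMs` has elapsed. IllegalStateException stands in for the
    // retriable InvalidStateStoreException thrown while a store is not yet queryable.
    static <T> T retryUntil(final long timeoutMs,
                            final long backoffMs,
                            final Supplier<T> lookup) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                return lookup.get();
            } catch (final IllegalStateException retriable) {
                // Unlike the removed loop, give up loudly instead of returning null.
                if (System.currentTimeMillis() >= deadline) {
                    throw retriable;
                }
                Thread.sleep(backoffMs);
            }
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final int[] attempts = {0};
        // Simulated store that only becomes queryable on the third attempt.
        final String store = retryUntil(5_000L, 10L, () -> {
            attempts[0]++;
            if (attempts[0] < 3) {
                throw new IllegalStateException("store not ready yet");
            }
            return "my-store";
        });
        System.out.println(store + " found after " + attempts[0] + " attempts");
    }
}
```

The backoff is deliberately much shorter than the timeout; with the original 30 ms deadline and 5 s sleep, the ratio was inverted.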
[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
guozhangwang commented on a change in pull request #8568: URL: https://github.com/apache/kafka/pull/8568#discussion_r416943907 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java ## @@ -337,8 +336,11 @@ public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws TestUtils.waitForCondition( () -> { try { -final ReadOnlyKeyValueStore store = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); +final ReadOnlyKeyValueStore store = IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore()); + +if (store == null) +return false; Review comment: ack.
[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
guozhangwang commented on a change in pull request #8568: URL: https://github.com/apache/kafka/pull/8568#discussion_r416944392 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ## @@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final * @param Key type of the data records * @param Value type of the data records */ -@SuppressWarnings("WeakerAccess") public static void produceKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection> records, final Properties producerConfig, final Headers headers, final Long timestamp, - final boolean enableTransactions) -throws ExecutionException, InterruptedException { + final boolean enableTransactions) { try (final Producer producer = new KafkaProducer<>(producerConfig)) { if (enableTransactions) { producer.initTransactions(); producer.beginTransaction(); } for (final KeyValue record : records) { -final Future f = producer.send( -new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers)); -f.get(); +producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers)); Review comment: Previously we waited after sending each record; now we wait only once, after sending all records, which is more efficient.
[GitHub] [kafka] vvcephei commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
vvcephei commented on a change in pull request #8568: URL: https://github.com/apache/kafka/pull/8568#discussion_r416947800 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ## @@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final * @param Key type of the data records * @param Value type of the data records */ -@SuppressWarnings("WeakerAccess") public static void produceKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection> records, final Properties producerConfig, final Headers headers, final Long timestamp, - final boolean enableTransactions) -throws ExecutionException, InterruptedException { + final boolean enableTransactions) { try (final Producer producer = new KafkaProducer<>(producerConfig)) { if (enableTransactions) { producer.initTransactions(); producer.beginTransaction(); } for (final KeyValue record : records) { -final Future f = producer.send( -new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers)); -f.get(); +producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers)); Review comment: Thanks. That's what I was asking for confirmation on. I realize now the structure of my sentence was ambiguous. I agree that the method contract is that the batch should be synchronously produced, not that each record should be synchronously produced, so this change looks good to me.
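The point settled in this exchange is the difference between blocking on every `send()` and blocking once for the whole batch. A minimal sketch of both patterns using `CompletableFuture`, assuming a hypothetical `FakeAsyncProducer` in place of `KafkaProducer`; its `flush()` only imitates the behavior the reviewers rely on (return once every outstanding send has been acknowledged). Both methods return only after all records are acknowledged, so the batch stays synchronous while the per-record round trip goes away.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class BatchSendSketch {

    // Hypothetical stand-in for an asynchronous producer: send() returns at once with a
    // future that completes with the record's offset when the "broker" acknowledges it.
    static final class FakeAsyncProducer implements AutoCloseable {
        private final ExecutorService io = Executors.newSingleThreadExecutor();
        private final AtomicLong nextOffset = new AtomicLong();
        private final List<CompletableFuture<Long>> inFlight = new ArrayList<>();

        CompletableFuture<Long> send(final String value) {
            // A single-threaded executor acknowledges records in send order.
            final CompletableFuture<Long> ack =
                CompletableFuture.supplyAsync(nextOffset::getAndIncrement, io);
            inFlight.add(ack);
            return ack;
        }

        // Plays the role of flush(): block until every in-flight send has completed.
        void flush() {
            CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0])).join();
            inFlight.clear();
        }

        @Override
        public void close() {
            flush();
            io.shutdown();
        }
    }

    // Old pattern: wait for each acknowledgement before sending the next record.
    static long sendEachAndWait(final List<String> records) {
        try (FakeAsyncProducer producer = new FakeAsyncProducer()) {
            long lastOffset = -1L;
            for (final String record : records) {
                lastOffset = producer.send(record).join();
            }
            return lastOffset;
        }
    }

    // New pattern: send everything, then wait once for the whole batch.
    static long sendAllThenFlush(final List<String> records) {
        try (FakeAsyncProducer producer = new FakeAsyncProducer()) {
            final List<CompletableFuture<Long>> acks = new ArrayList<>();
            for (final String record : records) {
                acks.add(producer.send(record));
            }
            producer.flush();
            long lastOffset = -1L;
            for (final CompletableFuture<Long> ack : acks) {
                // Already complete after flush(); join() surfaces any per-record failure.
                lastOffset = ack.join();
            }
            return lastOffset;
        }
    }

    public static void main(final String[] args) {
        final List<String> records = Arrays.asList("a", "b", "c");
        System.out.println(sendEachAndWait(records) + " " + sendAllThenFlush(records));
    }
}
```

Checking the futures after the single wait keeps per-record failures visible, which the per-record `f.get()` loop provided implicitly.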
[GitHub] [kafka] junrao commented on pull request #8543: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…
junrao commented on pull request #8543: URL: https://github.com/apache/kafka/pull/8543#issuecomment-620879490 retest this please
[GitHub] [kafka] guozhangwang commented on pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams
guozhangwang commented on pull request #8568: URL: https://github.com/apache/kafka/pull/8568#issuecomment-620879654 Merged to trunk.
[GitHub] [kafka] junrao commented on pull request #8542: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…
junrao commented on pull request #8542: URL: https://github.com/apache/kafka/pull/8542#issuecomment-620880304 @steverod : Do the JDK 8 and Scala 2.12 tests pass for you locally? Not sure why the Jenkins test failed.
[GitHub] [kafka] cmccabe edited a comment on pull request #8569: KIP-551: Expose disk read and write metrics
cmccabe edited a comment on pull request #8569: URL: https://github.com/apache/kafka/pull/8569#issuecomment-620884289 > In addition to block-level read/write, would there be a benefit to expose file system read/write metrics? It's better to have that discussion on the mailing list. This PR is just about KIP-551.
[jira] [Commented] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates
[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094900#comment-17094900 ] Georgi Petkov commented on KAFKA-9921: -- [~ableegoldman] Yeah, I agree that probably not much can be done in terms of caching (compared to the options without _retainDuplicates_). I totally agree that many of the features like the null value behavior are correct and make perfect sense from the point of view of the features implemented with it. Still, it's strange from the perspective where you use it standalone. *1-2 sentences clarifying the behavior with null values in the _WindowStateStore_ documentation could definitely help.* In addition, as I said, if this is the desired behavior *you can easily skip calling RocksDB for null values (when using _retainDuplicates)_. This would both make the intention clearer and obviously avoid unnecessary calls.* I do need exactly a stream-stream join but without the repartition part. I want to get matches when there are new events in either stream, and I use _WindowStateStore_ only for the retention policy. In fact, due to the lack of examples, I was looking at the stream-stream join implementation to find out how to correctly use the _WindowStateStores_. I'm building a library for some common yet far from trivial operations on streams that you may need, like topological sorting. Therefore I don't know if the user will provide null values or not. I was curious about the behavior with null values so I know what I'm providing to the user. I've tested it and that's how I found out the exact behavior. *I'm not sure that an in-memory or any custom state store will make it.* Yes, in-memory will help with the efficient append because it avoids any expensive call and serializations/deserializations.
Nevertheless, *you will always have the serializations/deserializations somewhere, namely in the changelog topic, where you also pay in bandwidth* (not just precious processing time). Even if the list is capped at, say, 5 items, you will still have 15 (1 + 2 + 3 + 4 + 5) events recorded instead of 5. Obviously the size grows pretty fast - O(n^2). Combined with the fact that I want to provide a library to many different users (and the duplicate count may vary a lot between usages), *to me it's best to implement it just as in the stream-stream join - with duplicates*. Still, it was a great discussion and made me more confident in my decisions. Thank you for your assistance. *Regarding the PR - it adds the same code to both _WindowStoreBuilder.java_ and _TimestampedWindowStoreBuilder.java_ but adds a test for only one of them.* > Caching is not working properly with WindowStateStore when rataining > duplicates > --- > > Key: KAFKA-9921 > URL: https://issues.apache.org/jira/browse/KAFKA-9921 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Georgi Petkov >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.6.0, 2.5.1 > > > I'm using the current latest version 2.5.0 but this is not something new. > I have _WindowStateStore_ configured as follows (where _true_ stands for > the _retainDuplicates_ parameter): > _builder.addStateStore(windowStoreBuilder(persistentWindowStore(name, > retentionPeriod, windowSize, *true*), keySerde, > valueSerde)*.withCachingEnabled()*)_ > If I put 4 key-value pairs with the same key and values *1, 2, 3, 4* in that > order, when reading them through the iterator I'll get the values *4, 2, 3, 4*. > I've done a bit of investigation myself and the problem is that *the whole > caching feature is written without consideration of the case where duplicates > are retained*.
> The observed behavior is due to having the last value in the cache (and it > can have only one since it's not aware of the retain duplicates option) and > it is read first (while skipping the first from the RocksDB iterator even > though the values are different). This can be observed (for version 2.5.0) in > _AbstractMergedSortedCacheStoreIterator#next()_ lines 95-97. Then the next 3 > values are read from the RocksDB iterator so they are as expected. > As I said, the whole feature is not considering the _retainDuplicates_ option > so there are other examples of incorrect behavior like in > _AbstractMergedSortedCacheStoreIterator#peekNextKey()_ - for each call, you > would skip one duplicate entry in the RocksDB iterator for the given key. > In my use case, I want to persist a list of values for a given key without > increasing the complexity to linear for a single event (which would be the > case if I was always reading the current list appending one value and writing > it ba
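The report traces the wrong order (*4, 2, 3, 4* instead of *1, 2, 3, 4*) to the merged cache/store iterator: the cache holds only one value per key, and when both sides are positioned on the same key the cache entry wins and one store entry is skipped. The following is a simplified Java model of that described behavior, not the actual `AbstractMergedSortedCacheStoreIterator` code; `Entry`, `mergedValues` and `reportedScenario` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergedCacheIteratorSketch {

    static final class Entry {
        final String key;
        final int value;

        Entry(final String key, final int value) {
            this.key = key;
            this.value = value;
        }
    }

    // Simplified merge of a cache iterator with a store iterator, both sorted by key.
    // On equal keys, the cache entry is assumed to be the newer copy of the store entry,
    // so it is emitted and one store entry is skipped. That assumption holds for unique
    // keys but not when the store retains duplicates.
    static List<Integer> mergedValues(final List<Entry> store, final List<Entry> cache) {
        final List<Integer> out = new ArrayList<>();
        int s = 0;
        int c = 0;
        while (s < store.size() || c < cache.size()) {
            if (c == cache.size()) {
                out.add(store.get(s++).value);
            } else if (s == store.size()) {
                out.add(cache.get(c++).value);
            } else {
                final int cmp = cache.get(c).key.compareTo(store.get(s).key);
                if (cmp < 0) {
                    out.add(cache.get(c++).value);
                } else if (cmp > 0) {
                    out.add(store.get(s++).value);
                } else {
                    out.add(cache.get(c++).value);
                    s++; // skips the store entry "shadowed" by the cache entry
                }
            }
        }
        return out;
    }

    // Scenario from the report: four puts under one key with retainDuplicates = true.
    static List<Integer> reportedScenario() {
        final List<Entry> store = Arrays.asList(
            new Entry("k", 1), new Entry("k", 2), new Entry("k", 3), new Entry("k", 4));
        // The cache keeps only one entry per key: the latest value.
        final List<Entry> cache = Arrays.asList(new Entry("k", 4));
        return mergedValues(store, cache);
    }

    public static void main(final String[] args) {
        System.out.println(reportedScenario()); // [4, 2, 3, 4]
    }
}
```

With unique keys the same rule is correct (the cached value replaces the stale stored one); it only goes wrong once several store entries legitimately share a key.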
[jira] [Commented] (KAFKA-9928) Flaky GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]
[ https://issues.apache.org/jira/browse/KAFKA-9928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094902#comment-17094902 ] Guozhang Wang commented on KAFKA-9928: -- I found that in the failed run, around the time the producer of {{produceTopicValues(streamTopic);}} (around line 172) is being closed, the following entries are printed (successful runs do not have them), cc [~mjsax]: {code} [2020-04-28 15:10:58,458] INFO [Consumer clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer, groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_] Fetch offset 9 is out of range for partition stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:1261) [2020-04-28 15:10:58,458] INFO [Consumer clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer, groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_] Resetting offset for partition stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0 to offset 0.
(org.apache.kafka.clients.consumer.internals.SubscriptionState:383) [2020-04-28 15:10:58,459] INFO [Consumer clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer, groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_] Fetch offset 9 is out of range for partition stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:1261) [2020-04-28 15:10:58,460] INFO [Consumer clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer, groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_] Resetting offset for partition stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState:383) [2020-04-28 15:10:58,461] INFO [Consumer clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer, groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_] Fetch offset 9 is out of range for partition stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:1261) [2020-04-28 15:10:58,461] INFO [Consumer clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-consumer, groupId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_] Resetting offset for partition stream-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0 to offset 0. 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:383) [2020-04-28 15:10:58,566] INFO [Producer clientId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-StreamThread-1-producer, transactionalId=globalTable-eos-test-shouldKStreamGlobalKTableLeftJoin_exactly_once_beta_-0a424027-ab72-4de4-9d83-58989a76b029-1] Discovered group coordinator localhost:54279 (id: 0 rack: null) (org.apache.kafka.clients.producer.internals.TransactionManager:1525) [2020-04-28 15:11:00,740] INFO [Controller id=0] Processing automatic preferred replica leader election (kafka.controller.KafkaController:66) {code} Note that this cluster has only one broker. > Flaky > GlobalKTableEOSIntegrationTest#shouldKStreamGlobalKTableLeftJoin[exactly_once_beta] > - > > Key: KAFKA-9928 > URL: https://issues.apache.org/jira/browse/KAFKA-9928 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Matthias J. Sax >Priority: Major > > {code} > Stacktrace > java.lang.AssertionError: Condition not met within timeout 3. waiting for > final values > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:380) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:
[jira] [Comment Edited] (KAFKA-9921) Caching is not working properly with WindowStateStore when rataining duplicates
[ https://issues.apache.org/jira/browse/KAFKA-9921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094900#comment-17094900 ] Georgi Petkov edited comment on KAFKA-9921 at 4/28/20, 10:23 PM: - [~ableegoldman] Yeah, I agree that probably not much can be done in terms of caching (compared to the options without _retainDuplicates_). I totally agree that many of the features like the null value behavior are correct and make perfect sense from the point of view of the features implemented with it. Still, it's strange from the perspective where you use it standalone. *1-2 sentences clarifying the behavior with null values in the _WindowStateStore_ documentation could definitely help.* In addition, as I said, if this is the desired behavior *you can easily skip calling RocksDB for null values (when using _retainDuplicates)_. This would both make the intention clearer and obviously avoid unnecessary calls.* I do need exactly a stream-stream join but without the repartition part. I want to get matches when there are new events in either stream and support duplicate keys in the stream, and I use _WindowStateStore_ only for the retention policy. In fact, due to the lack of examples, I was looking at the stream-stream join implementation to find out how to correctly use the _WindowStateStores_. I'm building a library for some common yet far from trivial operations on streams that you may need, like topological sorting. Therefore I don't know if the user will provide null values or not. I was curious about the behavior with null values so I know what I'm providing to the user. I've tested it and that's how I found out the exact behavior. *I'm not sure that an in-memory or any custom state store will make it.* Yes, in-memory will help with the efficient append because it avoids any expensive call and serializations/deserializations.
Nevertheless, *you will always have the serializations/deserializations somewhere and this is the changelog topic and there you have also bandwidth* (not just precious processing time). Even if the list is fixed to let's say only 5 items you will still have 15 (1 + 2 + 3 + 4 + 5) events recorded instead of 5. Obviously the size grows pretty fast - O(n^2). Combined with the fact that I want to provide a library to many different users (and duplicates count may vary a lot between usages) *to me it's best to implement just as in the stream-stream join - with duplicates*. Still, it was a great discussion and made me more confident in my decisions. Thank you for your assistance. *Regarding the PR - it adds the same code to both _WindowStoreBuilder.java_ and _TimestampedWindowStoreBuilder.java_ but adds a test for only one of them.* was (Author: georgi.petkov): [~ableegoldman] Yeah, I agree that probably not much can be done in terms of caching (compared to the options without _retainDuplicates_). I totally agree that many of the features like the null value behavior are correct and make perfect sense from point of view of the features implemented with it. Still, it's strange from the perspective where you use it standalone. *1-2 sentences clarifying the behavior with null values in the _WindowStateStore_ documentation could definitely help.* In addition, as I said if this is the desired behavior *you can easily skip calling RocksDB for null values (when using _retainDuplicates)_. This would make both the intention clearer and obviously avoid unnecessary calls.* I do need exactly stream-stream join but without the repartition part. I want to get matches when there are new events in whichever stream and I also use _WindowStateStore_ only for the retention policy. In fact, due to the lack of many examples, I was looking at the stream-stream join implementation to find out how to correctly use the _WindowStateStores_. 
[GitHub] [kafka] cmccabe commented on a change in pull request #8569: KIP-551: Expose disk read and write metrics
cmccabe commented on a change in pull request #8569: URL: https://github.com/apache/kafka/pull/8569#discussion_r416961523
## File path: core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
## @@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio.file.{Files, Paths}
+
+import org.apache.kafka.common.utils.Time
+import org.slf4j.Logger
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Retrieves Linux /proc/self/io metrics.
+ */
+class LinuxIoMetricsCollector(val procPath: String, val time: Time, val logger: Logger) {
+  import LinuxIoMetricsCollector._
+  var lastUpdateMs = -1L
+  var cachedReadBytes = 0L
+  var cachedWriteBytes = 0L
+
+  def readBytes(): Long = this.synchronized {
+    val curMs = time.milliseconds()
+    if (curMs != lastUpdateMs) {
+      updateValues(curMs)
+    }
+    cachedReadBytes
+  }
+
+  def writeBytes(): Long = this.synchronized {
+    val curMs = time.milliseconds()
+    if (curMs != lastUpdateMs) {
+      updateValues(curMs)
+    }
+    cachedWriteBytes
+  }
+
+  /**
+   * Read /proc/self/io.
+   *
+   * Generally, each line in this file contains a prefix followed by a colon and a number.
+   *
+   * For example, it might contain this:
+   * rchar: 4052
+   * wchar: 0
+   * syscr: 13
+   * syscw: 0
+   * read_bytes: 0
+   * write_bytes: 0
+   * cancelled_write_bytes: 0
+   */
+  def updateValues(now: Long): Boolean = this.synchronized {

Review comment:
Unless we choose to read this file in a background thread, there isn't a reason to avoid using a lock here.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
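For readers following the locking discussion, here is a minimal Java sketch of the same idea: parse the "prefix: number" lines of /proc/self/io and re-read them at most once per millisecond, with the cached fields only touched under one lock. The class name, the injected clock, and the line source are hypothetical stand-ins (the reviewed code is Scala and uses Kafka's `Time`); in a real setup the line source could call `java.nio.file.Files.readAllLines(Paths.get("/proc/self/io"))`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical Java analogue of the reviewed Scala collector, not Kafka code.
class ProcIoSketch {
    private final LongSupplier clockMs;              // e.g. System::currentTimeMillis
    private final Supplier<List<String>> lineSource; // e.g. reads /proc/self/io
    private long lastUpdateMs = -1L;
    private long cachedReadBytes = 0L;
    private long cachedWriteBytes = 0L;

    ProcIoSketch(LongSupplier clockMs, Supplier<List<String>> lineSource) {
        this.clockMs = clockMs;
        this.lineSource = lineSource;
    }

    // Parses lines of the form "prefix: number", skipping lines that do not match.
    static Map<String, Long> parse(List<String> lines) {
        Map<String, Long> values = new HashMap<>();
        for (String line : lines) {
            int colon = line.indexOf(':');
            if (colon < 0) {
                continue;
            }
            try {
                values.put(line.substring(0, colon).trim(),
                        Long.parseLong(line.substring(colon + 1).trim()));
            } catch (NumberFormatException e) {
                // Ignore malformed values rather than failing the metric.
            }
        }
        return values;
    }

    // Caller must hold the lock, so two threads cannot both re-read the source
    // for the same millisecond or interleave their updates to the cached fields.
    private void maybeUpdate() {
        long now = clockMs.getAsLong();
        if (now != lastUpdateMs) {
            Map<String, Long> values = parse(lineSource.get());
            cachedReadBytes = values.getOrDefault("read_bytes", cachedReadBytes);
            cachedWriteBytes = values.getOrDefault("write_bytes", cachedWriteBytes);
            lastUpdateMs = now;
        }
    }

    synchronized long readBytes() {
        maybeUpdate();
        return cachedReadBytes;
    }

    synchronized long writeBytes() {
        maybeUpdate();
        return cachedWriteBytes;
    }
}
```

Because the file is read on the caller's thread, the lock is cheap to keep; moving the read to a background thread, as the review mentions, would be the case where a lock-free handoff might be worth considering.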
[GitHub] [kafka] ijuma commented on pull request #8567: KAFKA-9652: Fix throttle metric in RequestChannel and request log due to KIP-219
ijuma commented on pull request #8567: URL: https://github.com/apache/kafka/pull/8567#issuecomment-620887867 2 jobs passed, 1 unrelated flaky test failed: > org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
[GitHub] [kafka] steverod commented on pull request #8542: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…
steverod commented on pull request #8542: URL: https://github.com/apache/kafka/pull/8542#issuecomment-620893016 retest this please
[GitHub] [kafka] vvcephei opened a new pull request #8578: KAFKA-9875: Make integration tests more resilient
vvcephei opened a new pull request #8578: URL: https://github.com/apache/kafka/pull/8578 The ticket is for a flaky test that failed to clean up topics _after_ the test, which isn't strictly necessary for test success.

* Alter the "clean up after test" method to never throw an exception (after verifying that it's always the last invocation inside a finally block, so it won't break any test semantics).
* Consolidate the naming of all integration tests' app ids, topics, etc., by introducing a new test utility to generate safe, unique, descriptive names.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
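A name generator of the kind described in the second bullet can be sketched in a few lines. This is a hypothetical helper, not Kafka's implementation: it takes the class and method names as plain strings to avoid a JUnit dependency, replaces every character outside `[a-zA-Z0-9_-]` with `_`, and does not enforce Kafka's 249-character topic-name limit.

```java
// Hypothetical helper in the spirit of the PR's new test utility; not Kafka's implementation.
class TestNames {
    // Kafka topic names may contain only ASCII letters, digits, '.', '_' and '-'.
    // This sketch also maps '.' to '_', since Kafka warns that topic names using
    // '.' and '_' can collide in metric names.
    static String safeUniqueTestName(String testClassSimpleName, String testMethodName) {
        return (testClassSimpleName + "_" + testMethodName).replaceAll("[^a-zA-Z0-9_-]", "_");
    }

    public static void main(String[] args) {
        String safe = safeUniqueTestName("MyIntegrationTest", "shouldDoSomething[0]");
        // Scope the generated name to each usage so app ids, group ids and topics never collide.
        System.out.println("app-" + safe);
        System.out.println("input-topic-" + safe);
    }
}
```

Running `main` prints `app-MyIntegrationTest_shouldDoSomething_0_` and `input-topic-MyIntegrationTest_shouldDoSomething_0_`. The `[0]` suffix that JUnit 4 appends to parameterized test names is one example of characters that would otherwise be invalid in a topic name.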
[GitHub] [kafka] vvcephei commented on a change in pull request #8578: KAFKA-9875: Make integration tests more resilient
vvcephei commented on a change in pull request #8578: URL: https://github.com/apache/kafka/pull/8578#discussion_r416969713
## File path: streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
## @@ -101,8 +102,8 @@ public void before() throws Exception {
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "global-thread-shutdown-test" + testName.getMethodName();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);

Review comment:
I've standardized all the usages to be just "app-" followed by the generated name, since the generated name contains the same information that we previously hand-wrote into the prefix or suffix. All we really need to do is ensure that the app id won't collide with, for example, a group name that we might use in a verification consumer. For that reason, I never use the generated name "plain", but always scope it to the usage (app id, group id, input topic, etc.).

It's not super important to apply these ideas universally, but I felt it would be easier to write more tests like this in the future if I made a full pass over all the tests so that they all look the same.
## File path: streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
## @@ -88,16 +89,17 @@ private String stateStoreName;
     @Rule
-    public TestName name = new TestName();
+    public TestName testName = new TestName();
     @Before
     public void before() {
-        inputTopicName = "input-topic-" + name.getMethodName();
-        outputTopicName = "output-topic-" + name.getMethodName();
-        stateStoreName = "lagfetch-test-store" + name.getMethodName();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        inputTopicName = "input-topic-" + safeTestName;
+        outputTopicName = "output-topic-" + safeTestName;
+        stateStoreName = "lagfetch-test-store" + safeTestName;
         streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "lag-fetch-" + name.getMethodName());
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "lag-fetch-" + safeTestName);

Review comment:
```suggestion
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
```

## File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
## @@ -145,12 +162,12 @@ public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, fina
         }
     }
-    public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
-        driver.cleanUp();
+    public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) {
         try {
+            driver.cleanUp();
             cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
-        } catch (final InterruptedException e) {
-            throw new RuntimeException(e);
+        } catch (final RuntimeException | InterruptedException e) {
+            LOG.warn("Ignoring failure to clean test state", e);
         }

Review comment:
This is really the fix for KAFKA-9875.
The other change hopefully just reduces the probability that ignoring the exceptions could cause subsequent failures (e.g., if the topics don't get deleted before the next test, at least the next test will use different topic names). I've verified that it's OK for all usages of this method to ignore potential exceptions. Namely, as long as the test logic itself doesn't need to ensure that any topics got deleted, and as long as this method is the last statement in the test, it should be fine to ignore failures here. I also considered just deleting the method, but when it does succeed, it leaves less garbage around for subsequent tests, so it feels better to at least attempt a cleanup.

## File path: streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
## @@ -106,7 +106,9 @@ public static void startKafkaStreamsAndWaitForRunningState(final KafkaStreams ka
         kafkaStreams.start();
         assertThat(
             "KafkaStreams did not transit to RUNNING state within " + timeoutMs + " milli seconds.",
-            countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS), equalTo(true));
+            countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS),
+            equalTo(true)
+        );

Review comment:
Just fixing the formatting.
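The quiet-cleanup pattern discussed in this review (attempt the cleanup, but never let it throw from the last statement of a test) can be sketched without any Kafka dependencies. `CleanupStep` and `QuietCleanup` are hypothetical names, and printing to `System.err` stands in for the `LOG.warn` call in the diff.

```java
// Hypothetical functional interface standing in for steps such as
// driver.cleanUp() and cluster.deleteAllTopicsAndWait(timeout).
interface CleanupStep {
    void run() throws InterruptedException;
}

class QuietCleanup {
    // Runs the steps in order and swallows any failure, so a broken cleanup in a
    // finally block cannot replace the test's own result. Like the reviewed diff,
    // the first failure skips the remaining steps. Returns false if a failure was ignored.
    static boolean quietly(CleanupStep... steps) {
        try {
            for (CleanupStep step : steps) {
                step.run();
            }
            return true;
        } catch (RuntimeException e) {
            System.err.println("Ignoring failure to clean test state: " + e);
            return false;
        } catch (InterruptedException e) {
            // Unlike the diff, restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            System.err.println("Ignoring interrupted cleanup: " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        try {
            System.out.println("test body runs here");
        } finally {
            boolean clean = QuietCleanup.quietly(
                    () -> System.out.println("cleaning up local state"),
                    () -> { throw new IllegalStateException("topic deletion timed out"); });
            System.out.println("cleanup succeeded: " + clean);
        }
    }
}
```

Splitting the two catch clauses is a design choice of this sketch: the diff's single multi-catch is simpler, while restoring the interrupt flag is the usual convention when an `InterruptedException` is swallowed.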
[GitHub] [kafka] ableegoldman commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on pull request #8540: URL: https://github.com/apache/kafka/pull/8540#issuecomment-620909428 One unrelated failure: `MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete`
[jira] [Created] (KAFKA-9929) Support reverse iterator on WindowStore
Jorge Esteban Quilcate Otoya created KAFKA-9929:
---
Summary: Support reverse iterator on WindowStore
Key: KAFKA-9929
URL: https://issues.apache.org/jira/browse/KAFKA-9929
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Jorge Esteban Quilcate Otoya

Currently, WindowStore fetch operations return an iterator sorted from earliest to latest result:
```
* For each key, the iterator guarantees ordering of windows, starting from the oldest/earliest
* available window to the newest/latest window.
```
We have a use case where traces are stored in a WindowStore, and we use Kafka Streams to create a materialized view of traces. A query request comes with a time range (e.g. now-1h to now), and we want to return the most recent results: fetch from this period of time, iterate and pattern-match the latest/most recent traces, and, if there are enough results, reply without moving further along the iterator.

The same store is used to search for previous traces. In this case, it searches a key over the last day; if traces are found, we would also like to iterate from the most recent.

RocksDB seems to support iterating backward and forward: [https://github.com/facebook/rocksdb/wiki/Iterator#iterating-upper-bound-and-lower-bound]

For reference, this partly extracts some bits from a previous issue, https://issues.apache.org/jira/browse/KAFKA-4212:
> The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment
> dropping, but it stores multiple items per key, based on their timestamp. But
> this store can be repurposed as a cache by fetching the items in reverse
> chronological order and returning the first item found.

We would like to know if there is any impediment in RocksDB or WindowStore to supporting this. Adding an argument to reverse the order in the current fetch methods would be great:
```
WindowStore.fetch(from,to,Direction.BACKWARD|FORWARD)
```

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
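To illustrate the requested semantics only, here is an in-memory sketch built on `java.util.TreeMap`, whose `subMap` and `descendingMap` views give forward and backward iteration over a time range. `Direction` follows the enum proposed in the issue and is hypothetical, as are `TimeIndexedStore` and its methods. Unlike a real WindowStore, the sketch holds a single key and one value per timestamp, and it says nothing about how RocksDB would implement this.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical direction flag mirroring the one proposed in the issue.
enum Direction { FORWARD, BACKWARD }

// In-memory analogue of one key's windows, keyed by window start timestamp.
class TimeIndexedStore<V> {
    private final TreeMap<Long, V> byTimestamp = new TreeMap<>();

    void put(long timestamp, V value) {
        byTimestamp.put(timestamp, value);
    }

    // Returns an iterator over [timeFrom, timeTo] (both inclusive) in the requested order.
    Iterator<Map.Entry<Long, V>> fetch(long timeFrom, long timeTo, Direction direction) {
        NavigableMap<Long, V> range = byTimestamp.subMap(timeFrom, true, timeTo, true);
        NavigableMap<Long, V> ordered = direction == Direction.BACKWARD ? range.descendingMap() : range;
        return ordered.entrySet().iterator();
    }

    // Collects at most `limit` values, newest first, and stops early as in the use case above.
    List<V> latest(long timeFrom, long timeTo, int limit) {
        List<V> out = new ArrayList<>();
        Iterator<Map.Entry<Long, V>> it = fetch(timeFrom, timeTo, Direction.BACKWARD);
        while (it.hasNext() && out.size() < limit) {
            out.add(it.next().getValue());
        }
        return out;
    }
}
```

With values at timestamps 10, 20, 30 and 40, `latest(15, 40, 2)` returns the values at 40 and 30 without visiting the older entry at 20, which is the early-exit behavior the issue asks for.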