[GitHub] [kafka] sknot-rh commented on pull request #10436: KAFKA-12577; Remove deprecated `ConfigEntry` constructor for 3.0
sknot-rh commented on pull request #10436: URL: https://github.com/apache/kafka/pull/10436#issuecomment-91220 I am using this deprecated ctor in my tests. How can I set, for example `source` option now? The other ctor has a package protected access. ``` ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean isReadOnly, List synonyms, ConfigType type, String documentation) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13257) KafkaStreams Support For Latest RocksDB Version
[ https://issues.apache.org/jira/browse/KAFKA-13257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17409248#comment-17409248 ] Satyam Bala commented on KAFKA-13257: - Currently *Blocked* from using Kafka Streams (neither 2.8 nor 3.0) on Alpine-based images until the 3.0 release (when?). > KafkaStreams Support For Latest RocksDB Version > --- > > Key: KAFKA-13257 > URL: https://issues.apache.org/jira/browse/KAFKA-13257 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Alagukannan >Priority: Major > Attachments: hs_err_pid6.log > > > Hi, > Can you please let us know if there is any plan for adding the latest > versions of rocksDB in kafka streams. If you're planning it, what's the timeline > we are looking at. If not planning to upgrade, what's the reason behind it. Is > there any significant impact on upgrading, like backward compatibility etc. > Just to note, this is a general query about the rocksdb upgrade and its > impact on streams applications. > The main pain point behind asking for this upgrade is that we tried to build an > application with kafka streams 2.8.0 on an alpine based OS and the docker > base image is as follows > azul/zulu-openjdk-alpine:11.0.12-11.50.19-jre-headless. The streams > application worked fine until it had an interaction with state > store(rocksdb).
The jvm crashed with the following error: > # > # A fatal error has been detected by the Java Runtime Environment: > # > # SIGSEGV (0xb) at pc=0x7f9551951b27, pid=6, tid=207 > # > # JRE version: OpenJDK Runtime Environment Zulu11.45+27-CA (11.0.10+9) > (build 11.0.10+9-LTS) > # Java VM: OpenJDK 64-Bit Server VM Zulu11.45+27-CA (11.0.10+9-LTS, mixed > mode, tiered, compressed oops, g1 gc, linux-amd64) > # Problematic frame: > # C [librocksdbjni15322693993163550519.so+0x271b27] > std::_Rb_tree, > std::less, std::allocator > >::_M_erase(std::_Rb_tree_node*)+0x27 > Then we found out rocksdb works well on glibc and not musl lib, whereas > alpine supports musl lib alone for native dependencies. Further looking into > rocksdb for a solution we found that they have started supporting both glibc > and musl native libs from 6.5.x versions. > But the latest kafka streams (2.8.0) ships rocksdb 5.18.x. This is > the main reason behind asking for the rocksDB upgrade in kafka streams as > well. > We have attached the PID log where JVM failures are happening. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ccding commented on pull request #11293: MINOR: defineInternal for KIP-405 configs
ccding commented on pull request #11293: URL: https://github.com/apache/kafka/pull/11293#issuecomment-912200659 This PR fails `testCreateTopicsResponseMetadataAndConfig()`. DescribeTopic doesn't return the internal configs, while the response of CreateTopic includes the internal configs.
[GitHub] [kafka] showuon commented on a change in pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order
showuon commented on a change in pull request #11292: URL: https://github.com/apache/kafka/pull/11292#discussion_r701534858 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/WindowStoreFetchIntegrationTest.java ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.function.Supplier; + +import static java.time.Duration.ofMillis; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; + +@RunWith(Parameterized.class) +public class WindowStoreFetchIntegrationTest { +private enum StoreType { InMemory, 
RocksDB, Timed }; +private static final String STORE_NAME = "store"; +private static final int DATA_SIZE = 5; +private static final long WINDOW_SIZE = 500L; +private static final long RETENTION_MS = 1L; + +private StoreType storeType; +private boolean enableLogging; +private boolean enableCaching; +private boolean forward; + +private LinkedList, Long>> expectedRecords; +private LinkedList> records; +private Properties streamsConfig; + +private TimeWindowedKStream windowedStream; + +public WindowStoreFetchIntegrationTest(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward) { +this.storeType = storeType; +this.enableLogging = enableLogging; +this.enableCaching = enableCaching; +this.forward = forward; + +this.records = new LinkedList<>(); +this.expectedRecords = new LinkedList<>(); +final int m = DATA_SIZE / 2; +for (int i = 0; i < DATA_SIZE; i++) { +final String key = "key-" + i * 2; +final String value = "val-" + i * 2; +final KeyValue r = new KeyValue<>(key, value); +records.add(r); +records.add(r); +// expected the count of each key is 2 +final long windowStartTime = i < m ? 0 : WINDOW_SIZE; +expectedRecords.add(new KeyValue<>(new Windowed<>(key, new TimeWindow(windowStartTime, windowStartTime + WINDOW_SIZE)), 2L)); +} +} + +@Rule +public TestName testName = new TestName(); + +@Parameterized.Parameters(name = "storeType={0}, enableLogging={1}, enableCaching={2}, forward={3}") +public static Collection data() { +final List types = Arrays.asList(StoreType.InMemory, StoreType.RocksDB, StoreType.Timed); +final List logging = Arrays.asList(true, false); +final
[GitHub] [kafka] showuon commented on pull request #11206: MINOR: Update streams doc to close KeyValueIterator in example code
showuon commented on pull request #11206: URL: https://github.com/apache/kafka/pull/11206#issuecomment-912194561 @bbejeck , please take a look when available. Thank you.
[GitHub] [kafka] showuon commented on pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error
showuon commented on pull request #11086: URL: https://github.com/apache/kafka/pull/11086#issuecomment-912194029 @dajac , I think this PR is good to get merged. WDYT? Failed tests are unrelated. Thanks. ``` Build / JDK 11 and Scala 2.13 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota() Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread ```
[jira] [Updated] (KAFKA-13270) Kafka may fail to connect to ZooKeeper, retry forever, and never start
[ https://issues.apache.org/jira/browse/KAFKA-13270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-13270: -- Description: The implementation of https://issues.apache.org/jira/browse/ZOOKEEPER-3593 in ZooKeeper version 3.6.0 decreased the default value for the ZooKeeper client's `jute.maxbuffer` configuration from 4MB to 1MB. This can cause a problem if Kafka tries to retrieve a large amount of data across many znodes -- in such a case the ZooKeeper client will repeatedly emit a message of the form "java.io.IOException: Packet len <> is out of range" and the Kafka broker will never connect to ZooKeeper and fail to make progress on the startup sequence. We can avoid the potential for this issue to occur by explicitly setting the value to 4MB whenever we create a new ZooKeeper client as long as no explicit value has been set via the `jute.maxbuffer` system property. (was: The implementation of https://issues.apache.org/jira/browse/ZOOKEEPER-3593 in ZooKeeper version 3.6.0 decreased the default value for the ZooKeeper client's `jute.maxbuffer` configuration from 4MB to 1MB. This can cause a problem if Kafka tries to retrieve a large amount of data across many znodes -- in such a case the ZooKeeper client will repeatedly emit a message of the form "java.io.IOException: Packet len <> is out of range" and the Kafka broker will never connect to ZooKeeper and fail make progress on the startup sequence. We can avoid the potential for this issue to occur by explicitly setting the value to 4MB whenever we create a new ZooKeeper client as long as no explicit value has been set via the `jute.maxbuffer` system property.) 
> Kafka may fail to connect to ZooKeeper, retry forever, and never start > -- > > Key: KAFKA-13270 > URL: https://issues.apache.org/jira/browse/KAFKA-13270 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Blocker > Fix For: 3.0.0 > > > The implementation of https://issues.apache.org/jira/browse/ZOOKEEPER-3593 in > ZooKeeper version 3.6.0 decreased the default value for the ZooKeeper > client's `jute.maxbuffer` configuration from 4MB to 1MB. This can cause a > problem if Kafka tries to retrieve a large amount of data across many znodes > -- in such a case the ZooKeeper client will repeatedly emit a message of the > form "java.io.IOException: Packet len <> is out of range" and the Kafka > broker will never connect to ZooKeeper and fail to make progress on the > startup sequence. We can avoid the potential for this issue to occur by > explicitly setting the value to 4MB whenever we create a new ZooKeeper client > as long as no explicit value has been set via the `jute.maxbuffer` system > property.
[jira] [Created] (KAFKA-13270) Kafka may fail to connect to ZooKeeper, retry forever, and never start
Ron Dagostino created KAFKA-13270: - Summary: Kafka may fail to connect to ZooKeeper, retry forever, and never start Key: KAFKA-13270 URL: https://issues.apache.org/jira/browse/KAFKA-13270 Project: Kafka Issue Type: Bug Affects Versions: 3.0.0 Reporter: Ron Dagostino Assignee: Ron Dagostino Fix For: 3.0.0 The implementation of https://issues.apache.org/jira/browse/ZOOKEEPER-3593 in ZooKeeper version 3.6.0 decreased the default value for the ZooKeeper client's `jute.maxbuffer` configuration from 4MB to 1MB. This can cause a problem if Kafka tries to retrieve a large amount of data across many znodes -- in such a case the ZooKeeper client will repeatedly emit a message of the form "java.io.IOException: Packet len <> is out of range" and the Kafka broker will never connect to ZooKeeper and fail to make progress on the startup sequence. We can avoid the potential for this issue to occur by explicitly setting the value to 4MB whenever we create a new ZooKeeper client as long as no explicit value has been set via the `jute.maxbuffer` system property.
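The mitigation described in KAFKA-13270 (apply a 4MB default only when `jute.maxbuffer` has not been set explicitly) could look roughly like the sketch below. This is an illustration, not the actual Kafka patch: the class and method names are hypothetical, and it assumes the property is set before the ZooKeeper client is created.

```java
// Sketch only: JuteMaxBufferDefault and applyDefaultIfUnset are hypothetical names,
// not taken from the Kafka code base.
final class JuteMaxBufferDefault {
    static final String JUTE_MAXBUFFER = "jute.maxbuffer";
    // 4 MB, the pre-3.6.0 client default according to the issue description.
    static final int FOUR_MB = 4 * 1024 * 1024;

    /** Sets jute.maxbuffer to 4 MB unless a value is already present; returns the effective value. */
    static String applyDefaultIfUnset() {
        if (System.getProperty(JUTE_MAXBUFFER) == null) {
            System.setProperty(JUTE_MAXBUFFER, Integer.toString(FOUR_MB));
        }
        return System.getProperty(JUTE_MAXBUFFER);
    }

    public static void main(String[] args) {
        // Would be called before constructing the ZooKeeper client (assumption).
        System.out.println(JUTE_MAXBUFFER + "=" + applyDefaultIfUnset());
    }
}
```

An operator who passes `-Djute.maxbuffer=...` on the command line keeps that value, which matches the "as long as no explicit value has been set" condition in the description.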
[jira] [Created] (KAFKA-13269) Kafka Streams Aggregation data loss between instance restarts and rebalances
Rohit Bobade created KAFKA-13269: Summary: Kafka Streams Aggregation data loss between instance restarts and rebalances Key: KAFKA-13269 URL: https://issues.apache.org/jira/browse/KAFKA-13269 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.6.2 Reporter: Rohit Bobade Using Kafka Streams 2.6.2 and doing count based aggregation of messages. Also setting Processing Guarantee - EXACTLY_ONCE_BETA and NUM_STANDBY_REPLICAS_CONFIG = 1. Sending some messages and restarting instances in the middle of processing to test fault tolerance. The output count is incorrect because of data loss while restoring state. It looks like the streams task becomes active and starts processing even when the state is not fully restored but is within the acceptable recovery lag (default is 10000). This results in data loss. {quote}A stateful active task is assigned to an instance only when its state is within the configured acceptable.recovery.lag, if one exists {quote} [https://docs.confluent.io/platform/current/streams/developer-guide/running-app.html?_ga=2.33073014.912824567.1630441414-1598368976.1615841473#state-restoration-during-workload-rebalance] [https://docs.confluent.io/platform/current/installation/configuration/streams-configs.html#streamsconfigs_acceptable.recovery.lag] Setting acceptable.recovery.lag to 0 and re-running the chaos tests gives the correct result. Related KIP: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-Computingthemost-caught-upinstances] Just want to get some thoughts on this use case from the Kafka team, or to hear if anyone has encountered a similar issue.
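For reference, the settings mentioned in KAFKA-13269 can be written as plain Streams properties. The sketch below is an assumption-laden illustration rather than the reporter's code: the helper class is hypothetical, and required settings such as `application.id` and `bootstrap.servers` are left out.

```java
import java.util.Properties;

// Sketch only: ChaosTestStreamsProps is a hypothetical helper, not code from the report.
final class ChaosTestStreamsProps {
    static Properties build() {
        Properties props = new Properties();
        // Settings named in the report.
        props.put("processing.guarantee", "exactly_once_beta");
        props.put("num.standby.replicas", "1");
        // Workaround from the report: an instance only counts as caught up
        // for a stateful task when it has no restoration lag at all.
        props.put("acceptable.recovery.lag", "0");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The returned `Properties` would be passed to the `KafkaStreams` constructor together with the omitted required settings.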
[GitHub] [kafka] hachikuji commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal
hachikuji commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r701454478 ## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ## @@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], val cleanableLogs = dirtyLogs.filter { ltc => (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio } + if(cleanableLogs.isEmpty) { -None +val logsWithTombstonesExpired = dirtyLogs.filter { + case ltc => +// in this case, we are probably in a low throughput situation +// therefore, we should take advantage of this fact and remove tombstones if we can +// under the condition that the log's latest delete horizon is less than the current time +// tracked +ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds() Review comment: @junrao Yeah, that's an interesting idea. Do you think it would be possible to make it a size-based comparison?
[GitHub] [kafka] hachikuji commented on pull request #11288: MINOR: Fix error response generation
hachikuji commented on pull request #11288: URL: https://github.com/apache/kafka/pull/11288#issuecomment-912080811 @mimaison Ah, I missed you already opened JIRAs. Never mind my comment.
[GitHub] [kafka] hachikuji commented on a change in pull request #11288: MINOR: Fix error response generation
hachikuji commented on a change in pull request #11288: URL: https://github.com/apache/kafka/pull/11288#discussion_r701442901 ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -706,6 +706,8 @@ private void checkDescribeConfigsResponseVersions() { private void checkErrorResponse(AbstractRequest req, Throwable e, boolean checkEqualityAndHashCode) { AbstractResponse response = req.getErrorResponse(e); checkResponse(response, req.version(), checkEqualityAndHashCode); +Map errorCounts = response.errorCounts(); +assertTrue(errorCounts.containsKey(Errors.forException(e)), "API Key " + req.apiKey().name + "V" + req.version() + " failed errorCounts test"); Review comment: Is it possible to make this assertion stronger? Would we expect any other errors in the response?
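One way to read the request for a stronger assertion: instead of checking that the expected error is among the keys, check that it is the only key. The sketch below illustrates the difference with plain maps. The class name and the `String` error names are hypothetical stand-ins for the test's `Errors` type, and whether other errors can legitimately appear is exactly the open question in the review.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch only: ErrorCountsCheck is a hypothetical helper; String keys stand in for error codes.
final class ErrorCountsCheck {
    /** Weak check: passes even when unrelated errors are also counted. */
    static boolean containsExpected(Map<String, Integer> errorCounts, String expected) {
        return errorCounts.containsKey(expected);
    }

    /** Stronger check: the expected error must be the only key present. */
    static boolean onlyExpected(Map<String, Integer> errorCounts, String expected) {
        return errorCounts.keySet().equals(Collections.singleton(expected));
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("UNKNOWN_SERVER_ERROR", 2);
        counts.put("NONE", 1);
        System.out.println("weak check:   " + containsExpected(counts, "UNKNOWN_SERVER_ERROR"));
        System.out.println("strong check: " + onlyExpected(counts, "UNKNOWN_SERVER_ERROR"));
    }
}
```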
[GitHub] [kafka] ccding opened a new pull request #11293: MINOR: defineInternal for KIP-405 configs
ccding opened a new pull request #11293: URL: https://github.com/apache/kafka/pull/11293 We haven't finished implementing KIP-405, so we should define the KIP-405 configs with `defineInternal`. We may also want to port this change to 3.0 to avoid leaking these configs into the docs. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-13225) Controller skips sending UpdateMetadataRequest when shutting down broker doesnt host partitions
[ https://issues.apache.org/jira/browse/KAFKA-13225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-13225. - Fix Version/s: 3.1.0 Resolution: Fixed merged the PR to trunk > Controller skips sending UpdateMetadataRequest when shutting down broker > doesnt host partitions > > > Key: KAFKA-13225 > URL: https://issues.apache.org/jira/browse/KAFKA-13225 > Project: Kafka > Issue Type: Bug > Components: controller >Reporter: David Mao >Assignee: David Mao >Priority: Minor > Fix For: 3.1.0 > > > If a broker not hosting replicas for any partitions is shut down while there > are offline partitions, the controller can fail to send out metadata updates > to other brokers in the cluster. > > Since this is a very niche scenario, I will leave the priority as Minor.
[GitHub] [kafka] junrao merged pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.
junrao merged pull request #11255: URL: https://github.com/apache/kafka/pull/11255
[GitHub] [kafka] junrao commented on a change in pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.
junrao commented on a change in pull request #11033: URL: https://github.com/apache/kafka/pull/11033#discussion_r701405911 ## File path: storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java ## @@ -62,16 +63,17 @@ * @param remoteLogSegmentMetadata metadata about the remote log segment. * @throws RemoteStorageException if there are any storage related errors occurred. * @throws IllegalArgumentException if the given metadata instance does not have the state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED} + * @return a Future which will complete once this operation is finished. */ -void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException; +Future addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException; Review comment: Should we use CompletableFuture instead of Future? With Future, there isn't much the caller could do.
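To illustrate the point about `Future` versus `CompletableFuture`: a plain `Future` only lets the caller block on `get()` or poll `isDone()`, while a `CompletableFuture` lets the caller attach follow-up work and error handling without blocking. The sketch below uses hypothetical names (`AsyncMetadataSketch`, `addMetadata`) as stand-ins for the method under review and is not the proposed API.

```java
import java.util.concurrent.CompletableFuture;

// Sketch only: AsyncMetadataSketch and addMetadata are hypothetical stand-ins
// for the RemoteLogMetadataManager method discussed above.
final class AsyncMetadataSketch {
    /** Pretends to store metadata asynchronously; completes when the work is done. */
    static CompletableFuture<Void> addMetadata(String segmentId) {
        return CompletableFuture.runAsync(() -> {
            // A real implementation would persist the metadata for segmentId here.
        });
    }

    /** Chains a follow-up step and an error handler without blocking the caller. */
    static CompletableFuture<String> addAndDescribe(String segmentId) {
        return addMetadata(segmentId)
            .thenApply(ignored -> "stored " + segmentId)
            .exceptionally(error -> "failed " + segmentId + ": " + error);
    }

    public static void main(String[] args) {
        // join() is used here only so the demo waits for the result before exiting.
        System.out.println(addAndDescribe("segment-0").join());
    }
}
```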
[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408977#comment-17408977 ] John Roesler commented on KAFKA-13261: -- Hi [~xnix] , Your suspicion is correct. TopologyTestDriver doesn't simulate partitions at all, so you won't be able to use it to test this case. When it comes to a repro, you might be interested in this class, which verifies that foreign-key joins perform correctly when the input topics have different partitions: [https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java] If we have a bug, my suspicion would be whether we're correctly capturing the partitioner that you're setting via Repartitioned. I'd suggest modifying that test to be closer to your example and seeing whether or not we still get the correct result. On the subject of Repartitioned, I didn't quite follow why you're doing it. To be clear, when you're doing foreign-key joins, you do not need the two tables to have the same number of partitions, nor do you need them to be co-partitioned. This should work just fine: {code:java} KTable tableB = builder.table("B", stringMaterialized("table.b")); builder .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class))) .toTable(Named.as("table.a"), aMaterialized("table.a")) .join(tableB, EventA::getKeyB, topicAandBeJoiner(), Named.as("join.ab"), joinMaterialized("join.ab")) .toStream() .to("output", with(...)); {code} Unless you have some other requirement for which you need the repartition operation, I'd suggest just completely dropping those repartition steps. At least, I'd suggest trying out removing them from the topology and verifying if you get the correct join results. I hope this helps! 
> KTable to KTable foreign key join loose events when using several partitions > > > Key: KAFKA-13261 > URL: https://issues.apache.org/jira/browse/KAFKA-13261 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0, 2.7.1 >Reporter: Tomas Forsman >Priority: Major > Attachments: KafkaTest.java > > > Two incoming streams A and B. > Stream A uses a composite key [a, b] > Stream B has key [b] > Stream B has 4 partitions and stream A has 1 partition. > What we try to do is repartition stream A to have 4 partitions too, then put > both A and B into KTable and do a foreign key join from A to B > When doing this, not all messages end up in the output topic. > Repartitioning both to only use 1 partition each solves the problem, so it seems > like it has something to do with the foreign key join in combination with > several partitions. > One suspicion would be that it is not possible to define what partitioner to > use for the join. > Any insight or help is greatly appreciated.
> *Example code of the problem* > {code:java} > static Topology createTopoology(){ > var builder = new StreamsBuilder(); > KTable tableB = builder.table("B", > stringMaterialized("table.b")); > builder > .stream("A", Consumed.with(Serde.of(KeyA.class), > Serde.of(EventA.class))) > .repartition(repartitionTopicA()) > .toTable(Named.as("table.a"), aMaterialized("table.a")) > .join(tableB, EventA::getKeyB, topicAandBeJoiner(), > Named.as("join.ab"), joinMaterialized("join.ab")) > .toStream() > .to("output", with(...)); > return builder.build(); > } > private static Materialized aMaterialized(String name) { > Materialized> table = > Materialized.as(name); > return > table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class)); > } > private static Repartitioned repartitionTopicA() { > Repartitioned repartitioned = > Repartitioned.as("driverperiod"); > return > repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class)) > .withStreamPartitioner(topicAPartitioner()) > .withNumberOfPartitions(4); > } > private static StreamPartitioner > topicAPartitioner() { > return (topic, key, value, numPartitions) -> > Math.abs(key.getKeyB().hashCode()) % numPartitions; > } > private static Materialized> > joinMaterialized(String name) { > Materialized> > table = Materialized.as(name); > return > table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class)); > } > {code}
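A side note on the `Math.abs(hash) % numPartitions` expression in the partitioner above, offered as a general Java observation and not as an explanation of the missing join results: `Math.abs(Integer.MIN_VALUE)` is still negative, so the expression can produce a negative index when the partition count is not a power of two. With the 4 partitions used in the report the result for that hash happens to be 0. The sketch below compares it with `Math.floorMod`; the class and method names are hypothetical.

```java
// Sketch only: PartitionIndex is a hypothetical helper, not part of the reported topology.
final class PartitionIndex {
    /** Mirrors the expression in the report; may be negative for Integer.MIN_VALUE. */
    static int withAbs(int hash, int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    /** Stays within [0, numPartitions) for any hash when numPartitions is positive. */
    static int withFloorMod(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int hash = Integer.MIN_VALUE;
        System.out.println("abs,      3 partitions: " + withAbs(hash, 3));
        System.out.println("floorMod, 3 partitions: " + withFloorMod(hash, 3));
        System.out.println("abs,      4 partitions: " + withAbs(hash, 4));
    }
}
```

Note that the two expressions assign negative hashes to different partitions, so switching between them changes the partition layout of existing data.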
[GitHub] [kafka] guozhangwang commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store
guozhangwang commented on a change in pull request #11252: URL: https://github.com/apache/kafka/pull/11252#discussion_r701252846 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java ## @@ -33,172 +39,112 @@ * For key range queries, like fetch(key, fromTime, toTime), use the {@link RocksDBWindowStore} * which uses the {@link WindowKeySchema} to serialize the record bytes for efficient key queries. */ +@SuppressWarnings("unchecked") public class RocksDBTimeOrderedWindowStore Review comment: Sounds good, will do.
[GitHub] [kafka] guozhangwang commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store
guozhangwang commented on a change in pull request #11252: URL: https://github.com/apache/kafka/pull/11252#discussion_r701252642 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -146,7 +146,7 @@ public void process(final K key, final V1 value) { outerJoinWindowStore.ifPresent(store -> { // Delete the joined record from the non-joined outer window store -store.put(KeyAndJoinSide.make(!isLeftSide, key), null, otherRecordTimestamp); +store.put(KeyAndJoinSide.make(!isLeftSide, key, otherRecordTimestamp), null); Review comment: Same as above.
[GitHub] [kafka] guozhangwang commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store
guozhangwang commented on a change in pull request #11252: URL: https://github.com/apache/kafka/pull/11252#discussion_r701252490 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -209,37 +208,36 @@ private void emitNonJoinedOuterRecords(final WindowStore, Left // reset to MAX_VALUE in case the store is empty sharedTimeTracker.minTime = Long.MAX_VALUE; -try (final KeyValueIterator>, LeftOrRightValue> it = store.all()) { +try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { while (it.hasNext()) { -final KeyValue>, LeftOrRightValue> record = it.next(); +final KeyValue, LeftOrRightValue> record = it.next(); -final Windowed> windowedKey = record.key; -final LeftOrRightValue value = record.value; -sharedTimeTracker.minTime = windowedKey.window().start(); +final KeyAndJoinSide keyAndJoinSide = record.key; +final LeftOrRightValue value = record.value; +final K key = keyAndJoinSide.getKey(); +final long timestamp = keyAndJoinSide.getTimestamp(); +sharedTimeTracker.minTime = timestamp; // Skip next records if window has not closed -if (windowedKey.window().start() + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) { +if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) { break; } -final K key = windowedKey.key().getKey(); -final long time = windowedKey.window().start(); - final R nullJoinedValue; if (isLeftSide) { nullJoinedValue = joiner.apply(key, -(V1) value.getLeftValue(), -(V2) value.getRightValue()); +value.getLeftValue(), +value.getRightValue()); } else { nullJoinedValue = joiner.apply(key, -(V1) value.getRightValue(), -(V2) value.getLeftValue()); +(V1) value.getRightValue(), +(V2) value.getLeftValue()); } -context().forward(key, nullJoinedValue, To.all().withTimestamp(time)); +context().forward(key, nullJoinedValue, To.all().withTimestamp(timestamp)); // Delete the key from the outer window store now it is emitted -store.put(record.key.key(), null, 
record.key.window().start()); +store.put(keyAndJoinSide, null); Review comment: The `delete` call would incur an additional `get` in order to return the deleted value which is not needed, so I intentionally used `put(k, null)` to avoid that extra get. I can leave a comment above explaining this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
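The trade-off described in this review comment can be sketched as follows. This is a minimal illustration, assuming a hypothetical `CountingStore` that stands in for a key-value store whose `delete` returns the previous value; it is not Kafka's `KeyValueStore` implementation, and the read counter exists only to make the extra lookup visible.

```java
import java.util.HashMap;
import java.util.Map;

final class PutNullVsDeleteSketch {

    /** Hypothetical in-memory store; counts how many lookups each operation triggers. */
    static final class CountingStore<K, V> {
        private final Map<K, V> data = new HashMap<>();
        int reads = 0;

        V get(final K key) {
            reads++;
            return data.get(key);
        }

        /** A null value is treated as a delete, without reading the old value. */
        void put(final K key, final V value) {
            if (value == null) {
                data.remove(key);
            } else {
                data.put(key, value);
            }
        }

        /** Returns the removed value, which in this sketch costs one extra get. */
        V delete(final K key) {
            final V old = get(key);
            data.remove(key);
            return old;
        }
    }

    public static void main(final String[] args) {
        final CountingStore<String, String> store = new CountingStore<>();
        store.put("k1", "v1");
        store.put("k2", "v2");

        store.put("k1", null);   // removal via put: no lookup
        System.out.println("reads after put(k, null): " + store.reads);

        store.delete("k2");      // removal via delete: one lookup to return the old value
        System.out.println("reads after delete(k): " + store.reads);
    }
}
```

When the caller discards the returned value, as in the diff above, the lookup performed by `delete` is wasted work, which is the motivation given for `put(k, null)`.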
[GitHub] [kafka] guozhangwang commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store
guozhangwang commented on a change in pull request #11252: URL: https://github.com/apache/kafka/pull/11252#discussion_r701251823 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java ## @@ -63,6 +63,10 @@ private InternalProcessorContext context; private TaskId taskId; +interface WindowedKeySerde { +Bytes serialize(final Bytes key, final long timestamp, final int seqnum); +} + Review comment: Ack.
[GitHub] [kafka] guozhangwang commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store
guozhangwang commented on a change in pull request #11252: URL: https://github.com/apache/kafka/pull/11252#discussion_r701251517 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideSerializer.java ## @@ -55,9 +57,11 @@ public void configure(final Map configs, final boolean isKey) { public byte[] serialize(final String topic, final KeyAndJoinSide data) { final byte boolByte = (byte) (data.isLeftSide() ? 1 : 0); final byte[] keyBytes = keySerializer.serialize(topic, data.getKey()); +final byte[] timestampBytes = timestampSerializer.serialize(topic, data.getTimestamp()); return ByteBuffer -.allocate(keyBytes.length + 1) +.allocate(8 + keyBytes.length + 1) Review comment: Ack.
[GitHub] [kafka] guozhangwang commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store
guozhangwang commented on a change in pull request #11252: URL: https://github.com/apache/kafka/pull/11252#discussion_r701251294 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideDeserializer.java ## @@ -48,15 +50,22 @@ public void configure(final Map configs, final boolean isKey) { @Override public KeyAndJoinSide deserialize(final String topic, final byte[] data) { -final boolean bool = data[0] == 1; +final boolean bool = data[8] == 1; Review comment: Ack.
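The literal `8` in `allocate(8 + keyBytes.length + 1)` and the `data[8]` offset in the deserializer diff suggest a byte layout of timestamp, then join-side flag, then key bytes. Below is a minimal sketch of that layout with the magic numbers replaced by named constants. The class and constant names (`KeyAndJoinSideLayoutSketch`, `TIMESTAMP_SIZE`, `BOOLEAN_SIZE`) are assumptions for illustration, and the field order is inferred from the diffs rather than confirmed by the full source.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class KeyAndJoinSideLayoutSketch {
    // Hypothetical names; the PR itself uses the literals 8 and 9.
    static final int TIMESTAMP_SIZE = Long.BYTES; // 8 bytes
    static final int BOOLEAN_SIZE = 1;            // 1 byte for the left/right flag

    /** Assumed layout: [timestamp: 8 bytes][isLeftSide: 1 byte][raw key bytes]. */
    static byte[] serialize(final boolean isLeftSide, final long timestamp, final byte[] keyBytes) {
        return ByteBuffer
            .allocate(TIMESTAMP_SIZE + BOOLEAN_SIZE + keyBytes.length)
            .putLong(timestamp)
            .put((byte) (isLeftSide ? 1 : 0))
            .put(keyBytes)
            .array();
    }

    static long timestamp(final byte[] data) {
        return ByteBuffer.wrap(data).getLong();
    }

    static boolean isLeftSide(final byte[] data) {
        return data[TIMESTAMP_SIZE] == 1;
    }

    static byte[] rawKey(final byte[] data) {
        final byte[] key = new byte[data.length - TIMESTAMP_SIZE - BOOLEAN_SIZE];
        System.arraycopy(data, TIMESTAMP_SIZE + BOOLEAN_SIZE, key, 0, key.length);
        return key;
    }

    public static void main(final String[] args) {
        final byte[] key = "ID123".getBytes(StandardCharsets.UTF_8);
        final byte[] bytes = serialize(true, 1622617868856L, key);
        System.out.println(timestamp(bytes) + " " + isLeftSide(bytes)
            + " " + new String(rawKey(bytes), StandardCharsets.UTF_8));
    }
}
```

With the sizes named once, the serializer's allocation and the deserializer's offsets cannot drift apart silently if the layout changes.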
[jira] [Created] (KAFKA-13268) Add more integration tests for Table Table FK joins with repartitioning
Guozhang Wang created KAFKA-13268: - Summary: Add more integration tests for Table Table FK joins with repartitioning Key: KAFKA-13268 URL: https://issues.apache.org/jira/browse/KAFKA-13268 Project: Kafka Issue Type: Improvement Components: streams, unit tests Reporter: Guozhang Wang We should extend the FK join multipartition integration test with cases that use a Repartitioned configured with: 1) just the new partition count 2) a custom partitioner. This is to test whether there is a bug where the internal topics don't pick up a partitioner provided that way. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408967#comment-17408967 ] Guozhang Wang commented on KAFKA-13261: --- Hello [~xnix] regarding "However, in a local docker environment we can reproduce the above scenario every time when using 4 partitions and the problem goes away when using 1 partition." Is that a test done through the TopologyTestDriver or is it a full-fledged integration test with Kafka clusters and Kafka Streams clients? > KTable to KTable foreign key join loose events when using several partitions > > > Key: KAFKA-13261 > URL: https://issues.apache.org/jira/browse/KAFKA-13261 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0, 2.7.1 >Reporter: Tomas Forsman >Priority: Major > Attachments: KafkaTest.java > > > Two incoming streams A and B. > Stream A uses a composite key [a, b] > Stream B has key [b] > Stream B has 4 partitions and steams A has 1 partition. > What we try to do is repartition stream A to have 4 partitions too, then put > both A and B into KTable and do a foreign key join on from A to B > When doing this, all messages does not end up in the output topic. > Repartitioning both to only use 1 partition each solve the problem so it seem > like it has something to do with the foreign key join in combination with > several partitions. > One suspicion would be that it is not possible to define what partitioner to > use for the join. > Any insight or help is greatly appreciated. 
> *Example code of the problem* > {code:java} > static Topology createTopoology(){ > var builder = new StreamsBuilder(); > KTable tableB = builder.table("B", > stringMaterialized("table.b")); > builder > .stream("A", Consumed.with(Serde.of(KeyA.class), > Serde.of(EventA.class))) > .repartition(repartitionTopicA()) > .toTable(Named.as("table.a"), aMaterialized("table.a")) > .join(tableB, EventA::getKeyB, topicAandBeJoiner(), > Named.as("join.ab"), joinMaterialized("join.ab")) > .toStream() > .to("output", with(...)); > return builder.build(); > } > private static Materialized aMaterialized(String name) { > Materialized> table = > Materialized.as(name); > return > table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class)); > } > private static Repartitioned repartitionTopicA() { > Repartitioned repartitioned = > Repartitioned.as("driverperiod"); > return > repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class)) > .withStreamPartitioner(topicAPartitioner()) > .withNumberOfPartitions(4); > } > private static StreamPartitioner > topicAPartitioner() { > return (topic, key, value, numPartitions) -> > Math.abs(key.getKeyB().hashCode()) % numPartitions; > } > private static Materialized> > joinMaterialized(String name) { > Materialized> > table = Materialized.as(name); > return > table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class)); > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
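A side note on the `topicAPartitioner()` expression in the example code above, `Math.abs(key.getKeyB().hashCode()) % numPartitions`: `Math.abs(Integer.MIN_VALUE)` is still negative in Java, so this form can yield a negative index for some partition counts. The sketch below compares it with `Math.floorMod`. It is not claimed to be the cause of the reported behaviour, and the class and method names are hypothetical.

```java
final class PartitionIndexSketch {

    /** The form used in the ticket's example partitioner. */
    static int absMod(final int hash, final int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    /** Alternative that is non-negative for every int hash when numPartitions > 0. */
    static int floorMod(final int hash, final int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(final String[] args) {
        final int hash = Integer.MIN_VALUE; // the one value Math.abs cannot make positive
        System.out.println("absMod, 3 partitions:   " + absMod(hash, 3));
        System.out.println("floorMod, 3 partitions: " + floorMod(hash, 3));
    }
}
```

The two forms map negative hashes to different partitions, so whichever is chosen would have to be used on every topic that is expected to be co-partitioned.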
[GitHub] [kafka] junrao commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal
junrao commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r701240936 ## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ## @@ -37,6 +40,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.OptionalLong; Review comment: Sorry, I meant adding a description regarding tombstone in the comment of LogCleaner. ## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ## @@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], val cleanableLogs = dirtyLogs.filter { ltc => (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio } + if(cleanableLogs.isEmpty) { -None +val logsWithTombstonesExpired = dirtyLogs.filter { + case ltc => +// in this case, we are probably in a low throughput situation +// therefore, we should take advantage of this fact and remove tombstones if we can +// under the condition that the log's latest delete horizon is less than the current time +// tracked +ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds() Review comment: Related to this, I am a bit concerned about the extra cleaning due to this. If we have just one tombstone record, this can force a round of cleaning on idle partitions. An alternative is to count the total number of surviving records and the number of tombstone records during cleaning, and only trigger a cleaning if #tombstone/#totalRecords > minCleanableRatio. @hachikuji What do you think?
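The alternative trigger proposed in this review comment can be sketched as a simple ratio check. This is an illustration only, written in Java rather than the Scala of `LogCleanerManager`; the class, method, and parameter names are hypothetical, and how the two counts would be tracked during cleaning is left open, as it is in the comment.

```java
final class TombstoneRatioSketch {

    /**
     * Returns true only when tombstones make up more than minCleanableRatio
     * of the surviving records, so a single tombstone on an idle partition
     * does not force a cleaning round.
     */
    static boolean shouldCleanForTombstones(final long tombstoneRecords,
                                            final long totalRecords,
                                            final double minCleanableRatio) {
        if (totalRecords <= 0) {
            return false; // nothing to clean, and avoids division by zero
        }
        return (double) tombstoneRecords / totalRecords > minCleanableRatio;
    }

    public static void main(final String[] args) {
        System.out.println(shouldCleanForTombstones(1, 1000, 0.5));   // one tombstone among many records
        System.out.println(shouldCleanForTombstones(600, 1000, 0.5)); // tombstones dominate the log
    }
}
```

Compared with the horizon-only check in the diff, this requires more than an expired delete horizon before an otherwise idle log is picked for cleaning.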
[GitHub] [kafka] splett2 commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.
splett2 commented on a change in pull request #11255: URL: https://github.com/apache/kafka/pull/11255#discussion_r701247464 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -628,9 +628,11 @@ class KafkaController(val config: KafkaConfig, topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion) } -// If replica failure did not require leader re-election, inform brokers of the offline brokers +// If no partition has changed leader or ISR, no UpdateMetadataRequest is sent through PartitionStateMachine +// and ReplicaStateMachine. In that case, we want to send an UpdateMetadataRequest explicitly to +// propagate the information about the new offline brokers. Review comment: good point thanks.
[GitHub] [kafka] jsancio commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft
jsancio commented on a change in pull request #11186: URL: https://github.com/apache/kafka/pull/11186#discussion_r701234135 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -804,7 +817,7 @@ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List
[GitHub] [kafka] junrao commented on a change in pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.
junrao commented on a change in pull request #11255: URL: https://github.com/apache/kafka/pull/11255#discussion_r701230149 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -628,9 +628,11 @@ class KafkaController(val config: KafkaConfig, topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion) } -// If replica failure did not require leader re-election, inform brokers of the offline brokers +// If no partition has changed leader or ISR, no UpdateMetadataRequest is sent through PartitionStateMachine +// and ReplicaStateMachine. In that case, we want to send an UpdateMetadataRequest explicitly to +// propagate the information about the new offline brokers. Review comment: With this comment, we could just remove the comment on the next line.
[jira] [Created] (KAFKA-13267) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
Gilles Philippart created KAFKA-13267: - Summary: InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id Key: KAFKA-13267 URL: https://issues.apache.org/jira/browse/KAFKA-13267 Project: Kafka Issue Type: Bug Affects Versions: 2.8.0 Reporter: Gilles Philippart We're using Confluent Cloud and Kafka Streams 2.8.0 and we've seen these errors pop up in apps using EOS: {code:java} InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id {code} Full stack trace: {code:java} Error encountered sending record to topic ola-update-1 for task 4_7 due to: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id. Exception handler choose to FAIL the processing, no more records would be sent. RecordCollectorImpl.java 226 recordSendError(...) RecordCollectorImpl.java:226:in `recordSendError' RecordCollectorImpl.java 196 lambda$send$0(...) RecordCollectorImpl.java:196:in `lambda$send$0' KafkaProducer.java 1365 onCompletion(...) KafkaProducer.java:1365:in `onCompletion' ProducerBatch.java 231 completeFutureAndFireCallbacks(...) ProducerBatch.java:231:in `completeFutureAndFireCallbacks' ProducerBatch.java 159 abort(...) ProducerBatch.java:159:in `abort' RecordAccumulator.java 763 abortBatches(...) RecordAccumulator.java:763:in `abortBatches' More (5 lines) Nested Exceptionsorg.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic ola-update-1 for task 4_7 due to: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id. Exception handler choose to FAIL the processing, no more records would be sent. RecordCollectorImpl.java 226 recordSendError(...) 
RecordCollectorImpl.java:226:in `recordSendError' org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id. {code} I've seen that KAFKA-6821 described the same problem on an earlier version of Kafka and was closed because of the subsequent work on EOS. Another recently raised ticket, KAFKA-12774, shows that the exception is still occurring (although that ticket wasn't raised for this specific error).
[jira] [Commented] (KAFKA-13257) KafkaStreams Support For Latest RocksDB Version
[ https://issues.apache.org/jira/browse/KAFKA-13257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408917#comment-17408917 ] Bruno Cadonna commented on KAFKA-13257: --- We cannot upgrade the RocksDB version for 2.8 because Kafka Streams exposes a RocksDB API in the `RocksDBConfigSetter` that is not compatible with newer versions of RocksDB. We would break backward compatibility if we upgraded RocksDB in 2.8. For 3.0, there has already been KAFKA-8897 that tracked the RocksDB upgrade. > KafkaStreams Support For Latest RocksDB Version > --- > > Key: KAFKA-13257 > URL: https://issues.apache.org/jira/browse/KAFKA-13257 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Alagukannan >Priority: Major > Attachments: hs_err_pid6.log > > > Hi, > Can you please let us know if there is any plan for adding the latest > versions of rocksDB in kafka streams. If your planning it what's the timeline > we are looking at. If not planning to upgrade what's the reason behind it. Is > there any significant impact on upgrading like backward combability etc.. > Just to remind this general query to know about the rocksdb upgrade and its > impact on streams application. > The main pain point behind asking this upgrade is, We tried to build an > application with kafka streams 2.8.0 on an alpine based OS and the docker > base image is as follows > azul/zulu-openjdk-alpine:11.0.12-11.50.19-jre-headless. The streams > application worked fine until it had an interaction with state > store(rocksdb). 
The jvm crashed with the following error: > # > # A fatal error has been detected by the Java Runtime Environment: > # > # SIGSEGV (0xb) at pc=0x7f9551951b27, pid=6, tid=207 > # > # JRE version: OpenJDK Runtime Environment Zulu11.45+27-CA (11.0.10+9) > (build 11.0.10+9-LTS) > # Java VM: OpenJDK 64-Bit Server VM Zulu11.45+27-CA (11.0.10+9-LTS, mixed > mode, tiered, compressed oops, g1 gc, linux-amd64) > # Problematic frame: > # C [librocksdbjni15322693993163550519.so+0x271b27] > std::_Rb_tree, > std::less, std::allocator > >::_M_erase(std::_Rb_tree_node*)+0x27 > Then we found out rocksdb works well on glibc and not musl lib, where as > alpine supports musl lib alone for native dependencies. Further looking into > rocksdb for a solution we found that they have started supporting both glib > and musl native libs from 6.5.x versions. > But latest kafka streams(2.8.0) is having rocksdb(5.18.x) version. This is > the main reason behind asking for the rocksDB upgrade in kafka streams as > well. > Have attached the PID log where JVM failures are happening. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] spena commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store
spena commented on a change in pull request #11252: URL: https://github.com/apache/kafka/pull/11252#discussion_r701101588 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideDeserializer.java ## @@ -48,15 +50,22 @@ public void configure(final Map configs, final boolean isKey) { @Override public KeyAndJoinSide deserialize(final String topic, final byte[] data) { -final boolean bool = data[0] == 1; +final boolean bool = data[8] == 1; Review comment: It should be good to add a constant for the `8` number, so it is easily read. for instance, the `rawKey()` has `new byte[data.length - 9]`, which I assume is `len - TIMESTAMP - BOOL`. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyAndJoinSideSerializer.java ## @@ -55,9 +57,11 @@ public void configure(final Map configs, final boolean isKey) { public byte[] serialize(final String topic, final KeyAndJoinSide data) { final byte boolByte = (byte) (data.isLeftSide() ? 1 : 0); final byte[] keyBytes = keySerializer.serialize(topic, data.getKey()); +final byte[] timestampBytes = timestampSerializer.serialize(topic, data.getTimestamp()); return ByteBuffer -.allocate(keyBytes.length + 1) +.allocate(8 + keyBytes.length + 1) Review comment: Should the `8` be a constant variable or just `timestampBytes.length`? ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java ## @@ -63,6 +63,10 @@ private InternalProcessorContext context; private TaskId taskId; +interface WindowedKeySerde { +Bytes serialize(final Bytes key, final long timestamp, final int seqnum); +} + Review comment: Is this used somewhere? 
## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java ## @@ -33,172 +39,112 @@ * For key range queries, like fetch(key, fromTime, toTime), use the {@link RocksDBWindowStore} * which uses the {@link WindowKeySchema} to serialize the record bytes for efficient key queries. */ +@SuppressWarnings("unchecked") public class RocksDBTimeOrderedWindowStore Review comment: Two things: - Seems `TimeOrderedKeySchema` is not needed anymore. Should the class be removed? - Should we rename the class to remove the `Window` par? ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -146,7 +146,7 @@ public void process(final K key, final V1 value) { outerJoinWindowStore.ifPresent(store -> { // Delete the joined record from the non-joined outer window store -store.put(KeyAndJoinSide.make(!isLeftSide, key), null, otherRecordTimestamp); +store.put(KeyAndJoinSide.make(!isLeftSide, key, otherRecordTimestamp), null); Review comment: Should you call `store.delete(KeyAndJoinSide.make(!isLeftSide, key, otherRecordTimestamp))` now? 
## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ## @@ -209,37 +208,36 @@ private void emitNonJoinedOuterRecords(final WindowStore, Left // reset to MAX_VALUE in case the store is empty sharedTimeTracker.minTime = Long.MAX_VALUE; -try (final KeyValueIterator>, LeftOrRightValue> it = store.all()) { +try (final KeyValueIterator, LeftOrRightValue> it = store.all()) { while (it.hasNext()) { -final KeyValue>, LeftOrRightValue> record = it.next(); +final KeyValue, LeftOrRightValue> record = it.next(); -final Windowed> windowedKey = record.key; -final LeftOrRightValue value = record.value; -sharedTimeTracker.minTime = windowedKey.window().start(); +final KeyAndJoinSide keyAndJoinSide = record.key; +final LeftOrRightValue value = record.value; +final K key = keyAndJoinSide.getKey(); +final long timestamp = keyAndJoinSide.getTimestamp(); +sharedTimeTracker.minTime = timestamp; // Skip next records if window has not closed -if (windowedKey.window().start() + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) { +if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) { break; } -final K key = windowedKey.key().getKey(); -final long time = windowedKey.window().start(); - final R nullJoinedValue; if (isLeftSide) {
[jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408786#comment-17408786 ] Tomas Forsman edited comment on KAFKA-13261 at 9/2/21, 2:34 PM: Hi [~guozhang] and [~abellemare], thank you for your answers. Yes, A and B use a partitioner that uses the same value from the respective topic key. In our scenario the output topic is compacted on the same key as topic A, so it would be fine if intermediate events did not arrive; we only want the final result. What we're seeing, though, is that some combinations are missing completely. We've read all events from the A, B and output topics, and also from the internal topics created by the join. We expected every topic to contain 66 ids. As said, running the same data with one partition gives a perfect match where all events have passed through. {noformat} #Using 4 partitions A : ids: 66 B : ids: 66 table.b-changelog : ids: 66 table.a-changelog : ids: 66 join.ab-changelog : ids: 20 output : ids: 20{noformat} Attached is KafkaTest.java. We've written several JUnit test cases with the TopologyTestDriver and different amounts of test data, but are unable to reproduce the problem there. (Does the test driver consider several partitions?) However, in a local docker environment we can reproduce the above scenario every time when using 4 partitions and the problem goes away when using 1 partition.
Below is when reading from the different topics directly for a specific id "ID123" Columns are 'offset, timestamp, partition | [key]value' Using 4 partitions {noformat} table.a-changelog 5 1612435945196 0 | [ID123, 202101] A01 15 1614863137136 0 | [ID123, 202102] A02 25 1617882052260 0 | [ID123, 202103] A03 35 1620299210336 0 | [ID123, 202104] A04 45 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 6 1622617868856 0 | [ID123] BBB join.ab-changelog 0 1622617868856 0 | [ID123, 202104] A04, BBB output 0 1622617868856 0 | [ID123, 202104] A04, BBB {noformat} Using 1 partition {noformat} table.a-changelog 28 1612435945196 0 | [ID123, 202101] A01 88 1614863137136 0 | [ID123, 202102] A02 149 1617882052260 0 | [ID123, 202103] A03 210 1620299210336 0 | [ID123, 202104] A04 269 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 7 1622617868856 0 | [ID123] BBB join.ab-changelog 28 1622617868856 0 | [ID123, 202101] A01, BBB 88 1622617868856 0 | [ID123, 202102] A02, BBB 149 1622617868856 0 | [ID123, 202103] A03, BBB 210 1622617868856 0 | [ID123, 202104] A04, BBB 269 1622804606823 0 | [ID123, 202105] A05, BBB output 5 1622617868856 0 | [ID123, 202101] A01, BBB 15 1622617868856 0 | [ID123, 202102] A02, BBB 25 1622617868856 0 | [ID123, 202103] A03, BBB 35 1622617868856 0 | [ID123, 202104] A04, BBB 45 1622804606823 0 | [ID123, 202105] A05, BBB{noformat}
[GitHub] [kafka] lkokhreidze commented on pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on pull request #10851: URL: https://github.com/apache/kafka/pull/10851#issuecomment-911752624 Hi @cadonna, I've addressed/replied to your comments. Thanks for the feedback. FYI - I'll be offline from next week for 2 weeks.
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on a change in pull request #10851: URL: https://github.com/apache/kafka/pull/10851#discussion_r701143273 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java ## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; + +import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys; + +/** + * Distributes standby tasks over different tag dimensions. + * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} are taken into account. + * Standby task distribution is on a best-effort basis. 
For example, if there are not enough clients available + * on different tag dimensions compared to an active and corresponding standby task, + * in that case, the algorithm will fall back to distributing tasks on least-loaded clients. + */ +class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { +private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class); + +@Override +public boolean assign(final Map clients, + final Set allTaskIds, + final Set statefulTaskIds, + final AssignorConfiguration.AssignmentConfigs configs) { +final int numStandbyReplicas = configs.numStandbyReplicas; +final Set rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags); +final Map statefulTasksWithClients = new HashMap<>(); + +statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, clientState) -> { +if (clientState.activeTasks().contains(statefulTaskId)) { +statefulTasksWithClients.put(statefulTaskId, uuid); +} +})); + +final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys( +numStandbyReplicas, +allTaskIds +); + +final Map> tagKeyToTagValues = new HashMap<>(); +final Map> tagValueToClients = new HashMap<>(); + +fillClientsTagStatistics(clients, tagValueToClients, tagKeyToTagValues); + +statefulTasksWithClients.forEach((taskId, clientId) -> assignStandbyTasksForActiveTask( +numStandbyReplicas, +taskId, +clientId, +rackAwareAssignmentTags, +clients, +tasksToRemainingStandbys, +tagKeyToTagValues, +tagValueToClients +)); + +return true; +} + +@Override +public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) { +final Map sourceClientTags = source.clientTags(); +final Map destinationClientTags = destination.clientTags(); + +for (final Entry sourceClientTagEntry : sourceClientTags.entrySet()) { +if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey( { +return false; +} +} + +return true; +} + +private static void 
fillClientsTagStatistics(final Map clientStates, + final Map> tagValueToClients, + final Map> tagKeyToTagValues) { +for (final Entry clientStateEntry : clientStates.entrySet()) { +final UUID clientId = clientStateEntry.getKey(); +final ClientState clientState = clientStateEntry.getValue(); + +clientState.clientTags().forEach((tagKey, tagValue) -> { +tagKeyToTagValues.computeIfAbsent(tagKey, ignored -> new HashSet<>()).add(tagValue); +
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r701142976

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java ##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby task,
+ * in that case, the algorithm will fall back to distributing tasks on least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+    @Override
+    public boolean assign(final Map<UUID, ClientState> clients,
+                          final Set<TaskId> allTaskIds,
+                          final Set<TaskId> statefulTaskIds,
+                          final AssignorConfiguration.AssignmentConfigs configs) {
+        final int numStandbyReplicas = configs.numStandbyReplicas;
+        final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
+        final Map<TaskId, UUID> statefulTasksWithClients = new HashMap<>();
+
+        statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, clientState) -> {
+            if (clientState.activeTasks().contains(statefulTaskId)) {
+                statefulTasksWithClients.put(statefulTaskId, uuid);
+            }
+        }));
+
+        final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(
+            numStandbyReplicas,
+            allTaskIds
+        );
+
+        final Map<String, Set<String>> tagKeyToTagValues = new HashMap<>();
+        final Map<String, Set<UUID>> tagValueToClients = new HashMap<>();
+
+        fillClientsTagStatistics(clients, tagValueToClients, tagKeyToTagValues);
+
+        statefulTasksWithClients.forEach((taskId, clientId) -> assignStandbyTasksForActiveTask(
+            numStandbyReplicas,
+            taskId,
+            clientId,
+            rackAwareAssignmentTags,
+            clients,
+            tasksToRemainingStandbys,
+            tagKeyToTagValues,
+            tagValueToClients
+        ));
+
+        return true;

Review comment:
Pushed the changes.
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r701141903

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java ##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby task,
+ * in that case, the algorithm will fall back to distributing tasks on least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+    @Override
+    public boolean assign(final Map<UUID, ClientState> clients,
+                          final Set<TaskId> allTaskIds,
+                          final Set<TaskId> statefulTaskIds,
+                          final AssignorConfiguration.AssignmentConfigs configs) {
+        final int numStandbyReplicas = configs.numStandbyReplicas;
+        final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
+        final Map<TaskId, UUID> statefulTasksWithClients = new HashMap<>();
+
+        statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, clientState) -> {
+            if (clientState.activeTasks().contains(statefulTaskId)) {
+                statefulTasksWithClients.put(statefulTaskId, uuid);
+            }
+        }));
+
+        final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(
+            numStandbyReplicas,
+            allTaskIds
+        );
+
+        final Map<String, Set<String>> tagKeyToTagValues = new HashMap<>();
+        final Map<String, Set<UUID>> tagValueToClients = new HashMap<>();

Review comment:
Sorry, can you elaborate more on this? Currently, when deciding the distribution, the algorithm takes into account both the tag key and the tag value, so it will treat `key1: value2` and `key2: value2` as different dimensions. Do you think this is something that has to be addressed?
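To make the distinction raised in the comment above concrete, here is a minimal, self-contained sketch. It is not the PR's code: the class `TagIndexSketch`, the two helper methods, and the use of plain strings as client ids are all assumptions made for illustration. It contrasts an index keyed by tag value alone with one keyed by the tag key and tag value together, using the `key1: value2` / `key2: value2` case from the comment.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Illustrative only: hypothetical names, not part of Kafka Streams.
public class TagIndexSketch {

    // Groups clients by tag value only, so equal values under different keys share one bucket.
    static Map<String, Set<String>> indexByValue(final Map<String, Map<String, String>> clientTags) {
        final Map<String, Set<String>> index = new TreeMap<>();
        clientTags.forEach((client, tags) -> tags.forEach((tagKey, tagValue) ->
            index.computeIfAbsent(tagValue, ignored -> new TreeSet<>()).add(client)));
        return index;
    }

    // Groups clients by the "key: value" pair, so each pair is its own dimension.
    static Map<String, Set<String>> indexByKeyAndValue(final Map<String, Map<String, String>> clientTags) {
        final Map<String, Set<String>> index = new TreeMap<>();
        clientTags.forEach((client, tags) -> tags.forEach((tagKey, tagValue) ->
            index.computeIfAbsent(tagKey + ": " + tagValue, ignored -> new TreeSet<>()).add(client)));
        return index;
    }

    public static void main(final String[] args) {
        final Map<String, Map<String, String>> clientTags = new TreeMap<>();
        clientTags.put("client-1", Map.of("key1", "value2"));
        clientTags.put("client-2", Map.of("key2", "value2"));

        System.out.println(indexByValue(clientTags));       // {value2=[client-1, client-2]}
        System.out.println(indexByKeyAndValue(clientTags)); // {key1: value2=[client-1], key2: value2=[client-2]}
    }
}
```

With a value-only index the two clients land in the same bucket, while the pair-keyed index keeps them apart. Whether the assignor in the PR needs the second form depends on how the value-keyed map is consulted, which the quoted diff does not show in full.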
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r701137236

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java ##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby task,
+ * in that case, the algorithm will fall back to distributing tasks on least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+    @Override
+    public boolean assign(final Map<UUID, ClientState> clients,
+                          final Set<TaskId> allTaskIds,
+                          final Set<TaskId> statefulTaskIds,
+                          final AssignorConfiguration.AssignmentConfigs configs) {
+        final int numStandbyReplicas = configs.numStandbyReplicas;
+        final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
+        final Map<TaskId, UUID> statefulTasksWithClients = new HashMap<>();
+
+        statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, clientState) -> {
+            if (clientState.activeTasks().contains(statefulTaskId)) {
+                statefulTasksWithClients.put(statefulTaskId, uuid);
+            }
+        }));
+
+        final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(
+            numStandbyReplicas,
+            allTaskIds
+        );
+
+        final Map<String, Set<String>> tagKeyToTagValues = new HashMap<>();
+        final Map<String, Set<UUID>> tagValueToClients = new HashMap<>();
+
+        fillClientsTagStatistics(clients, tagValueToClients, tagKeyToTagValues);
+
+        statefulTasksWithClients.forEach((taskId, clientId) -> assignStandbyTasksForActiveTask(
+            numStandbyReplicas,
+            taskId,
+            clientId,
+            rackAwareAssignmentTags,
+            clients,
+            tasksToRemainingStandbys,
+            tagKeyToTagValues,
+            tagValueToClients
+        ));
+
+        return true;
+    }
+
+    @Override
+    public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) {
+        final Map<String, String> sourceClientTags = source.clientTags();
+        final Map<String, String> destinationClientTags = destination.clientTags();
+
+        for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) {
+            if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private static void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates,
+                                                 final Map<String, Set<UUID>> tagValueToClients,
+                                                 final Map<String, Set<String>> tagKeyToTagValues) {
+        for (final Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) {
+            final UUID clientId = clientStateEntry.getKey();
+            final ClientState clientState = clientStateEntry.getValue();
+
+            clientState.clientTags().forEach((tagKey, tagValue) -> {
+                tagKeyToTagValues.computeIfAbsent(tagKey, ignored -> new HashSet<>()).add(tagValue);
+
[jira] [Updated] (KAFKA-13266) `InitialFetchState` should be created after partition is removed from the fetchers
[ https://issues.apache.org/jira/browse/KAFKA-13266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Jacot updated KAFKA-13266:
--------------------------------
    Description:
`ReplicationTest.test_replication_with_broker_failure` in KRaft mode sometimes fails with the following error in the log:

{noformat}
[2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Unexpected error occurred while processing data for partition __consumer_offsets-1 at offset 31727 (kafka.server.ReplicaFetcherThread)
java.lang.IllegalStateException: Offset mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end offset = 31728.
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532)
    at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
    at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)
[2021-08-31 11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Partition __consumer_offsets-1 marked as failed (kafka.server.ReplicaFetcherThread)
{noformat}

The issue is due to a race condition in `ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created and populated before the partition is removed from the fetcher threads. This means that the fetch offset of the `InitialFetchState` could be outdated when the fetcher threads are re-started because the fetcher threads could have incremented the log end offset in between. The partitions must be removed from the fetcher threads before the `InitialFetchStates` are created.

  was:
`ReplicationTest.test_replication_with_broker_failure` in KRaft mode sometimes fails with the following error in the log:

{noformat}
[2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Unexpected error occurred while processing data for partition __consumer_offsets-1 at offset 31727 (kafka.server.ReplicaFetcherThread)
java.lang.IllegalStateException: Offset mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end offset = 31728.
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532)
    at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
    at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)
[2021-08-31 11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Partition __consumer_offsets-1 marked as failed (kafka.server.ReplicaFetcherThread)
{noformat}
[jira] [Created] (KAFKA-13266) `InitialFetchState` should be created after partition is removed from the fetchers
David Jacot created KAFKA-13266:
-----------------------------------

             Summary: `InitialFetchState` should be created after partition is removed from the fetchers
                 Key: KAFKA-13266
                 URL: https://issues.apache.org/jira/browse/KAFKA-13266
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 3.0.0
            Reporter: David Jacot
            Assignee: David Jacot


`ReplicationTest.test_replication_with_broker_failure` in KRaft mode sometimes fails with the following error in the log:

{noformat}
[2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Unexpected error occurred while processing data for partition __consumer_offsets-1 at offset 31727 (kafka.server.ReplicaFetcherThread)
java.lang.IllegalStateException: Offset mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end offset = 31728.
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532)
    at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
    at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)
[2021-08-31 11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Partition __consumer_offsets-1 marked as failed (kafka.server.ReplicaFetcherThread)
{noformat}

The issue is due to a race condition in `ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created and populated before the partition is removed from the fetcher threads. This means that the fetch offset of the `InitialFetchState` could be outdated when the fetcher threads are re-started because the fetcher threads could have incremented the log end offset in between. The partitions must be removed from the fetcher threads before the `InitialFetchStates` are created.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
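The ordering problem described in KAFKA-13266 can be illustrated with a small single-threaded simulation. This is only a sketch under stated assumptions: `FetchStateOrderingSketch`, `Partition`, and `fetcherTick` are hypothetical names, not Kafka's classes, and the concurrent fetcher thread is modeled as one explicit call placed between the two steps. The starting offset 31727 is taken from the log excerpt in the report.

```java
// Illustrative only: a toy model of the ordering, not ReplicaManager's actual code.
public class FetchStateOrderingSketch {

    // Hypothetical stand-in for a follower partition: a log end offset plus a fetcher that may append.
    static final class Partition {
        long logEndOffset = 31727L;
        boolean fetcherAttached = true;

        // Simulates the fetcher thread appending one record, if it still owns the partition.
        void fetcherTick() {
            if (fetcherAttached) {
                logEndOffset++;
            }
        }
    }

    // Snapshot first, remove later: the fetcher can still advance the log in between.
    static long[] snapshotThenRemove() {
        final Partition p = new Partition();
        final long initialFetchOffset = p.logEndOffset;
        p.fetcherTick();
        p.fetcherAttached = false;
        return new long[] {initialFetchOffset, p.logEndOffset};
    }

    // Remove first, snapshot later: nothing can append between the two steps.
    static long[] removeThenSnapshot() {
        final Partition p = new Partition();
        p.fetcherAttached = false;
        p.fetcherTick();
        final long initialFetchOffset = p.logEndOffset;
        return new long[] {initialFetchOffset, p.logEndOffset};
    }

    public static void main(final String[] args) {
        final long[] racy = snapshotThenRemove();
        final long[] safe = removeThenSnapshot();
        // Prints: snapshot-then-remove: fetch offset = 31727, log end offset = 31728
        System.out.println("snapshot-then-remove: fetch offset = " + racy[0] + ", log end offset = " + racy[1]);
        // Prints: remove-then-snapshot: fetch offset = 31727, log end offset = 31727
        System.out.println("remove-then-snapshot: fetch offset = " + safe[0] + ", log end offset = " + safe[1]);
    }
}
```

In the first ordering the recorded fetch offset lags the log end offset by one, which is the same shape of mismatch as in the reported `IllegalStateException`. In the second ordering the two values agree because the partition is detached before the offset is read.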
[jira] [Commented] (KAFKA-13257) KafkaStreams Support For Latest RocksDB Version
[ https://issues.apache.org/jira/browse/KAFKA-13257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408852#comment-17408852 ]

Satyam Bala commented on KAFKA-13257:
-------------------------------------

Shall we keep this ticket open until kafka-streams 2.8.0 is fixed or upgraded to the latest RocksDB, or until kafka-streams 3.0.0 is released?

> KafkaStreams Support For Latest RocksDB Version
> -----------------------------------------------
>
>                 Key: KAFKA-13257
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13257
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Alagukannan
>            Priority: Major
>         Attachments: hs_err_pid6.log
>
>
> Hi,
> Can you please let us know if there is any plan for adding the latest
> versions of RocksDB to Kafka Streams? If you are planning it, what timeline
> are we looking at? If you are not planning to upgrade, what is the reason? Is
> there any significant impact from upgrading, such as backward compatibility?
> This is a general query about the RocksDB upgrade and its
> impact on Streams applications.
> The main pain point behind asking for this upgrade is that we tried to build an
> application with Kafka Streams 2.8.0 on an Alpine-based OS, and the Docker
> base image is as follows:
> azul/zulu-openjdk-alpine:11.0.12-11.50.19-jre-headless. The Streams
> application worked fine until it had an interaction with the state
> store (RocksDB). The JVM crashed with the following error:
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> # SIGSEGV (0xb) at pc=0x7f9551951b27, pid=6, tid=207
> #
> # JRE version: OpenJDK Runtime Environment Zulu11.45+27-CA (11.0.10+9) (build 11.0.10+9-LTS)
> # Java VM: OpenJDK 64-Bit Server VM Zulu11.45+27-CA (11.0.10+9-LTS, mixed mode, tiered, compressed oops, g1 gc, linux-amd64)
> # Problematic frame:
> # C [librocksdbjni15322693993163550519.so+0x271b27] std::_Rb_tree, std::less, std::allocator > >::_M_erase(std::_Rb_tree_node*)+0x27
> Then we found out that RocksDB works well on glibc and not on musl, whereas
> Alpine supports only musl for native dependencies. Looking further into
> RocksDB for a solution, we found that it has supported both glibc
> and musl native libraries since the 6.5.x versions.
> But the latest Kafka Streams (2.8.0) ships RocksDB 5.18.x. This is
> the main reason for asking for the RocksDB upgrade in Kafka Streams as
> well.
> I have attached the PID log where the JVM failures are happening.
[jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408786#comment-17408786 ]

Tomas Forsman edited comment on KAFKA-13261 at 9/2/21, 1:25 PM:
----------------------------------------------------------------

Hi [~guozhang] and [~abellemare], thank you for your answers.

In our scenario, the output topic is compacted on the same key as topic A, so it would be fine if intermediate events did not arrive. We want the final result. What we are seeing, though, is combinations missing completely.

We have read all events from the A, B and output topics, and also from the internal topics created by the join. We expected all of them to have 66 ids. As mentioned, running the same data with one partition gives a perfect match in which all events have passed through.

{noformat}
#Using 4 partitions
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20
{noformat}

KafkaTest.java is attached. We have written several JUnit test cases with the TopologyTestDriver and different amounts of test data, but are unable to reproduce the problem. (Does the test driver take multiple partitions into account?)

However, in a local Docker environment we can reproduce the above scenario every time when using 4 partitions, and the problem goes away when using 1 partition.

Below is the result of reading directly from the different topics for a specific id, "ID123". Columns are 'offset, timestamp, partition | [key]value'.

Using 4 partitions
{noformat}
table.a-changelog
5 1612435945196 0 | [ID123, 202101] A01
15 1614863137136 0 | [ID123, 202102] A02
25 1617882052260 0 | [ID123, 202103] A03
35 1620299210336 0 | [ID123, 202104] A04
45 1622804606823 0 | [ID123, 202105] A05

table.b-changelog
6 1622617868856 0 | [ID123] BBB

join.ab-changelog
0 1622617868856 0 | [ID123, 202104] A04, BBB

output
0 1622617868856 0 | [ID123, 202104] A04, BBB
{noformat}

Using 1 partition
{noformat}
table.a-changelog
28 1612435945196 0 | [ID123, 202101] A01
88 1614863137136 0 | [ID123, 202102] A02
149 1617882052260 0 | [ID123, 202103] A03
210 1620299210336 0 | [ID123, 202104] A04
269 1622804606823 0 | [ID123, 202105] A05

table.b-changelog
7 1622617868856 0 | [ID123] BBB

join.ab-changelog
28 1622617868856 0 | [ID123, 202101] A01, BBB
88 1622617868856 0 | [ID123, 202102] A02, BBB
149 1622617868856 0 | [ID123, 202103] A03, BBB
210 1622617868856 0 | [ID123, 202104] A04, BBB
269 1622804606823 0 | [ID123, 202105] A05, BBB

output
5 1622617868856 0 | [ID123, 202101] A01, BBB
15 1622617868856 0 | [ID123, 202102] A02, BBB
25 1622617868856 0 | [ID123, 202103] A03, BBB
35 1622617868856 0 | [ID123, 202104] A04, BBB
45 1622804606823 0 | [ID123, 202105] A05, BBB
{noformat}

was (Author: xnix):
Hi [~guozhang] and [~abellemare], thank you for your answers.

In our scenario, the output topic is compacted on the same key as topic A, so it would be fine if intermediate events did not arrive. We want the final result. What we are seeing, though, is combinations missing completely.

We have read all events from the A, B and output topics, and also from the internal topics created by the join. We expected all of them to have 66 ids. As mentioned, running the same data with one partition gives a perfect match in which all events have passed through.

{noformat}
#Using 4 partitions
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20
{noformat}

We have put up several JUnit test cases with the TopologyTestDriver but are unable to reproduce it. In a local Docker environment we can reproduce the above scenario every time when using 4 partitions, and the problem goes away when using 1 partition.

Below is the result of reading directly from the different topics for a specific id, "ID123". Columns are 'offset, timestamp, partition | [key]value'.

Using 4 partitions
{noformat}
table.a-changelog
5 1612435945196 0 | [ID123, 202101] A01
15 1614863137136 0 | [ID123, 202102] A02
25 1617882052260 0 | [ID123, 202103] A03
35 1620299210336 0 | [ID123, 202104] A04
45 1622804606823 0 | [ID123, 202105] A05

table.b-changelog
6 1622617868856 0 | [ID123] BBB

join.ab-changelog
0 1622617868856 0 | [ID123, 202104] A04, BBB

output
0 1622617868856 0 | [ID123, 202104] A04, BBB
{noformat}

Using 1 partition
{noformat}
table.a-changelog
28 1612435945196 0 | [ID123, 202101] A01
88 1614863137136 0 | [ID123, 202102] A02
149 1617882052260 0 | [ID123, 202103] A03
210 1620299210336 0 | [ID123, 202104] A04
269 1622804606823 0 | [ID123, 202105] A05

table.b-changelog
7 1622617868856 0 | [ID123] BBB

join.ab-changelog
28 1622617868856 0 | [ID123, 202101] A01, BBB
88 1622617868856 0 | [ID123, 202102] A02, BBB
149 1622617868856 0 | [ID123, 202103] A03, BBB
210 1622617868856 0 | [ID123, 202104]
[jira] [Updated] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tomas Forsman updated KAFKA-13261:
----------------------------------
    Attachment: KafkaTest.java

> KTable to KTable foreign key join loose events when using several partitions
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Priority: Major
>         Attachments: KafkaTest.java
>
>
> Two incoming streams A and B.
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it seems
> to have something to do with the foreign key join in combination with
> several partitions.
> One suspicion is that it is not possible to define which partitioner to
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
>     var builder = new StreamsBuilder();
>     KTable tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(), Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
> private static Materialized aMaterialized(String name) {
>     Materialized> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned repartitionTopicA() {
>     Repartitioned repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
> private static StreamPartitioner topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> private static Materialized> joinMaterialized(String name) {
>     Materialized> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}
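A side note on the partitioner expression quoted above, `Math.abs(key.getKeyB().hashCode()) % numPartitions`: in Java, `Math.abs(Integer.MIN_VALUE)` is still negative, so this expression can return a negative partition index for some partition counts. The sketch below is only an illustration with hypothetical names (`PartitionIndexSketch`, `absModulo`, `floorModulo`); it is not taken from the report and is not claimed to be the cause of the missing join results. With 4 partitions, as in the report, the edge case happens to yield 0.

```java
// Illustrative only: compares two ways of turning a hash code into a partition index.
public class PartitionIndexSketch {

    // Mirrors the expression used in topicAPartitioner() in the report.
    static int absModulo(final int hash, final int numPartitions) {
        return Math.abs(hash) % numPartitions;
    }

    // Alternative that always yields a value in [0, numPartitions).
    static int floorModulo(final int hash, final int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(final String[] args) {
        final int hash = Integer.MIN_VALUE; // Math.abs(Integer.MIN_VALUE) == Integer.MIN_VALUE

        System.out.println(absModulo(hash, 3));   // -2
        System.out.println(floorModulo(hash, 3)); // 1
        System.out.println(absModulo(hash, 4));   // 0
    }
}
```

Note that `Math.floorMod` maps negative hash codes to different partitions than `Math.abs(...) % n` does (for example, a hash of -1 with 4 partitions gives 3 instead of 1), so switching between the two changes where existing keys land.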
[GitHub] [kafka] cadonna commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
cadonna commented on a change in pull request #10851: URL: https://github.com/apache/kafka/pull/10851#discussion_r701068792 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java ## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; + +import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys; + +/** + * Distributes standby tasks over different tag dimensions. + * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} are taken into account. + * Standby task distribution is on a best-effort basis. 
For example, if there are not enough clients available + * on different tag dimensions compared to an active and corresponding standby task, + * in that case, the algorithm will fall back to distributing tasks on least-loaded clients. + */ +class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { +private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class); + +@Override +public boolean assign(final Map clients, + final Set allTaskIds, + final Set statefulTaskIds, + final AssignorConfiguration.AssignmentConfigs configs) { +final int numStandbyReplicas = configs.numStandbyReplicas; +final Set rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags); +final Map statefulTasksWithClients = new HashMap<>(); + +statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, clientState) -> { +if (clientState.activeTasks().contains(statefulTaskId)) { +statefulTasksWithClients.put(statefulTaskId, uuid); +} +})); + +final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys( +numStandbyReplicas, +allTaskIds +); + +final Map> tagKeyToTagValues = new HashMap<>(); +final Map> tagValueToClients = new HashMap<>(); + +fillClientsTagStatistics(clients, tagValueToClients, tagKeyToTagValues); + +statefulTasksWithClients.forEach((taskId, clientId) -> assignStandbyTasksForActiveTask( +numStandbyReplicas, +taskId, +clientId, +rackAwareAssignmentTags, +clients, +tasksToRemainingStandbys, +tagKeyToTagValues, +tagValueToClients +)); + +return true; +} + +@Override +public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) { +final Map sourceClientTags = source.clientTags(); +final Map destinationClientTags = destination.clientTags(); + +for (final Entry sourceClientTagEntry : sourceClientTags.entrySet()) { +if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) { +return false; +} +} + +return true; +} + +private static void 
fillClientsTagStatistics(final Map clientStates, + final Map> tagValueToClients, + final Map> tagKeyToTagValues) { +for (final Entry clientStateEntry : clientStates.entrySet()) { +final UUID clientId = clientStateEntry.getKey(); +final ClientState clientState = clientStateEntry.getValue(); + +clientState.clientTags().forEach((tagKey, tagValue) -> { +tagKeyToTagValues.computeIfAbsent(tagKey, ignored -> new HashSet<>()).add(tagValue); +
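The quoted diff is cut off by the archive, and its generic type parameters were lost in extraction. As a reading aid, the tag comparison in `isAllowedTaskMovement` and the bookkeeping in `fillClientsTagStatistics` can be sketched in standalone form. This is a minimal sketch, not the PR's code: the class name `ClientTagSketch`, the use of `Map<String, String>` for client tags, the `Map<String, Set<UUID>>` shape of `tagValueToClients`, and the line that fills `tagValueToClients` (which falls after the truncation point) are all assumptions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical standalone sketch; names and generic types are assumptions, not the PR's code.
final class ClientTagSketch {

    // Mirrors the loop in the quoted isAllowedTaskMovement: a movement is allowed only if
    // every tag of the source client has an equal value on the destination client.
    static boolean isAllowedTaskMovement(final Map<String, String> sourceTags,
                                         final Map<String, String> destinationTags) {
        for (final Map.Entry<String, String> entry : sourceTags.entrySet()) {
            if (!entry.getValue().equals(destinationTags.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // Records, per tag key, the set of values seen, and, per tag value, the clients carrying it.
    static void fillClientsTagStatistics(final Map<UUID, Map<String, String>> clientTags,
                                         final Map<String, Set<UUID>> tagValueToClients,
                                         final Map<String, Set<String>> tagKeyToTagValues) {
        clientTags.forEach((clientId, tags) -> tags.forEach((tagKey, tagValue) -> {
            tagKeyToTagValues.computeIfAbsent(tagKey, ignored -> new HashSet<>()).add(tagValue);
            // Assumed step: the archived diff is truncated before this map is filled.
            tagValueToClients.computeIfAbsent(tagValue, ignored -> new HashSet<>()).add(clientId);
        }));
    }

    public static void main(final String[] args) {
        final Map<UUID, Map<String, String>> clientTags = new HashMap<>();
        clientTags.put(UUID.randomUUID(), Map.of("zone", "eu-1", "cluster", "k1"));
        clientTags.put(UUID.randomUUID(), Map.of("zone", "eu-2", "cluster", "k1"));

        final Map<String, Set<UUID>> tagValueToClients = new HashMap<>();
        final Map<String, Set<String>> tagKeyToTagValues = new HashMap<>();
        fillClientsTagStatistics(clientTags, tagValueToClients, tagKeyToTagValues);

        System.out.println("values per tag key: " + tagKeyToTagValues);
        System.out.println("clients tagged k1: " + tagValueToClients.get("k1").size());
    }
}
```

Note that the comparison iterates over the source's tags only, so a destination carrying extra tags still passes, while a destination missing one of the source's tags does not.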
[GitHub] [kafka] vamossagar12 commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…
vamossagar12 commented on pull request #11211: URL: https://github.com/apache/kafka/pull/11211#issuecomment-911654379 > > @showuon, is there an existing integration test for Metered classes? I tried to find one to add the relevant tests but couldn't find one. > > Sorry for the late reply. Metered store classes are for recording operation metrics, so you should have one when using any built-in state store, e.g. `Stores.windowStoreBuilder` will have `MeteredWindowStore`. Thanks. Thanks @showuon, I have found a bug in my implementation. I will correct it and also add integration tests, and then it can be reviewed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] scholzj commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely
scholzj commented on pull request #11174: URL: https://github.com/apache/kafka/pull/11174#issuecomment-911647841 Great, thanks.
[jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408786#comment-17408786 ] Tomas Forsman edited comment on KAFKA-13261 at 9/2/21, 12:50 PM: - Hi [~guozhang] and [~abellemare], thank you for your answers. In the scenario we have the output topic is compacted on the same key as Topic A, so if intermediate events would not come - it would be fine. We want the final result. What we're seeing though is combinations missing completely. We've read out all events from the A,B and output topics but also the internal topics created by the join. Expected is that all would have 66 ids. As said, running with same data with one partition create a perfect match where all events has passed through. {noformat} #Using 4 partitions A : ids: 66 B : ids: 66 table.b-changelog : ids: 66 table.a-changelog : ids: 66 join.ab-changelog : ids: 20 output : ids: 20{noformat} We have put up several junit test cases with the TopologyTestDriver but are unable to reproduce it. In a local docker environment we can reproduce the above scenario every time when using 4 partitions and the problem goes away when using 1 partition. 
Below is when reading from the different topics directly for a specific id "ID123" Columns are 'offset, timestamp, partition | [key]value' Using 4 partitions {noformat} table.a-changelog 5 1612435945196 0 | [ID123, 202101] A01 15 1614863137136 0 | [ID123, 202102] A02 25 1617882052260 0 | [ID123, 202103] A03 35 1620299210336 0 | [ID123, 202104] A04 45 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 6 1622617868856 0 | [ID123] BBB join.ab-changelog 0 1622617868856 0 | [ID123, 202104] A04, BBB output 0 1622617868856 0 | [ID123, 202104] A04, BBB {noformat} Using 1 partition {noformat} table.a-changelog 28 1612435945196 0 | [ID123, 202101] A01 88 1614863137136 0 | [ID123, 202102] A02 149 1617882052260 0 | [ID123, 202103] A03 210 1620299210336 0 | [ID123, 202104] A04 269 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 7 1622617868856 0 | [ID123] BBB join.ab-changelog 28 1622617868856 0 | [ID123, 202101] A01, BBB 88 1622617868856 0 | [ID123, 202102] A02, BBB 149 1622617868856 0 | [ID123, 202103] A03, BBB 210 1622617868856 0 | [ID123, 202104] A04, BBB 269 1622804606823 0 | [ID123, 202105] A05, BBB output 5 1622617868856 0 | [ID123, 202101] A01, BBB 15 1622617868856 0 | [ID123, 202102] A02, BBB 25 1622617868856 0 | [ID123, 202103] A03, BBB 35 1622617868856 0 | [ID123, 202104] A04, BBB 45 1622804606823 0 | [ID123, 202105] A05, BBB{noformat} was (Author: xnix): Hi [~guozhang] and [~abellemare], thank you for your answers. In the scenario we have the output topic is compacted on the same key as Topic A, so if intermediate events would not come - it would be fine. We want the final result. What we're seeing though is combinations missing completely. We've read out all events from the A,B and output topics but also the internal topics created by the join. Expected is that all would have 66 ids. As said, running with same data with one partition create a perfect match where all events has passed through. 
{noformat} #Using 4 partitions A : ids: 66 B : ids: 66 table.b-changelog : ids: 66 table.a-changelog : ids: 66 join.ab-changelog : ids: 20 output : ids: 20{noformat} We have put up several junit test cases with the TopologyTestDriver but are unable to reproduce it. In a local docker environment we can reproduce the above scenario every time when using 4 partitions and the problem goes away when using 1 partition. Below is when reading from the different topics directly for a specific id "ID123" Using 4 partitions {noformat} table.a-changelog 5 1612435945196 0 | [ID123, 202101] A01 15 1614863137136 0 | [ID123, 202102] A02 25 1617882052260 0 | [ID123, 202103] A03 35 1620299210336 0 | [ID123, 202104] A04 45 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 6 1622617868856 0 | [ID123] BBB join.ab-changelog 0 1622617868856 0 | [ID123, 202104] A04, BBB output 0 1622617868856 0 | [ID123, 202104] A04, BBB {noformat} Using 1 partition {noformat} table.a-changelog 28 1612435945196 0 | [ID123, 202101] A01 88 1614863137136 0 | [ID123, 202102] A02 149 1617882052260 0 | [ID123, 202103] A03 210 1620299210336 0 | [ID123, 202104] A04 269 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 7 1622617868856 0 | [ID123] BBB join.ab-changelog 28 1622617868856 0 | [ID123, 202101] A01, BBB 88 1622617868856 0 | [ID123, 202102] A02, BBB 149 1622617868856 0 | [ID123, 202103] A03, BBB 210 1622617868856 0 | [ID123, 202104] A04, BBB 269 1622804606823 0 | [ID123, 202105] A05, BBB output 5 1622617868856 0 | [ID123, 202101] A01, BBB 15 1622617868856 0 | [ID123, 202102] A02, BBB 25 1622617868856 0 | [ID123,
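The comparison the comment describes (which keys appear in `table.a-changelog` but never reach `join.ab-changelog`) can be reproduced mechanically from rows in the stated `offset, timestamp, partition | [key]value` layout. The following is a minimal sketch under that assumption; the class `ChangelogDiffSketch` and its helper names are hypothetical, and the rows in `main` are the 4-partition sample for "ID123" copied from the comment.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for diffing dumped topic rows; not part of Kafka.
final class ChangelogDiffSketch {

    // Extracts the bracketed key from a row of the form "offset timestamp partition | [key] value".
    static String keyOf(final String row) {
        final int open = row.indexOf('[');
        final int close = row.indexOf(']', open);
        if (open < 0 || close < 0) {
            throw new IllegalArgumentException("no [key] in row: " + row);
        }
        return row.substring(open + 1, close);
    }

    // Returns the keys present in inputRows that never appear in joinRows, in input order.
    static Set<String> missingKeys(final List<String> inputRows, final List<String> joinRows) {
        final Set<String> missing = new LinkedHashSet<>();
        for (final String row : inputRows) {
            missing.add(keyOf(row));
        }
        for (final String row : joinRows) {
            missing.remove(keyOf(row));
        }
        return missing;
    }

    public static void main(final String[] args) {
        final List<String> tableA = List.of(
            "5 1612435945196 0 | [ID123, 202101] A01",
            "15 1614863137136 0 | [ID123, 202102] A02",
            "25 1617882052260 0 | [ID123, 202103] A03",
            "35 1620299210336 0 | [ID123, 202104] A04",
            "45 1622804606823 0 | [ID123, 202105] A05");
        final List<String> joinAb = List.of(
            "0 1622617868856 0 | [ID123, 202104] A04, BBB");

        // Print one missing composite key per line.
        missingKeys(tableA, joinAb).forEach(key -> System.out.println("missing: [" + key + "]"));
    }
}
```

For the sample rows, four of the five composite keys are reported as missing; only `[ID123, 202104]` made it into the join changelog.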
[GitHub] [kafka] mimaison commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely
mimaison commented on pull request #11174: URL: https://github.com/apache/kafka/pull/11174#issuecomment-911644486 @scholzj Yes that's the plan, I'll do it when merging into 3.0 reopens
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on a change in pull request #10851: URL: https://github.com/apache/kafka/pull/10851#discussion_r701038718 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java ## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; + +import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys; + +/** + * Distributes standby tasks over different tag dimensions. + * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} are taken into account. + * Standby task distribution is on a best-effort basis. 
For example, if there are not enough clients available + * on different tag dimensions compared to an active and corresponding standby task, + * in that case, the algorithm will fall back to distributing tasks on least-loaded clients. + */ +class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { +private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class); + +@Override +public boolean assign(final Map clients, + final Set allTaskIds, + final Set statefulTaskIds, + final AssignorConfiguration.AssignmentConfigs configs) { +final int numStandbyReplicas = configs.numStandbyReplicas; +final Set rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags); +final Map statefulTasksWithClients = new HashMap<>(); + +statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, clientState) -> { +if (clientState.activeTasks().contains(statefulTaskId)) { +statefulTasksWithClients.put(statefulTaskId, uuid); +} +})); + +final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys( +numStandbyReplicas, +allTaskIds +); + +final Map> tagKeyToTagValues = new HashMap<>(); +final Map> tagValueToClients = new HashMap<>(); + +fillClientsTagStatistics(clients, tagValueToClients, tagKeyToTagValues); + +statefulTasksWithClients.forEach((taskId, clientId) -> assignStandbyTasksForActiveTask( +numStandbyReplicas, +taskId, +clientId, +rackAwareAssignmentTags, +clients, +tasksToRemainingStandbys, +tagKeyToTagValues, +tagValueToClients +)); + +return true; +} + +@Override +public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) { +final Map sourceClientTags = source.clientTags(); +final Map destinationClientTags = destination.clientTags(); + +for (final Entry sourceClientTagEntry : sourceClientTags.entrySet()) { +if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) { +return false; +} +} + +return true; +} + +private static void 
fillClientsTagStatistics(final Map clientStates, + final Map> tagValueToClients, + final Map> tagKeyToTagValues) { +for (final Entry clientStateEntry : clientStates.entrySet()) { +final UUID clientId = clientStateEntry.getKey(); +final ClientState clientState = clientStateEntry.getValue(); + +clientState.clientTags().forEach((tagKey, tagValue) -> { +tagKeyToTagValues.computeIfAbsent(tagKey, ignored -> new HashSet<>()).add(tagValue); +
[jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408786#comment-17408786 ] Tomas Forsman edited comment on KAFKA-13261 at 9/2/21, 12:44 PM: - Hi [~guozhang] and [~abellemare], thank you for your answers. In the scenario we have the output topic is compacted on the same key as Topic A, so if intermediate events would not come - it would be fine. We want the final result. What we're seeing though is combinations missing completely. We've read out all events from the A,B and output topics but also the internal topics created by the join. Expected is that all would have 66 ids. As said, running with same data with one partition create a perfect match where all events has passed through. {noformat} #Using 4 partitions A : ids: 66 B : ids: 66 table.b-changelog : ids: 66 table.a-changelog : ids: 66 join.ab-changelog : ids: 20 output : ids: 20{noformat} We have put up several junit test cases with the TopologyTestDriver but are unable to reproduce it. In a local docker environment we can reproduce the above scenario every time when using 4 partitions and the problem goes away when using 1 partition. 
Below is when reading from the different topics directly for a specific id "ID123" Using 4 partitions {noformat} table.a-changelog 5 1612435945196 0 | [ID123, 202101] A01 15 1614863137136 0 | [ID123, 202102] A02 25 1617882052260 0 | [ID123, 202103] A03 35 1620299210336 0 | [ID123, 202104] A04 45 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 6 1622617868856 0 | [ID123] BBB join.ab-changelog 0 1622617868856 0 | [ID123, 202104] A04, BBB output 0 1622617868856 0 | [ID123, 202104] A04, BBB {noformat} Using 1 partition {noformat} table.a-changelog 28 1612435945196 0 | [ID123, 202101] A01 88 1614863137136 0 | [ID123, 202102] A02 149 1617882052260 0 | [ID123, 202103] A03 210 1620299210336 0 | [ID123, 202104] A04 269 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 7 1622617868856 0 | [ID123] BBB join.ab-changelog 28 1622617868856 0 | [ID123, 202101] A01, BBB 88 1622617868856 0 | [ID123, 202102] A02, BBB 149 1622617868856 0 | [ID123, 202103] A03, BBB 210 1622617868856 0 | [ID123, 202104] A04, BBB 269 1622804606823 0 | [ID123, 202105] A05, BBB output 5 1622617868856 0 | [ID123, 202101] A01, BBB 15 1622617868856 0 | [ID123, 202102] A02, BBB 25 1622617868856 0 | [ID123, 202103] A03, BBB 35 1622617868856 0 | [ID123, 202104] A04, BBB 45 1622804606823 0 | [ID123, 202105] A05, BBB{noformat} was (Author: xnix): Hi [~guozhang] and [~abellemare], thank you for your answers. In the scenario we have the output topic is compacted on the same key as Topic A, so if intermediate events would not come - it would be fine. We want the final result. What we're seeing though is combinations missing completely. I've read out all events from the A,B and output topics but also the internal topics created by the join. Expected is that all would have 66 ids. As said, running with same data with one partition create a perfect match where all events has passed through. 
{noformat} #Using 4 partitions A : ids: 66 B : ids: 66 table.b-changelog : ids: 66 table.a-changelog : ids: 66 join.ab-changelog : ids: 20 output : ids: 20{noformat} We have put up several junit test cases with the TopologyTestDriver but are unable to reproduce it. In a local docker environment we can reproduce the above scenario every time when using 4 partitions and the problem goes away when using 1 partition. Below is when reading from the different topics directly for a specific id "ID123" Using 4 partitions {noformat} table.a-changelog 5 1612435945196 0 | [ID123, 202101] A01 15 1614863137136 0 | [ID123, 202102] A02 25 1617882052260 0 | [ID123, 202103] A03 35 1620299210336 0 | [ID123, 202104] A04 45 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 6 1622617868856 0 | [ID123] BBB join.ab-changelog 0 1622617868856 0 | [ID123, 202104] A04, BBB output 0 1622617868856 0 | [ID123, 202104] A04, BBB {noformat} Using 1 partition {noformat} table.a-changelog 28 1612435945196 0 | [ID123, 202101] A01 88 1614863137136 0 | [ID123, 202102] A02 149 1617882052260 0 | [ID123, 202103] A03 210 1620299210336 0 | [ID123, 202104] A04 269 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 7 1622617868856 0 | [ID123] BBB join.ab-changelog 28 1622617868856 0 | [ID123, 202101] A01, BBB 88 1622617868856 0 | [ID123, 202102] A02, BBB 149 1622617868856 0 | [ID123, 202103] A03, BBB 210 1622617868856 0 | [ID123, 202104] A04, BBB 269 1622804606823 0 | [ID123, 202105] A05, BBB output 5 1622617868856 0 | [ID123, 202101] A01, BBB 15 1622617868856 0 | [ID123, 202102] A02, BBB 25 1622617868856 0 | [ID123, 202103] A03, BBB 35 1622617868856 0 | [ID123, 202104]
[jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408786#comment-17408786 ] Tomas Forsman edited comment on KAFKA-13261 at 9/2/21, 12:44 PM: - Hi [~guozhang] and [~abellemare], thank you for your answers. In the scenario we have the output topic is compacted on the same key as Topic A, so if intermediate events would not come - it would be fine. We want the final result. What we're seeing though is combinations missing completely. I've read out all events from the A,B and output topics but also the internal topics created by the join. Expected is that all would have 66 ids. As said, running with same data with one partition create a perfect match where all events has passed through. {noformat} #Using 4 partitions A : ids: 66 B : ids: 66 table.b-changelog : ids: 66 table.a-changelog : ids: 66 join.ab-changelog : ids: 20 output : ids: 20{noformat} We have put up several junit test cases with the TopologyTestDriver but are unable to reproduce it. In a local docker environment we can reproduce the above scenario every time when using 4 partitions and the problem goes away when using 1 partition. 
Below is when reading from the different topics directly for a specific id "ID123" Using 4 partitions {noformat} table.a-changelog 5 1612435945196 0 | [ID123, 202101] A01 15 1614863137136 0 | [ID123, 202102] A02 25 1617882052260 0 | [ID123, 202103] A03 35 1620299210336 0 | [ID123, 202104] A04 45 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 6 1622617868856 0 | [ID123] BBB join.ab-changelog 0 1622617868856 0 | [ID123, 202104] A04, BBB output 0 1622617868856 0 | [ID123, 202104] A04, BBB {noformat} Using 1 partition {noformat} table.a-changelog 28 1612435945196 0 | [ID123, 202101] A01 88 1614863137136 0 | [ID123, 202102] A02 149 1617882052260 0 | [ID123, 202103] A03 210 1620299210336 0 | [ID123, 202104] A04 269 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 7 1622617868856 0 | [ID123] BBB join.ab-changelog 28 1622617868856 0 | [ID123, 202101] A01, BBB 88 1622617868856 0 | [ID123, 202102] A02, BBB 149 1622617868856 0 | [ID123, 202103] A03, BBB 210 1622617868856 0 | [ID123, 202104] A04, BBB 269 1622804606823 0 | [ID123, 202105] A05, BBB output 5 1622617868856 0 | [ID123, 202101] A01, BBB 15 1622617868856 0 | [ID123, 202102] A02, BBB 25 1622617868856 0 | [ID123, 202103] A03, BBB 35 1622617868856 0 | [ID123, 202104] A04, BBB 45 1622804606823 0 | [ID123, 202105] A05, BBB{noformat} was (Author: xnix): Hi [~guozhang] and [~abellemare], thank you for your answers. In the scenario we have the output topic is compacted on the same key as Topic A, so if intermediate events would not come - it would be fine. We want the final result. What we're seeing though is combinations missing completely. I've read out all events from the A,B and output topics but also the internal topics created by the join. Expected is that all would have 66 ids. As said, running with same data with one partition create a perfect match where all events has passed through. 
{noformat} #Using 4 partitions A : ids: 66 B : ids: 66 table.b-changelog : ids: 66 table.a-changelog : ids: 66 join.ab-changelog : ids: 20 output : ids: 20{noformat} We have put up several junit test cases with the TopologyTestDriver but is unable to reproduce it. In a local docker environment I can reproduce the above scenario every time when using 4 partitions and the problem goes away when using 1 partition. Below is when I read from the different topics directly for a specific "ID123" Using 4 partitions {noformat} table.a-changelog 5 1612435945196 0 | [ID123, 202101] A01 15 1614863137136 0 | [ID123, 202102] A02 25 1617882052260 0 | [ID123, 202103] A03 35 1620299210336 0 | [ID123, 202104] A04 45 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 6 1622617868856 0 | [ID123] BBB join.ab-changelog 0 1622617868856 0 | [ID123, 202104] A04, BBB output 0 1622617868856 0 | [ID123, 202104] A04, BBB {noformat} Using 1 partition {noformat} table.a-changelog 28 1612435945196 0 | [ID123, 202101] A01 88 1614863137136 0 | [ID123, 202102] A02 149 1617882052260 0 | [ID123, 202103] A03 210 1620299210336 0 | [ID123, 202104] A04 269 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 7 1622617868856 0 | [ID123] BBB join.ab-changelog 28 1622617868856 0 | [ID123, 202101] A01, BBB 88 1622617868856 0 | [ID123, 202102] A02, BBB 149 1622617868856 0 | [ID123, 202103] A03, BBB 210 1622617868856 0 | [ID123, 202104] A04, BBB 269 1622804606823 0 | [ID123, 202105] A05, BBB output 5 1622617868856 0 | [ID123, 202101] A01, BBB 15 1622617868856 0 | [ID123, 202102] A02, BBB 25 1622617868856 0 | [ID123, 202103] A03, BBB 35 1622617868856 0 | [ID123, 202104] A04, BBB
[jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408786#comment-17408786 ] Tomas Forsman edited comment on KAFKA-13261 at 9/2/21, 12:41 PM: - Hi [~guozhang] and [~abellemare], thank you for your answers. In the scenario we have the output topic is compacted on the same key as Topic A, so if intermediate events would not come - it would be fine. We want the final result. What we're seeing though is combinations missing completely. I've read out all events from the A,B and output topics but also the internal topics created by the join. Expected is that all would have 66 ids. As said, running with same data with one partition create a perfect match where all events has passed through. {noformat} #Using 4 partitions A : ids: 66 B : ids: 66 table.b-changelog : ids: 66 table.a-changelog : ids: 66 join.ab-changelog : ids: 20 output : ids: 20{noformat} We have put up several junit test cases with the TopologyTestDriver but is unable to reproduce it. In a local docker environment I can reproduce the above scenario every time when using 4 partitions and the problem goes away when using 1 partition. 
Below is when I read from the different topics directly for a specific "ID123" Using 4 partitions {noformat} table.a-changelog 5 1612435945196 0 | [ID123, 202101] A01 15 1614863137136 0 | [ID123, 202102] A02 25 1617882052260 0 | [ID123, 202103] A03 35 1620299210336 0 | [ID123, 202104] A04 45 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 6 1622617868856 0 | [ID123] BBB join.ab-changelog 0 1622617868856 0 | [ID123, 202104] A04, BBB output 0 1622617868856 0 | [ID123, 202104] A04, BBB {noformat} Using 1 partition {noformat} table.a-changelog 28 1612435945196 0 | [ID123, 202101] A01 88 1614863137136 0 | [ID123, 202102] A02 149 1617882052260 0 | [ID123, 202103] A03 210 1620299210336 0 | [ID123, 202104] A04 269 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 7 1622617868856 0 | [ID123] BBB join.ab-changelog 28 1622617868856 0 | [ID123, 202101] A01, BBB 88 1622617868856 0 | [ID123, 202102] A02, BBB 149 1622617868856 0 | [ID123, 202103] A03, BBB 210 1622617868856 0 | [ID123, 202104] A04, BBB 269 1622804606823 0 | [ID123, 202105] A05, BBB output 5 1622617868856 0 | [ID123, 202101] A01, BBB 15 1622617868856 0 | [ID123, 202102] A02, BBB 25 1622617868856 0 | [ID123, 202103] A03, BBB 35 1622617868856 0 | [ID123, 202104] A04, BBB 45 1622804606823 0 | [ID123, 202105] A05, BBB{noformat} was (Author: xnix): Hi [~guozhang] and [~abellemare], thank you for your answers. In the scenario we have the output topic is compacted on the same key as Topic A, so if intermediate events would not come - it would be fine. We want the final result. What we're seeing though is combinations missing completely. I've read out all events from the A,B and output topics but also the internal topics created by the join. Expected is that all would have 66 ids. As said, running with same data with one partition create a perfect match where all events has passed through. 
{noformat} A : ids: 66 B : ids: 66 table.b-changelog : ids: 66 table.a-changelog : ids: 66 join.ab-changelog : ids: 20 output : ids: 20{noformat} We have put up several junit test cases with the TopologyTestDriver but is unable to reproduce it. In a local docker environment I can reproduce the above scenario every time when using 4 partitions and the problem goes away when using 1 partition. Below is when I read from the different topics for a specific "ID123" Using 4 partitions {noformat} table.a-changelog 5 1612435945196 0 | [ID123, 202101] A01 15 1614863137136 0 | [ID123, 202102] A02 25 1617882052260 0 | [ID123, 202103] A03 35 1620299210336 0 | [ID123, 202104] A04 45 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 6 1622617868856 0 | [ID123] BBB join.ab-changelog 0 1622617868856 0 | [ID123, 202104] A04, BBB output 0 1622617868856 0 | [ID123, 202104] A04, BBB {noformat} Using 1 partition {noformat} table.a-changelog 28 1612435945196 0 | [ID123, 202101] A01 88 1614863137136 0 | [ID123, 202102] A02 149 1617882052260 0 | [ID123, 202103] A03 210 1620299210336 0 | [ID123, 202104] A04 269 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 7 1622617868856 0 | [ID123] BBB join.ab-changelog 28 1622617868856 0 | [ID123, 202101] A01, BBB 88 1622617868856 0 | [ID123, 202102] A02, BBB 149 1622617868856 0 | [ID123, 202103] A03, BBB 210 1622617868856 0 | [ID123, 202104] A04, BBB 269 1622804606823 0 | [ID123, 202105] A05, BBB output 5 1622617868856 0 | [ID123, 202101] A01, BBB 15 1622617868856 0 | [ID123, 202102] A02, BBB 25 1622617868856 0 | [ID123, 202103] A03, BBB 35 1622617868856 0 | [ID123, 202104] A04, BBB 45 1622804606823 0 |
[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions
[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408786#comment-17408786 ] Tomas Forsman commented on KAFKA-13261: --- Hi [~guozhang] and [~abellemare], thank you for your answers. In the scenario we have the output topic is compacted on the same key as Topic A, so if intermediate events would not come - it would be fine. We want the final result. What we're seeing though is combinations missing completely. I've read out all events from the A,B and output topics but also the internal topics created by the join. Expected is that all would have 66 ids. As said, running with same data with one partition create a perfect match where all events has passed through. {noformat} A : ids: 66 B : ids: 66 table.b-changelog : ids: 66 table.a-changelog : ids: 66 join.ab-changelog : ids: 20 output : ids: 20{noformat} We have put up several junit test cases with the TopologyTestDriver but is unable to reproduce it. In a local docker environment I can reproduce the above scenario every time when using 4 partitions and the problem goes away when using 1 partition. 
Below is when I read from the different topics for a specific "ID123" Using 4 partitions {noformat} table.a-changelog 5 1612435945196 0 | [ID123, 202101] A01 15 1614863137136 0 | [ID123, 202102] A02 25 1617882052260 0 | [ID123, 202103] A03 35 1620299210336 0 | [ID123, 202104] A04 45 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 6 1622617868856 0 | [ID123] BBB join.ab-changelog 0 1622617868856 0 | [ID123, 202104] A04, BBB output 0 1622617868856 0 | [ID123, 202104] A04, BBB {noformat} Using 1 partition {noformat} table.a-changelog 28 1612435945196 0 | [ID123, 202101] A01 88 1614863137136 0 | [ID123, 202102] A02 149 1617882052260 0 | [ID123, 202103] A03 210 1620299210336 0 | [ID123, 202104] A04 269 1622804606823 0 | [ID123, 202105] A05 table.b-changelog 7 1622617868856 0 | [ID123] BBB join.ab-changelog 28 1622617868856 0 | [ID123, 202101] A01, BBB 88 1622617868856 0 | [ID123, 202102] A02, BBB 149 1622617868856 0 | [ID123, 202103] A03, BBB 210 1622617868856 0 | [ID123, 202104] A04, BBB 269 1622804606823 0 | [ID123, 202105] A05, BBB output 5 1622617868856 0 | [ID123, 202101] A01, BBB 15 1622617868856 0 | [ID123, 202102] A02, BBB 25 1622617868856 0 | [ID123, 202103] A03, BBB 35 1622617868856 0 | [ID123, 202104] A04, BBB 45 1622804606823 0 | [ID123, 202105] A05, BBB{noformat} > KTable to KTable foreign key join loose events when using several partitions > > > Key: KAFKA-13261 > URL: https://issues.apache.org/jira/browse/KAFKA-13261 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0, 2.7.1 >Reporter: Tomas Forsman >Priority: Major > > Two incoming streams A and B. > Stream A uses a composite key [a, b] > Stream B has key [b] > Stream B has 4 partitions and steams A has 1 partition. > What we try to do is repartition stream A to have 4 partitions too, then put > both A and B into KTable and do a foreign key join on from A to B > When doing this, all messages does not end up in the output topic. 
> Repartitioning both to only use 1 partition each solve the problem so it seem > like it has something to do with the foreign key join in combination with > several partitions. > One suspicion would be that it is not possible to define what partitioner to > use for the join. > Any insight or help is greatly appreciated. > *Example code of the problem* > {code:java} > static Topology createTopoology(){ > var builder = new StreamsBuilder(); > KTable tableB = builder.table("B", > stringMaterialized("table.b")); > builder > .stream("A", Consumed.with(Serde.of(KeyA.class), > Serde.of(EventA.class))) > .repartition(repartitionTopicA()) > .toTable(Named.as("table.a"), aMaterialized("table.a")) > .join(tableB, EventA::getKeyB, topicAandBeJoiner(), > Named.as("join.ab"), joinMaterialized("join.ab")) > .toStream() > .to("output", with(...)); > return builder.build(); > } > private static Materialized aMaterialized(String name) { > Materialized> table = > Materialized.as(name); > return > table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class)); > } > private static Repartitioned repartitionTopicA() { > Repartitioned repartitioned = > Repartitioned.as("driverperiod"); > return > repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class)) > .withStreamPartitioner(topicAPartitioner()) > .withNumberOfPartitions(4); > } > private static StreamPartitioner > topicAPartitioner() { > return (topic, key, value,
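The reporter's suspicion is that the join cannot be told which partitioner to use. Whether that is the actual cause is not established in this thread, but the general effect is easy to illustrate: if data is written with one partition function and looked up with another, the two agree only by chance, except when there is a single partition. The sketch below is a toy model under that assumption. Both partition functions are hypothetical stand-ins and are not Kafka's partitioners, and `PartitionMismatchSketch` is an invented name.

```java
import java.util.List;

// Toy model only: neither function is a real Kafka partitioner.
final class PartitionMismatchSketch {

    // Stand-in for the partitioner that wrote a table's records (hypothetical).
    static int writerPartition(final String key, final int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Stand-in for a different partitioner used to route lookups for the same key (hypothetical).
    static int lookupPartition(final String key, final int numPartitions) {
        return Math.floorMod(key.length(), numPartitions);
    }

    // Counts keys for which a lookup would land on the partition that actually holds the key.
    static long colocated(final List<String> keys, final int numPartitions) {
        return keys.stream()
            .filter(key -> writerPartition(key, numPartitions) == lookupPartition(key, numPartitions))
            .count();
    }

    public static void main(final String[] args) {
        final List<String> keys = List.of("ID123", "ID124", "ID125", "ID126", "ID127", "ID128");
        System.out.println("co-located with 1 partition:  " + colocated(keys, 1) + " of " + keys.size());
        System.out.println("co-located with 4 partitions: " + colocated(keys, 4) + " of " + keys.size());
    }
}
```

With one partition every key maps to partition 0 under any function, so all lookups succeed. That matches the shape of the report (complete output with 1 partition, partial output with 4), though it does not by itself confirm the diagnosis.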
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on a change in pull request #10851: URL: https://github.com/apache/kafka/pull/10851#discussion_r701038718 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java ## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; + +import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys; + +/** + * Distributes standby tasks over different tag dimensions. + * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} are taken into account. + * Standby task distribution is on a best-effort basis. 
For example, if there are not enough clients available + * on different tag dimensions compared to an active and corresponding standby task, + * in that case, the algorithm will fall back to distributing tasks on least-loaded clients. + */ +class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { +private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class); + +@Override +public boolean assign(final Map clients, + final Set allTaskIds, + final Set statefulTaskIds, + final AssignorConfiguration.AssignmentConfigs configs) { +final int numStandbyReplicas = configs.numStandbyReplicas; +final Set rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags); +final Map statefulTasksWithClients = new HashMap<>(); + +statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, clientState) -> { +if (clientState.activeTasks().contains(statefulTaskId)) { +statefulTasksWithClients.put(statefulTaskId, uuid); +} +})); + +final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys( +numStandbyReplicas, +allTaskIds +); + +final Map> tagKeyToTagValues = new HashMap<>(); +final Map> tagValueToClients = new HashMap<>(); + +fillClientsTagStatistics(clients, tagValueToClients, tagKeyToTagValues); + +statefulTasksWithClients.forEach((taskId, clientId) -> assignStandbyTasksForActiveTask( +numStandbyReplicas, +taskId, +clientId, +rackAwareAssignmentTags, +clients, +tasksToRemainingStandbys, +tagKeyToTagValues, +tagValueToClients +)); + +return true; +} + +@Override +public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) { +final Map sourceClientTags = source.clientTags(); +final Map destinationClientTags = destination.clientTags(); + +for (final Entry sourceClientTagEntry : sourceClientTags.entrySet()) { +if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey( { +return false; +} +} + +return true; +} + +private static void 
fillClientsTagStatistics(final Map clientStates, + final Map> tagValueToClients, + final Map> tagKeyToTagValues) { +for (final Entry clientStateEntry : clientStates.entrySet()) { +final UUID clientId = clientStateEntry.getKey(); +final ClientState clientState = clientStateEntry.getValue(); + +clientState.clientTags().forEach((tagKey, tagValue) -> { +tagKeyToTagValues.computeIfAbsent(tagKey, ignored -> new HashSet<>()).add(tagValue); +
[GitHub] [kafka] showuon commented on pull request #11227: KAFKA-13211: add support for infinite range query for WindowStore
showuon commented on pull request #11227: URL: https://github.com/apache/kafka/pull/11227#issuecomment-911629109 Integration tests added, but I found a bug that makes these tests fail. I will wait for the following PR to be merged and then continue this PR. Thanks. https://github.com/apache/kafka/pull/11292
[GitHub] [kafka] scholzj commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely
scholzj commented on pull request #11174: URL: https://github.com/apache/kafka/pull/11174#issuecomment-911607580 @mimaison Will this also get into the 3.0.x release stream? I know it is probably late for 3.0.0 which already has RCs, so that is fine. But it would be nice to have it in 3.0.1 (if there ever is a 3.0.1 of course).
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
vamossagar12 commented on pull request #10798: URL: https://github.com/apache/kafka/pull/10798#issuecomment-911592247 @guozhangwang, these errors are not due to the changes in this PR: `imported `Named` is permanently hidden by definition of type Named in package kstream`
[jira] [Commented] (KAFKA-13257) KafkaStreams Support For Latest RocksDB Version
[ https://issues.apache.org/jira/browse/KAFKA-13257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408759#comment-17408759 ] Bruno Cadonna commented on KAFKA-13257: --- [~satyam.b...@gmail.com] Using a Kafka Streams version with a version of RocksDB different from the one in the Kafka dependencies ([here the dependencies for 2.8|https://github.com/apache/kafka/blob/2.8/gradle/dependencies.gradle]) is not officially supported. It may or may not work. If you want to use a newer version of RocksDB, you need to wait until 3.0. > KafkaStreams Support For Latest RocksDB Version > --- > > Key: KAFKA-13257 > URL: https://issues.apache.org/jira/browse/KAFKA-13257 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Alagukannan >Priority: Major > Attachments: hs_err_pid6.log > > > Hi, > Can you please let us know if there is any plan for adding the latest > versions of RocksDB in Kafka Streams? If you are planning it, what timeline > are we looking at? If you are not planning to upgrade, what is the reason behind it? Is > there any significant impact of upgrading, such as backward compatibility? > To be clear, this is a general query about the RocksDB upgrade and its > impact on Streams applications. > The main pain point behind asking for this upgrade is that we tried to build an > application with Kafka Streams 2.8.0 on an Alpine-based OS, and the Docker > base image is as follows: > azul/zulu-openjdk-alpine:11.0.12-11.50.19-jre-headless. The streams > application worked fine until it had an interaction with the state > store (RocksDB). 
The JVM crashed with the following error: > # > # A fatal error has been detected by the Java Runtime Environment: > # > # SIGSEGV (0xb) at pc=0x7f9551951b27, pid=6, tid=207 > # > # JRE version: OpenJDK Runtime Environment Zulu11.45+27-CA (11.0.10+9) > (build 11.0.10+9-LTS) > # Java VM: OpenJDK 64-Bit Server VM Zulu11.45+27-CA (11.0.10+9-LTS, mixed > mode, tiered, compressed oops, g1 gc, linux-amd64) > # Problematic frame: > # C [librocksdbjni15322693993163550519.so+0x271b27] > std::_Rb_tree, > std::less, std::allocator > >::_M_erase(std::_Rb_tree_node*)+0x27 > Then we found out that RocksDB works well on glibc but not on musl, whereas > Alpine supports only musl for native dependencies. Looking further into > RocksDB for a solution, we found that it has supported both glibc > and musl native libraries since the 6.5.x versions. > But the latest Kafka Streams (2.8.0) ships RocksDB 5.18.x. This is > the main reason behind asking for the RocksDB upgrade in Kafka Streams as > well. > We have attached the PID log where the JVM failures are happening. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13265) Kafka consumers disappearing after certain point of time
Ayyandurai Mani created KAFKA-13265: --- Summary: Kafka consumers disappearing after certain point of time Key: KAFKA-13265 URL: https://issues.apache.org/jira/browse/KAFKA-13265 Project: Kafka Issue Type: Test Components: consumer Affects Versions: 2.4.0 Reporter: Ayyandurai Mani Attachments: Consumer_Disappear_Issue_Screen.png, server.log

Dear Kafka Team,

We have been facing an issue for the past few days in our development environment. We have a topic called 'search-service-topic-dev' with 10 partitions and a consumer group 'search-service-group' with a concurrency of 10 on the consumer side. When we publish a larger number of messages (each message is 115 KB) into the topic, the consumers disappear from the consumer group after a certain point in time (note: the consumer services are still running).

I have attached a screenshot for reference (filename: Consumer_Disappear_Issue_Screen.png). In the screenshot, when I execute the describe command for the consumer group at 14:35:32 (IST) the consumers are present, but when I execute it at 14:38:17 (IST) the consumers are gone. I have also attached the Kafka server.log for that particular time (Kafka is running on a server in the UTC timezone).

Note: the message size in each partition is around 2 GB.

We are blocked by this behavior. Please help me resolve this. Thanks in advance.

Ayyandurai
[GitHub] [kafka] ashishpatil09 commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
ashishpatil09 commented on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-911500667 Hi guys, is there any plan to release this fix soon? Thanks, Ashish
[GitHub] [kafka] cadonna commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
cadonna commented on a change in pull request #10851: URL: https://github.com/apache/kafka/pull/10851#discussion_r700864299 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java ## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; + +import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys; + +/** + * Distributes standby tasks over different tag dimensions. + * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} are taken into account. + * Standby task distribution is on a best-effort basis. 
For example, if there are not enough clients available + * on different tag dimensions compared to an active and corresponding standby task, + * in that case, the algorithm will fall back to distributing tasks on least-loaded clients. + */ +class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { +private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class); + +@Override +public boolean assign(final Map clients, + final Set allTaskIds, + final Set statefulTaskIds, + final AssignorConfiguration.AssignmentConfigs configs) { +final int numStandbyReplicas = configs.numStandbyReplicas; +final Set rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags); +final Map statefulTasksWithClients = new HashMap<>(); + +statefulTaskIds.forEach(statefulTaskId -> clients.forEach((uuid, clientState) -> { +if (clientState.activeTasks().contains(statefulTaskId)) { +statefulTasksWithClients.put(statefulTaskId, uuid); +} +})); + +final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys( +numStandbyReplicas, +allTaskIds +); + +final Map> tagKeyToTagValues = new HashMap<>(); +final Map> tagValueToClients = new HashMap<>(); + +fillClientsTagStatistics(clients, tagValueToClients, tagKeyToTagValues); + +statefulTasksWithClients.forEach((taskId, clientId) -> assignStandbyTasksForActiveTask( +numStandbyReplicas, +taskId, +clientId, +rackAwareAssignmentTags, +clients, +tasksToRemainingStandbys, +tagKeyToTagValues, +tagValueToClients +)); + +return true; Review comment: Although we never use the returned value from a standby task assignor, I would return `false` since a standby task assignment will never require a follow-up probing rebalance. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java ## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
[GitHub] [kafka] showuon commented on pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order
showuon commented on pull request #11292: URL: https://github.com/apache/kafka/pull/11292#issuecomment-911448780 @jeqo @ableegoldman @guozhangwang, please help review this PR. Thank you.
[GitHub] [kafka] showuon commented on a change in pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order
showuon commented on a change in pull request #11292: URL: https://github.com/apache/kafka/pull/11292#discussion_r700910107

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java ##

@@ -499,10 +499,14 @@ public void close() {
 final Map.Entry> currentSegment = segmentIterator.next();
 currentTime = currentSegment.getKey();
 
-if (allKeys) {
-return currentSegment.getValue().entrySet().iterator();
+final ConcurrentNavigableMap subMap = allKeys ?
+currentSegment.getValue() :
+currentSegment.getValue().subMap(keyFrom, true, keyTo, true);
+
+if (forward) {
+return subMap.entrySet().iterator();
 } else {
-return currentSegment.getValue().subMap(keyFrom, true, keyTo, true).entrySet().iterator();
+return subMap.descendingMap().entrySet().iterator();

Review comment: Before this change, when setting the records iterator we only considered the `allKeys` case and ignored whether the fetch was forward or backward. This change handles both directions.
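The direction handling described in this review comment can be sketched in isolation. This is a minimal, self-contained illustration and not the actual `InMemoryWindowStore` code: the class `SegmentIteration`, the method `keysInOrder`, and the `String` key and value types are assumptions made for the example. Only the calls `subMap(keyFrom, true, keyTo, true)` and `descendingMap()` are taken from the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class SegmentIteration {

    // Hypothetical helper: returns the keys of one segment in the order an
    // iterator would visit them, honoring both the key range and the direction.
    static List<String> keysInOrder(ConcurrentNavigableMap<String, String> segment,
                                    boolean allKeys,
                                    String keyFrom,
                                    String keyTo,
                                    boolean forward) {
        // Step 1: restrict to the requested key range (inclusive on both ends).
        ConcurrentNavigableMap<String, String> subMap = allKeys
            ? segment
            : segment.subMap(keyFrom, true, keyTo, true);

        // Step 2: pick the iteration direction independently of the range.
        ConcurrentNavigableMap<String, String> view =
            forward ? subMap : subMap.descendingMap();

        return new ArrayList<>(view.keySet());
    }

    public static void main(String[] args) {
        ConcurrentNavigableMap<String, String> segment = new ConcurrentSkipListMap<>();
        segment.put("a", "aa");
        segment.put("b", "bb");
        segment.put("c", "cc");
        segment.put("d", "dd");

        System.out.println(keysInOrder(segment, true, null, null, true));   // all keys, forward
        System.out.println(keysInOrder(segment, true, null, null, false));  // all keys, backward
        System.out.println(keysInOrder(segment, false, "b", "c", false));   // range b..c, backward
    }
}
```

Separating the range selection from the direction means the four combinations (all keys or a range, forward or backward) share one code path, which appears to be the point of the refactoring in the diff.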
[GitHub] [kafka] showuon commented on a change in pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order
showuon commented on a change in pull request #11292: URL: https://github.com/apache/kafka/pull/11292#discussion_r700908241

## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java ##

@@ -1176,6 +1105,7 @@ private void putFirstBatch(final WindowStore store,
 store.put(0, "zero", startTime);
 store.put(1, "one", startTime + 1L);
 store.put(2, "two", startTime + 2L);
+store.put(3, "three", startTime + 2L);

Review comment: Add two records at the same timestamp to test the forward and backward fetch cases.
[GitHub] [kafka] dadufour commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely
dadufour commented on pull request #11174: URL: https://github.com/apache/kafka/pull/11174#issuecomment-911390634 Thanks for the feedback
[GitHub] [kafka] mimaison commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely
mimaison commented on pull request #11174: URL: https://github.com/apache/kafka/pull/11174#issuecomment-911379969 @akatona84 Thanks for the fix and sorry again for the delay merging it. @dadufour At this point, it's unlikely there will be a 2.7.2 release. I've backported it to 2.8 so it will be in the 2.8.1 release.
[jira] [Updated] (KAFKA-9747) No tasks created for a connector
[ https://issues.apache.org/jira/browse/KAFKA-9747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-9747: -- Fix Version/s: 2.8.1 > No tasks created for a connector > > > Key: KAFKA-9747 > URL: https://issues.apache.org/jira/browse/KAFKA-9747 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0 > Environment: OS: Ubuntu 18.04 LTS > Platform: Confluent Platform 5.4 > HW: The same behaviour on various AWS instances - from t3.small to c5.xlarge >Reporter: Vit Koma >Assignee: Andras Katona >Priority: Major > Fix For: 3.1.0, 2.8.1 > > Attachments: connect-distributed.properties, connect.log > > > We are running Kafka Connect in a distributed mode on 3 nodes using Debezium > (MongoDB) and Confluent S3 connectors. When adding a new connector via the > REST API the connector is created in RUNNING state, but no tasks are created > for the connector. > Pausing and resuming the connector does not help. When we stop all workers > and then start them again, the tasks are created and everything runs as it > should. > The issue does not show up if we run only a single node. > The issue is not caused by the connector plugins, because we see the same > behaviour for both Debezium and S3 connectors. Also in debug logs I can see > that Debezium is correctly returning a task configuration from the > Connector.taskConfigs() method. 
> Connector configuration examples > Debezium: > {code} > { > "name": "qa-mongodb-comp-converter-task|1", > "config": { > "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", > "mongodb.hosts": > "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017", > "mongodb.name": "qa-debezium-comp", > "mongodb.ssl.enabled": true, > "collection.whitelist": "converter[.]task", > "tombstones.on.delete": true > } > } > {code} > S3 Connector: > {code} > { > "name": "qa-s3-sink-task|1", > "config": { > "connector.class": "io.confluent.connect.s3.S3SinkConnector", > "topics": "qa-debezium-comp.converter.task", > "topics.dir": "data/env/qa", > "s3.region": "eu-west-1", > "s3.bucket.name": "", > "flush.size": "15000", > "rotate.interval.ms": "360", > "storage.class": "io.confluent.connect.s3.storage.S3Storage", > "format.class": > "custom.kafka.connect.s3.format.plaintext.PlaintextFormat", > "schema.generator.class": > "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", > "partitioner.class": > "io.confluent.connect.storage.partitioner.DefaultPartitioner", > "schema.compatibility": "NONE", > "key.converter": "org.apache.kafka.connect.json.JsonConverter", > "value.converter": "org.apache.kafka.connect.json.JsonConverter", > "key.converter.schemas.enable": false, > "value.converter.schemas.enable": false, > "transforms": "ExtractDocument", > > "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value" > } > } > {code} > The connectors are created using curl: {{curl -X POST -H "Content-Type: > application/json" --data @ http:/:10083/connectors}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
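The issue text above does not state a root cause. One plausible reading, offered here only as an assumption, is that connector names such as `qa-s3-sink-task|1` contain a character (`|`) that is not legal in a URI, which matters whenever a worker builds a URL from the connector name (the related PR is titled "Creating connect reconfiguration URL safely"). The sketch below uses only `java.net.URI` from the JDK; the class `ConnectorUrlSketch`, the method names, the host `localhost`, and the `/connectors/<name>/tasks` path are hypothetical and do not reproduce Kafka Connect's internal code.

```java
import java.net.URI;
import java.net.URISyntaxException;

public class ConnectorUrlSketch {

    // Hypothetical helper: builds a URL for a connector by passing the path as a
    // separate component, so that java.net.URI percent-encodes illegal characters.
    static URI connectorTasksUri(String host, int port, String connectorName)
            throws URISyntaxException {
        String path = "/connectors/" + connectorName + "/tasks";
        // The multi-argument constructor quotes characters that are illegal in a path.
        return new URI("http", null, host, port, path, null, null);
    }

    // Returns true if parsing the raw string as a URI is rejected.
    static boolean naiveParseFails(String rawUrl) {
        try {
            URI.create(rawUrl);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) throws URISyntaxException {
        String name = "qa-s3-sink-task|1"; // connector name taken from the issue

        // String concatenation followed by parsing is rejected because of the '|'.
        String raw = "http://localhost:10083/connectors/" + name + "/tasks";
        System.out.println("naive parse fails: " + naiveParseFails(raw));

        // Component-wise construction encodes the '|' instead of failing.
        System.out.println(connectorTasksUri("localhost", 10083, name).toASCIIString());
    }
}
```

Note that this approach does not escape a `/` inside a connector name, since `/` is a legal path character; a name containing one would need separate handling.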
[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17408666#comment-17408666 ] Ashish Patil commented on KAFKA-9366: - Hi guys, is there any plan to fix this issue soon? Thanks, Ashish > Upgrade log4j to log4j2 > --- > > Key: KAFKA-9366 > URL: https://issues.apache.org/jira/browse/KAFKA-9366 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0 >Reporter: leibo >Assignee: Dongjin Lee >Priority: Critical > Labels: needs-kip > Fix For: 3.1.0 > > > h2. CVE-2019-17571 Detail > Included in Log4j 1.2 is a SocketServer class that is vulnerable to > deserialization of untrusted data which can be exploited to remotely execute > arbitrary code when combined with a deserialization gadget when listening to > untrusted network traffic for log data. This affects Log4j versions 1.2 > up to 1.2.17. > > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571] >
[GitHub] [kafka] showuon opened a new pull request #11292: [WIP] KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order
showuon opened a new pull request #11292: URL: https://github.com/apache/kafka/pull/11292 We forgot to iterate each segment in reverse order (i.e. via `descendingMap`) in `InMemoryWindowStore`. This PR fixes that and adds integration tests for it. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mimaison merged pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely
mimaison merged pull request #11174: URL: https://github.com/apache/kafka/pull/11174
[jira] [Created] (KAFKA-13264) backwardFetch in InMemoryWindowStore doesn't return in reverse order
Luke Chen created KAFKA-13264: - Summary: backwardFetch in InMemoryWindowStore doesn't return in reverse order Key: KAFKA-13264 URL: https://issues.apache.org/jira/browse/KAFKA-13264 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.8.0 Reporter: Luke Chen Assignee: Luke Chen

While working on another PR, I found that backwardFetch in InMemoryWindowStore currently doesn't return records in reverse order when there are several records in the same window.

Example: window size = 500, input records:
key: "a", value: "aa", timestamp: 0 ==> will be in the [0, 500] window
key: "b", value: "bb", timestamp: 10 ==> will be in the [0, 500] window

When fetching in forward order: "a" -> "b", which is expected.
When fetching in backward order: "a" -> "b", which is NOT expected.
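The reported behavior can be reproduced with a small two-level model. This is a simplified sketch under stated assumptions, not the store's real data structure or API: the class `BackwardFetchModel`, the method `backwardFetch`, and the flag `reverseWithinSegment` are hypothetical, and the outer map is assumed to be keyed by window start time with an inner sorted map of keys per window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class BackwardFetchModel {

    // Hypothetical model: window start time -> (record key -> value).
    // Segments are always visited newest first; the flag controls whether the
    // keys inside each segment are reversed as well.
    static List<String> backwardFetch(NavigableMap<Long, NavigableMap<String, String>> segments,
                                      boolean reverseWithinSegment) {
        List<String> keys = new ArrayList<>();
        for (NavigableMap<String, String> segment : segments.descendingMap().values()) {
            NavigableMap<String, String> view =
                reverseWithinSegment ? segment.descendingMap() : segment;
            keys.addAll(view.keySet());
        }
        return keys;
    }

    public static void main(String[] args) {
        // Both records from the issue fall into the window starting at 0.
        NavigableMap<String, String> window0 = new TreeMap<>();
        window0.put("a", "aa");
        window0.put("b", "bb");

        NavigableMap<Long, NavigableMap<String, String>> segments = new TreeMap<>();
        segments.put(0L, window0);

        // Reversing only the segment order leaves records within a window in forward order.
        System.out.println("segments reversed only: " + backwardFetch(segments, false));
        // Reversing the keys within each segment too gives the expected backward order.
        System.out.println("segments and keys reversed: " + backwardFetch(segments, true));
    }
}
```

With a single window, reversing only the outer map changes nothing, which matches the "a" -> "b" result described in the issue.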
[GitHub] [kafka] satishd commented on pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.
satishd commented on pull request #11033: URL: https://github.com/apache/kafka/pull/11033#issuecomment-911240472 @junrao gentle reminder to review the changes.