[jira] [Commented] (KAFKA-15768) StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult
[ https://issues.apache.org/jira/browse/KAFKA-15768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17790603#comment-17790603 ] John Roesler commented on KAFKA-15768: -- Hey [~hanyuzheng] , thanks for the bug report! I agree with you that if there is exactly one partition responding and it responds with a FailedQueryResult, then it could make sense to return it instead of throwing an exception. However, I do want to clarify that an expected usage of this method is to do something like issue a key-based query to multiple partitions (because you don't know which partition hosts the key, if any) and then get the result back from whichever partition responded with a non-null result. In other words, it should return the result if and only if all queried partitions responded successfully AND at most one partition returned a non-null result. From that perspective, it might actually be *more confusing* to return the FailedQueryResult instead of throwing an exception in the single-partition-response case, since it means that callers have two error paths to handle. I.e., they will have to write code like:
```
try {
    onlyResult = result.getOnlyPartitionResult();
    if (onlyResult.isSuccessful()) {
        doSomething(onlyResult);
    } else {
        handleFailureFromResult(onlyResult);
    }
} catch (RuntimeException e) {
    handleFailureFromException(e);
}
```
Answer to the side-note question: Maybe it's a bit philosophical, but the object upon which you call an instance method is in some sense an argument to the method (e.g., `this` is always an argument to an instance method). I don't think IllegalStateException would be better, since the application isn't in an illegal state (an illegal state is one which the code author thinks should never be reached, i.e., it would indicate a programming error within the framework). I could see an "argument" for choosing another exception type, or maybe declaring one for this purpose. 
I.e., something like an "assertion violated" exception might be more clear than IllegalArgumentException. > StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult > -- > > Key: KAFKA-15768 > URL: https://issues.apache.org/jira/browse/KAFKA-15768 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Matthias J. Sax >Assignee: Hanyu Zheng >Priority: Major > > Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect > `IllegalArgumentException` if any result is a `FailedQueryResult` (and even > if there is only a single FailedQueryResult). > The issue is the internal `filter(r -> r.getResult() != null)` step, that > blindly (and incorrectly) calls `getResult`. > Given the semantics of `getOnlyPartitionResult` we should not care if the > result is SuccessQueryResult or FailedQueryResult, but only check if there is > a single result or not. (The user has no means to avoid getting the > underlying error otherwise.) > Side-note: why does `FailedQueryResult#getResult` throw an > IllegalArgumentException (there is no argument passed into the method – it > should rather be an `IllegalStateException` – but I guess we would need a KIP > for this fix?) -- This message was sent by Atlassian Jira (v8.20.10#820010)
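To make the filtering issue concrete, here is a minimal, hypothetical sketch of the fix under discussion. The types here are simplified stand-ins, not the real IQv2 API: the key point is that the filtering step never calls getResult() on a failed result, since FailedQueryResult#getResult throws.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the IQv2 types; names and shapes are
// simplified for illustration and do not match the real API exactly.
public class OnlyPartitionResultSketch {
    interface QueryResult<R> {
        boolean isSuccess();
        R getResult();
    }

    static <R> QueryResult<R> success(R value) {
        return new QueryResult<R>() {
            public boolean isSuccess() { return true; }
            public R getResult() { return value; }
        };
    }

    static <R> QueryResult<R> failure(String reason) {
        return new QueryResult<R>() {
            public boolean isSuccess() { return false; }
            // Mirrors FailedQueryResult#getResult, which throws when accessed.
            public R getResult() { throw new IllegalArgumentException(reason); }
        };
    }

    // Keep failed results and non-null successful results; getResult() is
    // only dereferenced on successes, so a lone FailedQueryResult is
    // returned to the caller instead of blowing up inside the filter.
    static <R> QueryResult<R> getOnlyPartitionResult(List<QueryResult<R>> partitionResults) {
        List<QueryResult<R>> candidates = partitionResults.stream()
                .filter(r -> !r.isSuccess() || r.getResult() != null)
                .collect(Collectors.toList());
        if (candidates.size() != 1) {
            throw new IllegalArgumentException(
                    "Expected exactly one result, but got " + candidates.size());
        }
        return candidates.get(0);
    }
}
```

Whether a single failed result should be returned or should still throw is exactly the semantic question debated above; the sketch follows the ticket's proposal.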
[jira] [Updated] (KAFKA-13531) Flaky test NamedTopologyIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13531: - Attachment: org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout > Flaky test NamedTopologyIntegrationTest > --- > > Key: KAFKA-13531 > URL: https://issues.apache.org/jira/browse/KAFKA-13531 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Matthew de Detrich >Priority: Critical > Labels: flaky-test > Attachments: > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout > > > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets > {quote}java.lang.AssertionError: Did not receive all 3 records from topic > output-stream-2 within 6 ms, currently accumulated data is [] Expected: > is a value equal to or greater than <3> but: <0> was less than <3> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:648) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:336) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:644) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:617) > at > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:439){quote} > STDERR > 
{quote}java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting > offsets of a topic is forbidden while the consumer group is actively > subscribed to it. at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213) > at > org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) > at > org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaComplete(KafkaCompletableFuture.java:39) > at > org.apache.kafka.common.internals.KafkaFutureImpl.complete(KafkaFutureImpl.java:122) > at > org.apache.kafka.streams.processor.internals.TopologyMetadata.maybeNotifyTopologyVersionWaiters(TopologyMetadata.java:154) > at > org.apache.kafka.streams.processor.internals.StreamThread.checkForTopologyUpdates(StreamThread.java:916) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575) > Caused by: org.apache.kafka.common.errors.GroupSubscribedToTopicException: > Deleting offsets of a topic is forbidden while the consumer group is actively > subscribed to it. 
java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting > offsets of a topic is forbidden while the consumer group is actively > subscribed to it. at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213) > at > org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at >
[jira] [Commented] (KAFKA-13531) Flaky test NamedTopologyIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730190#comment-17730190 ] John Roesler commented on KAFKA-13531: -- Seen again while verifying the 3.5.0 release artifacts (this was the only test failure): Gradle Test Run :streams:test > Gradle Test Executor 554 > NamedTopologyIntegrationTest > shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() FAILED java.lang.AssertionError: Did not receive all 3 records from topic output-stream-1 within 6 ms, currently accumulated data is [] Expected: is a value equal to or greater than <3> but: <0> was less than <3> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:730) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:353) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:726) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:699) at org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing(NamedTopologyIntegrationTest.java:563) logs: [^org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout] > Flaky test NamedTopologyIntegrationTest > --- > > Key: KAFKA-13531 > URL: https://issues.apache.org/jira/browse/KAFKA-13531 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. 
Sax >Assignee: Matthew de Detrich >Priority: Critical > Labels: flaky-test > Attachments: > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout > > > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets > {quote}java.lang.AssertionError: Did not receive all 3 records from topic > output-stream-2 within 6 ms, currently accumulated data is [] Expected: > is a value equal to or greater than <3> but: <0> was less than <3> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:648) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:336) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:644) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:617) > at > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:439){quote} > STDERR > {quote}java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting > offsets of a topic is forbidden while the consumer group is actively > subscribed to it. 
at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213) > at > org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) > at > org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaComplete(KafkaCompletableFuture.java:39) > at > org.apache.kafka.common.internals.KafkaFutureImpl.complete(KafkaFutureImpl.java:122) > at >
[jira] [Updated] (KAFKA-14995) Automate asf.yaml collaborators refresh
[ https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-14995: - Labels: newbie (was: ) > Automate asf.yaml collaborators refresh > --- > > Key: KAFKA-14995 > URL: https://issues.apache.org/jira/browse/KAFKA-14995 > Project: Kafka > Issue Type: Improvement >Reporter: John Roesler >Priority: Minor > Labels: newbie > > We have added a policy to use the asf.yaml Github Collaborators: > [https://github.com/apache/kafka-site/pull/510] > The policy states that we set this list to be the top 20 commit authors who > are not Kafka committers. Unfortunately, it's not trivial to compute this > list. > Here is the process I followed to generate the list the first time (note that > I generated this list on 2023-04-28, so the lookback is one year: > 1. List authors by commit volume in the last year: > {code:java} > $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code} > 2. manually filter out the authors who are committers, based on > [https://kafka.apache.org/committers] > 3. truncate the list to 20 authors > 4. for each author > 4a. Find a commit in the `git log` that they were the author on: > {code:java} > commit 440bed2391338dc10fe4d36ab17dc104b61b85e8 > Author: hudeqi <1217150...@qq.com> > Date: Fri May 12 14:03:17 2023 +0800 > ...{code} > 4b. Look up that commit in Github: > [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8] > 4c. Copy their Github username into .asf.yaml under both the PR whitelist and > the Collaborators lists. > 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713] > > This is pretty time consuming and is very scriptable. Two complications: > * To do the filtering, we need to map from Git log "Author" to documented > Kafka "Committer" that we can use to perform the filter. 
Suggestion: just > update the structure of the "Committers" page to include their Git "Author" > name and email > ([https://github.com/apache/kafka-site/blob/asf-site/committers.html)] > * To generate the YAML lists, we need to map from Git log "Author" to Github > username. There's presumably some way to do this in the Github REST API (the > mapping is based on the email, IIUC), or we could also just update the > Committers page to also document each committer's Github username. > > Ideally, we would write this script (to be stored in the Apache Kafka repo) > and create a Github Action to run it every three months. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14995) Automate asf.yaml collaborators refresh
John Roesler created KAFKA-14995: Summary: Automate asf.yaml collaborators refresh Key: KAFKA-14995 URL: https://issues.apache.org/jira/browse/KAFKA-14995 Project: Kafka Issue Type: Improvement Reporter: John Roesler We have added a policy to use the asf.yaml Github Collaborators: [https://github.com/apache/kafka-site/pull/510] The policy states that we set this list to be the top 20 commit authors who are not Kafka committers. Unfortunately, it's not trivial to compute this list. Here is the process I followed to generate the list the first time (note that I generated this list on 2023-04-28, so the lookback is one year): 1. List authors by commit volume in the last year: {code:java} $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code} 2. manually filter out the authors who are committers, based on [https://kafka.apache.org/committers] 3. truncate the list to 20 authors 4. for each author 4a. Find a commit in the `git log` that they were the author on: {code:java} commit 440bed2391338dc10fe4d36ab17dc104b61b85e8 Author: hudeqi <1217150...@qq.com> Date: Fri May 12 14:03:17 2023 +0800 ...{code} 4b. Look up that commit in Github: [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8] 4c. Copy their Github username into .asf.yaml under both the PR whitelist and the Collaborators lists. 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713] This is pretty time-consuming and is very scriptable. Two complications: * To do the filtering, we need to map from Git log "Author" to documented Kafka "Committer" that we can use to perform the filter. Suggestion: just update the structure of the "Committers" page to include their Git "Author" name and email ([https://github.com/apache/kafka-site/blob/asf-site/committers.html]) * To generate the YAML lists, we need to map from Git log "Author" to Github username. 
There's presumably some way to do this in the Github REST API (the mapping is based on the email, IIUC), or we could also just update the Committers page to also document each committer's Github username. Ideally, we would write this script (to be stored in the Apache Kafka repo) and create a Github Action to run it every three months. -- This message was sent by Atlassian Jira (v8.20.10#820010)
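As a hedged sketch of the scriptable part of the process above (the filtering step), the following parses `git shortlog --email --numbered --summary` output and drops known committers. The class name, the regex, and the idea of a committer-email set are assumptions for illustration; the committer list would still have to come from the committers page, as the ticket notes.

```java
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Given the text output of `git shortlog --email --numbered --summary
// --since=...` (one "  <count>\t<name> <email>" line per author, already
// sorted by commit volume), return the top N authors who are not committers.
public class CollaboratorFilter {
    private static final Pattern LINE = Pattern.compile("\\s*(\\d+)\\s+(.+)\\s+<(.+)>");

    static List<String> topNonCommitters(String shortlog, Set<String> committerEmails, int limit) {
        return shortlog.lines()
                .map(LINE::matcher)
                .filter(Matcher::matches)
                // drop authors whose email matches a known committer
                .filter(m -> !committerEmails.contains(m.group(3)))
                .map(m -> m.group(2).trim() + " <" + m.group(3) + ">")
                .limit(limit)
                .collect(Collectors.toList());
    }
}
```

The remaining step (mapping each surviving author to a GitHub username) is the part the ticket identifies as needing either the GitHub REST API or a richer committers page.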
[jira] [Created] (KAFKA-14364) Support evolving serde with Foreign Key Join
John Roesler created KAFKA-14364: Summary: Support evolving serde with Foreign Key Join Key: KAFKA-14364 URL: https://issues.apache.org/jira/browse/KAFKA-14364 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler The current implementation of Foreign-Key join uses a hash comparison to determine whether it should emit join results or not. See [https://github.com/apache/kafka/blob/807c5b4d282e7a7a16d0bb94aa2cda9566a7cc2d/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java#L94-L110] As specified in KIP-213 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable] ), we must do a comparison of this nature in order to get correct results when the foreign-key reference changes, as the old reference might emit delayed results after the new instance generates its updated results, leading to an incorrect final join state. The hash comparison prevents this race condition by ensuring that any emitted results correspond to the _current_ version of the left-hand-side record (and therefore that the foreign-key reference itself has not changed). An undesired side-effect of this is that if users update their serdes (in a compatible way), for example to add a new optional field to the record, then the resulting hash will change for existing records. This will cause Streams to stop emitting results for those records until a new left-hand-side update comes in, recording a new hash for those records. It should be possible to provide a fix. Some ideas: * only consider the foreign-key reference itself in the hash function (this was the original proposal, but we opted to hash the entire record as an optimization to suppress unnecessary updates). * provide a user-overridable hash function. This would be more flexible, but also pushes a lot of complexity onto users, and opens up the possibility to completely break semantics. 
We will need to design the solution carefully so that we can preserve the desired correctness guarantee. -- This message was sent by Atlassian Jira (v8.20.10#820010)
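To illustrate why the first idea helps, here is a rough sketch contrasting a whole-record hash with a foreign-key-only hash. The string record encoding ("fk=...|...") and the class are invented for the example; the real code hashes actual serde output, not strings.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class FkHashSketch {
    private static String digest(String input) {
        try {
            byte[] hash = MessageDigest.getInstance("MD5")
                    .digest(input.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(hash);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Hash of the whole serialized record: changes whenever the serialized
    // form changes, including after a compatible serde upgrade that adds a
    // new optional field -- the problem described in the ticket.
    static String wholeRecordHash(String serializedRecord) {
        return digest(serializedRecord);
    }

    // Hash of the foreign-key reference only: stable across compatible serde
    // evolution as long as the reference itself is unchanged, so delayed
    // results for stale references can still be suppressed.
    static String foreignKeyHash(String foreignKey) {
        return digest(foreignKey);
    }
}
```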
[jira] [Created] (KAFKA-14254) Format timestamps in assignor logs as dates instead of integers
John Roesler created KAFKA-14254: Summary: Format timestamps in assignor logs as dates instead of integers Key: KAFKA-14254 URL: https://issues.apache.org/jira/browse/KAFKA-14254 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler This is a follow-on task from [https://github.com/apache/kafka/pull/12582] There is another log line that prints the same timestamp: "Triggering the followup rebalance scheduled for ...", which should also be printed as a date/time in the same manner as PR 12582. We should also search the codebase a little to see if we're printing timestamps in other log lines that would be better off as date/times. -- This message was sent by Atlassian Jira (v8.20.10#820010)
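A minimal sketch of the requested change, assuming the scheduled time is an epoch-millis long as in the assignor; the class name and exact message text are illustrative, not the real log format.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Render the rebalance deadline as a human-readable UTC timestamp instead
// of printing the raw long.
public class RebalanceLogFormat {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);

    static String followupRebalanceMessage(long deadlineMs) {
        return "Triggering the followup rebalance scheduled for "
                + FMT.format(Instant.ofEpochMilli(deadlineMs));
    }
}
```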
[jira] [Updated] (KAFKA-14254) Format timestamps in assignor logs as dates instead of integers
[ https://issues.apache.org/jira/browse/KAFKA-14254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-14254: - Labels: newbie newbie++ (was: ) > Format timestamps in assignor logs as dates instead of integers > --- > > Key: KAFKA-14254 > URL: https://issues.apache.org/jira/browse/KAFKA-14254 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Minor > Labels: newbie, newbie++ > > This is a follow-on task from [https://github.com/apache/kafka/pull/12582] > There is another log line that prints the same timestamp: "Triggering the > followup rebalance scheduled for ...", which should also be printed as a > date/time in the same manner as PR 12582. > We should also search the codebase a little to see if we're printing > timestamps in other log lines that would be better off as date/times. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14253) StreamsPartitionAssignor should print the member count in assignment logs
[ https://issues.apache.org/jira/browse/KAFKA-14253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-14253: - Labels: newbie newbie++ (was: newbie++) > StreamsPartitionAssignor should print the member count in assignment logs > - > > Key: KAFKA-14253 > URL: https://issues.apache.org/jira/browse/KAFKA-14253 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Minor > Labels: newbie, newbie++ > > Debugging rebalance and assignment issues is harder than it needs to be. One > simple thing that can help is to print out information in the logs that users > have to compute today. > For example, the StreamsPartitionAssignor prints two messages that contain > the the newline-delimited group membership: > {code:java} > [StreamsPartitionAssignor] [...-StreamThread-1] stream-thread > [...-StreamThread-1-consumer] All members participating in this rebalance: > : [] > : [] > : []{code} > and > {code:java} > [StreamsPartitionAssignor] [...-StreamThread-1] stream-thread > [...-StreamThread-1-consumer] Assigned tasks [...] including stateful [...] > to clients as: > =[activeTasks: ([...]) standbyTasks: ([...])] > =[activeTasks: ([...]) standbyTasks: ([...])] > =[activeTasks: ([...]) standbyTasks: ([...]) > {code} > > In both of these cases, it would be nice to: > # Include the number of members in the group (I.e., "15 members > participating" and "to 15 clients as") > # sort the member ids (to help compare the membership and assignment across > rebalances) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14253) StreamsPartitionAssignor should print the member count in assignment logs
[ https://issues.apache.org/jira/browse/KAFKA-14253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-14253: - Labels: newbie++ (was: ) > StreamsPartitionAssignor should print the member count in assignment logs > - > > Key: KAFKA-14253 > URL: https://issues.apache.org/jira/browse/KAFKA-14253 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Minor > Labels: newbie++ > > Debugging rebalance and assignment issues is harder than it needs to be. One > simple thing that can help is to print out information in the logs that users > have to compute today. > For example, the StreamsPartitionAssignor prints two messages that contain > the the newline-delimited group membership: > {code:java} > [StreamsPartitionAssignor] [...-StreamThread-1] stream-thread > [...-StreamThread-1-consumer] All members participating in this rebalance: > : [] > : [] > : []{code} > and > {code:java} > [StreamsPartitionAssignor] [...-StreamThread-1] stream-thread > [...-StreamThread-1-consumer] Assigned tasks [...] including stateful [...] > to clients as: > =[activeTasks: ([...]) standbyTasks: ([...])] > =[activeTasks: ([...]) standbyTasks: ([...])] > =[activeTasks: ([...]) standbyTasks: ([...]) > {code} > > In both of these cases, it would be nice to: > # Include the number of members in the group (I.e., "15 members > participating" and "to 15 clients as") > # sort the member ids (to help compare the membership and assignment across > rebalances) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14253) StreamsPartitionAssignor should print the member count in assignment logs
John Roesler created KAFKA-14253: Summary: StreamsPartitionAssignor should print the member count in assignment logs Key: KAFKA-14253 URL: https://issues.apache.org/jira/browse/KAFKA-14253 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Debugging rebalance and assignment issues is harder than it needs to be. One simple thing that can help is to print out information in the logs that users have to compute today. For example, the StreamsPartitionAssignor prints two messages that contain the newline-delimited group membership: {code:java} [StreamsPartitionAssignor] [...-StreamThread-1] stream-thread [...-StreamThread-1-consumer] All members participating in this rebalance: : [] : [] : []{code} and {code:java} [StreamsPartitionAssignor] [...-StreamThread-1] stream-thread [...-StreamThread-1-consumer] Assigned tasks [...] including stateful [...] to clients as: =[activeTasks: ([...]) standbyTasks: ([...])] =[activeTasks: ([...]) standbyTasks: ([...])] =[activeTasks: ([...]) standbyTasks: ([...]) {code} In both of these cases, it would be nice to: # Include the number of members in the group (i.e., "15 members participating" and "to 15 clients as") # sort the member ids (to help compare the membership and assignment across rebalances) -- This message was sent by Atlassian Jira (v8.20.10#820010)
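A sketch of both suggestions together (member count plus sorted member ids); the map shape and message wording are assumptions for illustration, not the assignor's exact format.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class AssignmentLogSketch {
    // Summarize group membership with an up-front count and the member ids
    // sorted, so membership can be diffed across consecutive rebalances.
    static String membershipSummary(Map<String, List<String>> memberToTopics) {
        String members = memberToTopics.keySet().stream()
                .sorted()
                .map(id -> id + ": " + memberToTopics.get(id))
                .collect(Collectors.joining("\n"));
        return memberToTopics.size() + " members participating in this rebalance:\n" + members;
    }
}
```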
[jira] [Created] (KAFKA-14202) IQv2: Expose binary store schema to store implementations
John Roesler created KAFKA-14202: Summary: IQv2: Expose binary store schema to store implementations Key: KAFKA-14202 URL: https://issues.apache.org/jira/browse/KAFKA-14202 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler One feature of IQv2 is that store implementations can handle custom queries. Many custom query handlers will need to process the key or value bytes, for example deserializing them to implement some filter or aggregations, or even performing binary operations on them. For the most part, this should be straightforward for users, since they provide Streams with the serdes, the store implementation, and the custom queries. However, Streams will sometimes pack extra data around the data produced by the user-provided serdes. For example, the Timestamped store wrappers add a timestamp on the beginning of the value byte array. And in Windowed stores, we add window timestamps to the key bytes. It would be nice to have some generic mechanism to communicate those schemas to the user-provided inner store layers to support users who need to write custom queries. For example, perhaps we can add an extractor class to the state store context -- This message was sent by Atlassian Jira (v8.20.10#820010)
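To illustrate the kind of packing involved, here is a sketch assuming the timestamped-store layout of an 8-byte timestamp prefix before the user-serialized value. The exact binary layout is an assumption here, and that is precisely the point of the ticket: a custom query handler today has to hard-code this knowledge instead of getting it from the store context.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class TimestampedValueSketch {
    // Pack a timestamp in front of the user value bytes, as a timestamped
    // store wrapper might.
    static byte[] pack(long timestamp, byte[] userValue) {
        return ByteBuffer.allocate(8 + userValue.length)
                .putLong(timestamp)
                .put(userValue)
                .array();
    }

    // What a custom query handler would need to do to read the timestamp...
    static long timestampOf(byte[] packed) {
        return ByteBuffer.wrap(packed).getLong();
    }

    // ...and to recover the user-serialized payload for deserialization.
    static byte[] userValueOf(byte[] packed) {
        byte[] out = new byte[packed.length - 8];
        ByteBuffer.wrap(packed, 8, out.length).get(out);
        return out;
    }
}
```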
[jira] [Commented] (KAFKA-13978) Trigger UncaughtExceptionHandler for IllegalArgument and IllegalState exceptions
[ https://issues.apache.org/jira/browse/KAFKA-13978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17578023#comment-17578023 ] John Roesler commented on KAFKA-13978: -- Hey [~bryves] , I just checked trunk and 3.3, and I don't see the code that you're removing in your PR. It seems like someone else may have fixed this issue in the mean time. Can you confirm whether or not the issue is resolved? > Trigger UncaughtExceptionHandler for IllegalArgument and IllegalState > exceptions > > > Key: KAFKA-13978 > URL: https://issues.apache.org/jira/browse/KAFKA-13978 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.1.0, 3.2.0, 3.1.1 >Reporter: Ben >Assignee: Ben >Priority: Blocker > Fix For: 3.3.0 > > > In [KAFKA-12887|https://issues.apache.org/jira/browse/KAFKA-12887] > [(PR)|https://github.com/apache/kafka/pull/11228/files] changes were made to > prevent thrown IllegalStateException and IllegalArgumentExceptions from being > passed to a registered exception handler. > > I believe these changes should be reverted for the following reasons: > * Making this change is backwards incompatible with existing applications > which may have expected those exceptions to be handled. > > * Users can (and do!) throw these exceptions, often for legitimate reasons. > For instance, IllegalArgumentException is thrown when a method is passed the > wrong argument. This is exactly the type of uncaught exception a user would > expect to be handled by the uncaught exception handler, rather than by the > calling code. > > * The change is inconsistent. Why only these two exceptions, and not all > runtime exceptions? > > * The change is not well documented. There are even tutorial resources which > actually use these exceptions, [for example > here|https://developer.confluent.io/tutorials/error-handling/confluent.html]. > If we make this change, it should be better communicated. As implemented, it > is extremely surprising that this happens. 
> > * Finally, what value is the change actually adding to the project? It > restricts user freedom, increases complexity, and does not improve safety. We > should only make a backwards-incompatible change like this if there is clear > value in doing so. > > As a note, reverting this is not (in my view) going to impact users > negatively. It is unlikely many people depend on this functionality, and if > they do, it should be easy to communicate in the release notes, and for them > to adjust their code accordingly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14020) Performance regression in Producer
[ https://issues.apache.org/jira/browse/KAFKA-14020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566017#comment-17566017 ] John Roesler commented on KAFKA-14020: -- Hey [~alivshits] , thanks for your work on [https://github.com/apache/kafka/pull/12365] . I've just re-run the same benchmark above and confirmed that your PR fixes the perf regression. Thank you! As a reminder, this was the baseline for "good" performance: Commit: [{{e3202b9}}|https://github.com/apache/kafka/commit/e3202b9ef4c63aab2e5ab049978704282792] (the parent of the problematic commit) TPut: *118k±1k* And when I ran the same benchmark on [{{3a6500b}}|https://github.com/apache/kafka/commit/3a6500bb12b8c5716f7d99b6cec1c521f6f029c2] , I got: TPut: *117k±1k* > Performance regression in Producer > -- > > Key: KAFKA-14020 > URL: https://issues.apache.org/jira/browse/KAFKA-14020 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.3.0 >Reporter: John Roesler >Assignee: Artem Livshits >Priority: Blocker > > [https://github.com/apache/kafka/commit/f7db6031b84a136ad0e257df722b20faa7c37b8a] > introduced a 10% performance regression in the KafkaProducer under a default > config. > > The context for this result is a benchmark that we run for Kafka Streams. The > benchmark provisions 5 independent AWS clusters, including one broker node on > an i3.large and one client node on an i3.large. During a benchmark run, we > first run the Producer for 10 minutes to generate test data, and then we run > Kafka Streams under a number of configurations to measure its performance. > Our observation was a 10% regression in throughput under the simplest > configuration, in which Streams simply consumes from a topic and does nothing > else. That benchmark actually runs faster than the producer that generates > the test data, so its throughput is bounded by the data generator's > throughput. 
After investigation, we realized that the regression was in the
> data generator, not the consumer or Streams.
> We have numerous benchmark runs leading up to the commit in question, and
> they all show a throughput in the neighborhood of 115,000 records per second.
> We also have 40 runs including and after that commit, and they all show a
> throughput in the neighborhood of 105,000 records per second. A test on
> [trunk with the commit reverted|https://github.com/apache/kafka/pull/12342]
> shows a return to around 115,000 records per second.
> Config:
> {code:java}
> final Properties properties = new Properties();
> properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
> properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
> properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
> {code}
> Here's the producer code in the data generator. Our tests were running with
> three produceThreads.
> {code:java}
> for (int t = 0; t < produceThreads; t++) {
>     futures.add(executorService.submit(() -> {
>         int threadTotal = 0;
>         long lastPrint = start;
>         final long printInterval = Duration.ofSeconds(10).toMillis();
>         long now;
>         try (final org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(producerConfig(broker))) {
>             while (limit > (now = System.currentTimeMillis()) - start) {
>                 for (int i = 0; i < 1000; i++) {
>                     final String key = keys.next();
>                     final String data = dataGen.generate();
>                     producer.send(new ProducerRecord<>(topic, key, valueBuilder.apply(key, data)));
>                     threadTotal++;
>                 }
>                 if ((now - lastPrint) > printInterval) {
>                     System.out.println(Thread.currentThread().getName() + " produced " + numberFormat.format(threadTotal) + " to " + topic + " in " + Duration.ofMillis(now - start));
>                     lastPrint = now;
>                 }
>             }
>         }
>         total.addAndGet(threadTotal);
>         System.out.println(Thread.currentThread().getName() + " finished (" + numberFormat.format(threadTotal) + ") in " + Duration.ofMillis(now - start));
>     }));
> }{code}
> As you can see, this is a very basic usage. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14020) Performance regression in Producer
[ https://issues.apache.org/jira/browse/KAFKA-14020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17558796#comment-17558796 ] John Roesler commented on KAFKA-14020: -- FYI, just setting the partitioner back to the {{DefaultPartitioner}} does not appear to help. The throughput of that test was 105k±2k records per second. Code under test: [https://github.com/apache/kafka/commit/6c67adb8beedafca0316d1c9ec4a3c219aaec219] > Performance regression in Producer > -- > > Key: KAFKA-14020 > URL: https://issues.apache.org/jira/browse/KAFKA-14020 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.3.0 >Reporter: John Roesler >Priority: Blocker > > [https://github.com/apache/kafka/commit/f7db6031b84a136ad0e257df722b20faa7c37b8a] > introduced a 10% performance regression in the KafkaProducer under a default > config. > > The context for this result is a benchmark that we run for Kafka Streams. The > benchmark provisions 5 independent AWS clusters, including one broker node on > an i3.large and one client node on an i3.large. During a benchmark run, we > first run the Producer for 10 minutes to generate test data, and then we run > Kafka Streams under a number of configurations to measure its performance. > Our observation was a 10% regression in throughput under the simplest > configuration, in which Streams simply consumes from a topic and does nothing > else. That benchmark actually runs faster than the producer that generates > the test data, so its throughput is bounded by the data generator's > throughput. After investigation, we realized that the regression was in the > data generator, not the consumer or Streams. > We have numerous benchmark runs leading up to the commit in question, and > they all show a throughput in the neighborhood of 115,000 records per second.
> We also have 40 runs including and after that commit, and they all show a
> throughput in the neighborhood of 105,000 records per second. A test on
> [trunk with the commit reverted|https://github.com/apache/kafka/pull/12342]
> shows a return to around 115,000 records per second.
> Config:
> {code:java}
> final Properties properties = new Properties();
> properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
> properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
> properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
> {code}
> Here's the producer code in the data generator. Our tests were running with
> three produceThreads.
> {code:java}
> for (int t = 0; t < produceThreads; t++) {
>     futures.add(executorService.submit(() -> {
>         int threadTotal = 0;
>         long lastPrint = start;
>         final long printInterval = Duration.ofSeconds(10).toMillis();
>         long now;
>         try (final org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(producerConfig(broker))) {
>             while (limit > (now = System.currentTimeMillis()) - start) {
>                 for (int i = 0; i < 1000; i++) {
>                     final String key = keys.next();
>                     final String data = dataGen.generate();
>                     producer.send(new ProducerRecord<>(topic, key, valueBuilder.apply(key, data)));
>                     threadTotal++;
>                 }
>                 if ((now - lastPrint) > printInterval) {
>                     System.out.println(Thread.currentThread().getName() + " produced " + numberFormat.format(threadTotal) + " to " + topic + " in " + Duration.ofMillis(now - start));
>                     lastPrint = now;
>                 }
>             }
>         }
>         total.addAndGet(threadTotal);
>         System.out.println(Thread.currentThread().getName() + " finished (" + numberFormat.format(threadTotal) + ") in " + Duration.ofMillis(now - start));
>     }));
> }{code}
> As you can see, this is a very basic usage. -- This message was sent by Atlassian Jira (v8.20.7#820007)
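For reference, the override tested in the comment above (pinning the partitioner back to the old default) can be expressed as a plain producer config. This is a minimal, dependency-free sketch: the string keys mirror the `ProducerConfig` constants used in the benchmark, and the helper name `producerConfig` simply echoes the benchmark code.

```java
import java.util.Properties;

public class PartitionerOverride {
    // Sketch of the config used to test whether pinning the old
    // DefaultPartitioner restores throughput (per the comment above, it
    // did not). String keys stand in for the ProducerConfig constants so
    // the sketch compiles without the Kafka client on the classpath.
    static Properties producerConfig(final String broker) {
        final Properties properties = new Properties();
        properties.put("bootstrap.servers", broker);
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Explicitly pin the partitioner rather than relying on the default:
        properties.put("partitioner.class",
                "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
        return properties;
    }
}
```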
[jira] [Created] (KAFKA-14020) Performance regression in Producer
John Roesler created KAFKA-14020: Summary: Performance regression in Producer Key: KAFKA-14020 URL: https://issues.apache.org/jira/browse/KAFKA-14020 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 3.3.0 Reporter: John Roesler [https://github.com/apache/kafka/commit/f7db6031b84a136ad0e257df722b20faa7c37b8a] introduced a 10% performance regression in the KafkaProducer under a default config. The context for this result is a benchmark that we run for Kafka Streams. The benchmark provisions 5 independent AWS clusters, including one broker node on an i3.large and one client node on an i3.large. During a benchmark run, we first run the Producer for 10 minutes to generate test data, and then we run Kafka Streams under a number of configurations to measure its performance. Our observation was a 10% regression in throughput under the simplest configuration, in which Streams simply consumes from a topic and does nothing else. That benchmark actually runs faster than the producer that generates the test data, so its throughput is bounded by the data generator's throughput. After investigation, we realized that the regression was in the data generator, not the consumer or Streams. We have numerous benchmark runs leading up to the commit in question, and they all show a throughput in the neighborhood of 115,000 records per second. We also have 40 runs including and after that commit, and they all show a throughput in the neighborhood of 105,000 records per second. A test on [trunk with the commit reverted|https://github.com/apache/kafka/pull/12342] shows a return to around 115,000 records per second. Config:
{code:java}
final Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
{code}
Here's the producer code in the data generator.
Our tests were running with three produceThreads.
{code:java}
for (int t = 0; t < produceThreads; t++) {
    futures.add(executorService.submit(() -> {
        int threadTotal = 0;
        long lastPrint = start;
        final long printInterval = Duration.ofSeconds(10).toMillis();
        long now;
        try (final org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(producerConfig(broker))) {
            while (limit > (now = System.currentTimeMillis()) - start) {
                for (int i = 0; i < 1000; i++) {
                    final String key = keys.next();
                    final String data = dataGen.generate();
                    producer.send(new ProducerRecord<>(topic, key, valueBuilder.apply(key, data)));
                    threadTotal++;
                }
                if ((now - lastPrint) > printInterval) {
                    System.out.println(Thread.currentThread().getName() + " produced " + numberFormat.format(threadTotal) + " to " + topic + " in " + Duration.ofMillis(now - start));
                    lastPrint = now;
                }
            }
        }
        total.addAndGet(threadTotal);
        System.out.println(Thread.currentThread().getName() + " finished (" + numberFormat.format(threadTotal) + ") in " + Duration.ofMillis(now - start));
    }));
}{code}
As you can see, this is a very basic usage. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Resolved] (KAFKA-13654) Extend KStream process with new Processor API
[ https://issues.apache.org/jira/browse/KAFKA-13654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-13654. -- Fix Version/s: 3.3.0 Resolution: Fixed > Extend KStream process with new Processor API > - > > Key: KAFKA-13654 > URL: https://issues.apache.org/jira/browse/KAFKA-13654 > Project: Kafka > Issue Type: Improvement >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > Labels: kafka-streams, kip-required, needs-kip, streams > Fix For: 3.3.0 > > > Extending KStream#process to use the latest Processor API adopted here: > https://issues.apache.org/jira/browse/KAFKA-8410 > This new API allows a typed returned KStream, which makes it possible to chain results > from processors, becoming a new way to transform records with more control > over what's forwarded. > KIP: https://cwiki.apache.org/confluence/x/yKbkCw -- This message was sent by Atlassian Jira (v8.20.7#820007)
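The value of the typed process() described above is easiest to see in miniature. The sketch below mirrors the shape of the new Processor API (input *and* output key/value types are all type parameters) using only plain Java generics; it is an illustration of the typing idea, not the actual Streams interfaces.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;

public class TypedProcessing {
    // Mirrors the new Processor API's shape: the OUTPUT key/value types
    // are part of the signature, so downstream stages are checked at
    // compile time instead of requiring casts.
    interface Proc<KIn, VIn, KOut, VOut> {
        Entry<KOut, VOut> process(KIn key, VIn value);
    }

    // Chaining two typed processors yields another typed processor; a
    // type mismatch between the stages would be a compile error.
    static <K1, V1, K2, V2, K3, V3> Proc<K1, V1, K3, V3> chain(
            final Proc<K1, V1, K2, V2> first, final Proc<K2, V2, K3, V3> second) {
        return (k, v) -> {
            final Entry<K2, V2> mid = first.process(k, v);
            return second.process(mid.getKey(), mid.getValue());
        };
    }

    static String demo() {
        // String values are parsed to ints, then doubled: the value type
        // changes along the chain, with no casts anywhere.
        final Proc<String, String, String, Integer> parse =
                (k, v) -> new SimpleEntry<>(k, Integer.parseInt(v));
        final Proc<String, Integer, String, Integer> times2 =
                (k, v) -> new SimpleEntry<>(k, v * 2);
        final Entry<String, Integer> out = chain(parse, times2).process("a", "21");
        return out.getKey() + "=" + out.getValue();
    }
}
```

This is the property the old transform()/process() split lacked: because process() previously returned void, results could not be chained with their types intact.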
[jira] [Assigned] (KAFKA-13654) Extend KStream process with new Processor API
[ https://issues.apache.org/jira/browse/KAFKA-13654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-13654: Assignee: Jorge Esteban Quilcate Otoya > Extend KStream process with new Processor API > - > > Key: KAFKA-13654 > URL: https://issues.apache.org/jira/browse/KAFKA-13654 > Project: Kafka > Issue Type: Improvement >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > Labels: kafka-streams, kip-required, needs-kip, streams > > Extending KStream#process to use the latest Processor API adopted here: > https://issues.apache.org/jira/browse/KAFKA-8410 > This new API allows a typed returned KStream, which makes it possible to chain results > from processors, becoming a new way to transform records with more control > over what's forwarded. > KIP: https://cwiki.apache.org/confluence/x/yKbkCw -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (KAFKA-13820) Add debug-level logs to explain why a store is filtered out during interactive query
John Roesler created KAFKA-13820: Summary: Add debug-level logs to explain why a store is filtered out during interactive query Key: KAFKA-13820 URL: https://issues.apache.org/jira/browse/KAFKA-13820 Project: Kafka Issue Type: Improvement Reporter: John Roesler Currently, Kafka Streams throws an InvalidStateStoreException when the desired store is not present on the local instance. It also throws the same exception with the same message when the store is present but is not active (and stale queries are disabled). This is an important distinction when debugging store unavailability, and a debug-level log is an unintrusive mechanism to expose the information. -- This message was sent by Atlassian Jira (v8.20.1#820001)
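The distinction described in the ticket can be sketched in a few lines. This is a hypothetical illustration of the proposed behavior (with `IllegalStateException` standing in for `InvalidStateStoreException`, so the sketch stays dependency-free): the user-visible exception stays identical in both cases, while a debug/fine-level log captures the real reason.

```java
import java.util.logging.Logger;

public class StoreFiltering {
    private static final Logger LOG = Logger.getLogger(StoreFiltering.class.getName());

    // Both failure modes surface the same exception and message; only the
    // debug-level log distinguishes "not present on this instance" from
    // "present but not active while stale queries are disabled".
    static String lookupStore(final boolean present, final boolean active) {
        if (!present) {
            LOG.fine("Store filtered out: not present on this instance");
            throw new IllegalStateException("The state store is not available");
        }
        if (!active) {
            LOG.fine("Store filtered out: present but not active (stale queries disabled)");
            throw new IllegalStateException("The state store is not available");
        }
        return "store-handle";
    }
}
```

Without the debug lines, the two branches are indistinguishable from the caller's side, which is exactly the debugging gap the ticket describes.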
[jira] [Updated] (KAFKA-13819) Add application.server to Streams assignor logs when set
[ https://issues.apache.org/jira/browse/KAFKA-13819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13819: - Component/s: streams > Add application.server to Streams assignor logs when set > > > Key: KAFKA-13819 > URL: https://issues.apache.org/jira/browse/KAFKA-13819 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Minor > > Currently, Streams assignment logs only include the consumer client id and > the streams application id, but those are both randomly generated UUIDs that > are not easy to correlate with users' concept of the name of a host. To help > bridge this gap, we can include the application.server (when set) in > assignment logs. That way, users will also be able to see which host and port > each member is associated with. -- This message was sent by Atlassian Jira (v8.20.1#820001)
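A minimal sketch of the proposed enrichment, using a hypothetical formatting helper (the name `memberDescription` is illustrative, not from the ticket): when `application.server` (a user-configured host:port string) is set, it is appended next to the opaque client id in assignment logs.

```java
public class AssignorMemberFormat {
    // Hypothetical helper: the client id is a generated UUID that users
    // cannot map to a machine, so append the user-configured
    // application.server (host:port) whenever it is set.
    static String memberDescription(final String clientId, final String applicationServer) {
        if (applicationServer == null || applicationServer.isEmpty()) {
            return clientId;
        }
        return clientId + " (application.server=" + applicationServer + ")";
    }
}
```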
[jira] [Updated] (KAFKA-13818) Add generation to consumer assignor logs
[ https://issues.apache.org/jira/browse/KAFKA-13818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13818: - Component/s: consumer > Add generation to consumer assignor logs > > > Key: KAFKA-13818 > URL: https://issues.apache.org/jira/browse/KAFKA-13818 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: John Roesler >Assignee: John Roesler >Priority: Minor > > Reading assignor logs is really confusing in large part because they are > spread across different layers of abstraction (the ConsumerCoordinator > and the ConsumerPartitionAssignor, which in Streams consists of several > layers of its own). Each layer in the abstraction reports useful information > that only it has access to, but because they are split over multiple lines, > with > multiple members in the cluster, and (often) multiple rebalances taking place > in rapid succession, it's often hard to understand which logs are part of > which rebalance. > > One thing we don't want to do is break encapsulation by exposing too much of > the ConsumerCoordinator's internal state to components like the pluggable > ConsumerPartitionAssignor. > > We can accomplish what we want by adding the concept of a dynamic log > context, so that the ConsumerCoordinator can add dynamic information like the > generation id to be logged for correlation in other components without > exposing any new information or metadata to those components themselves. > See [https://github.com/apache/kafka/pull/12020] for example. -- This message was sent by Atlassian Jira (v8.20.1#820001)
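The "dynamic log context" idea above can be sketched without any Kafka classes: the coordinator owns a mutable field (here, the rebalance generation) and updates it across rebalances, while collaborating components like the assignor only ever log through the rendered prefix and are never handed the value itself. The class and method names below are illustrative, not the actual PR's API.

```java
import java.util.concurrent.atomic.AtomicReference;

public class DynamicLogContext {
    private final String staticPrefix;
    // Mutable part owned by the coordinator, e.g. the rebalance generation.
    private final AtomicReference<String> dynamicPart = new AtomicReference<>("");

    DynamicLogContext(final String staticPrefix) {
        this.staticPrefix = staticPrefix;
    }

    // Only the coordinator calls this; the assignor never sees the value.
    void setGeneration(final int generation) {
        dynamicPart.set(", generation=" + generation);
    }

    // Components log through the context, so their lines correlate by
    // generation without any coordinator state leaking into them.
    String prefix() {
        return "[" + staticPrefix + dynamicPart.get() + "] ";
    }
}
```

This keeps encapsulation intact: correlation metadata appears in every component's log lines, but no component gains access to new information.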
[jira] [Updated] (KAFKA-13820) Add debug-level logs to explain why a store is filtered out during interactive query
[ https://issues.apache.org/jira/browse/KAFKA-13820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13820: - Component/s: streams > Add debug-level logs to explain why a store is filtered out during > interactive query > > > Key: KAFKA-13820 > URL: https://issues.apache.org/jira/browse/KAFKA-13820 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Minor > > Currently, Kafka Streams throws an InvalidStateStoreException when the desired > store is not present on the local instance. It also throws the same exception > with the same message when the store is present but is not active (and stale > queries are disabled). > This is an important distinction when debugging store unavailability, and a > debug-level log is an unintrusive mechanism to expose the information. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13819) Add application.server to Streams assignor logs when set
John Roesler created KAFKA-13819: Summary: Add application.server to Streams assignor logs when set Key: KAFKA-13819 URL: https://issues.apache.org/jira/browse/KAFKA-13819 Project: Kafka Issue Type: Improvement Reporter: John Roesler Currently, Streams assignment logs only include the consumer client id and the streams application id, but those are both randomly generated UUIDs that are not easy to correlate with users' concept of the name of a host. To help bridge this gap, we can include the application.server (when set) in assignment logs. That way, users will also be able to see which host and port each member is associated with. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13818) Add generation to consumer assignor logs
John Roesler created KAFKA-13818: Summary: Add generation to consumer assignor logs Key: KAFKA-13818 URL: https://issues.apache.org/jira/browse/KAFKA-13818 Project: Kafka Issue Type: Improvement Reporter: John Roesler Reading assignor logs is really confusing in large part because they are spread across different layers of abstraction (the ConsumerCoordinator and the ConsumerPartitionAssignor, which in Streams consists of several layers of its own). Each layer in the abstraction reports useful information that only it has access to, but because they are split over multiple lines, with multiple members in the cluster, and (often) multiple rebalances taking place in rapid succession, it's often hard to understand which logs are part of which rebalance. One thing we don't want to do is break encapsulation by exposing too much of the ConsumerCoordinator's internal state to components like the pluggable ConsumerPartitionAssignor. We can accomplish what we want by adding the concept of a dynamic log context, so that the ConsumerCoordinator can add dynamic information like the generation id to be logged for correlation in other components without exposing any new information or metadata to those components themselves. See [https://github.com/apache/kafka/pull/12020] for example. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (KAFKA-13818) Add generation to consumer assignor logs
[ https://issues.apache.org/jira/browse/KAFKA-13818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-13818: Assignee: John Roesler > Add generation to consumer assignor logs > > > Key: KAFKA-13818 > URL: https://issues.apache.org/jira/browse/KAFKA-13818 > Project: Kafka > Issue Type: Improvement >Reporter: John Roesler >Assignee: John Roesler >Priority: Minor > > Reading assignor logs is really confusing in large part because they are > spread across different layers of abstraction (the ConsumerCoordinator > and the ConsumerPartitionAssignor, which in Streams consists of several > layers of its own). Each layer in the abstraction reports useful information > that only it has access to, but because they are split over multiple lines, > with > multiple members in the cluster, and (often) multiple rebalances taking place > in rapid succession, it's often hard to understand which logs are part of > which rebalance. > > One thing we don't want to do is break encapsulation by exposing too much of > the ConsumerCoordinator's internal state to components like the pluggable > ConsumerPartitionAssignor. > > We can accomplish what we want by adding the concept of a dynamic log > context, so that the ConsumerCoordinator can add dynamic information like the > generation id to be logged for correlation in other components without > exposing any new information or metadata to those components themselves. > See [https://github.com/apache/kafka/pull/12020] for example. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-13479) Interactive Query v2
[ https://issues.apache.org/jira/browse/KAFKA-13479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-13479. -- Fix Version/s: 3.2.0 Resolution: Fixed > Interactive Query v2 > > > Key: KAFKA-13479 > URL: https://issues.apache.org/jira/browse/KAFKA-13479 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 3.2.0 > > > Kafka Streams supports an interesting and innovative API for "peeking" into > the internal state of running stateful stream processors from outside of the > application, called Interactive Query (IQ). This functionality has proven > invaluable to users over the years for everything from debugging running > applications to serving low latency queries straight from the Streams runtime. > However, the actual interfaces for IQ were designed in the very early days of > Kafka Streams, before the project had gained significant adoption, and in the > absence of much precedent for this kind of API in peer projects. With the > benefit of hindsight, we can observe several problems with the original > design that we hope to address in a revised framework that will serve Streams > users well for many years to come. > > This ticket tracks the implementation of KIP-796: > https://cwiki.apache.org/confluence/x/34xnCw -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Closed] (KAFKA-13479) Interactive Query v2
[ https://issues.apache.org/jira/browse/KAFKA-13479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler closed KAFKA-13479. > Interactive Query v2 > > > Key: KAFKA-13479 > URL: https://issues.apache.org/jira/browse/KAFKA-13479 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 3.2.0 > > > Kafka Streams supports an interesting and innovative API for "peeking" into > the internal state of running stateful stream processors from outside of the > application, called Interactive Query (IQ). This functionality has proven > invaluable to users over the years for everything from debugging running > applications to serving low latency queries straight from the Streams runtime. > However, the actual interfaces for IQ were designed in the very early days of > Kafka Streams, before the project had gained significant adoption, and in the > absence of much precedent for this kind of API in peer projects. With the > benefit of hindsight, we can observe several problems with the original > design that we hope to address in a revised framework that will serve Streams > users well for many years to come. > > This ticket tracks the implementation of KIP-796: > https://cwiki.apache.org/confluence/x/34xnCw -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13622) Revisit the complexity of position tracking in state stores
[ https://issues.apache.org/jira/browse/KAFKA-13622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13622: - Parent: (was: KAFKA-13479) Issue Type: Improvement (was: Sub-task) > Revisit the complexity of position tracking in state stores > --- > > Key: KAFKA-13622 > URL: https://issues.apache.org/jira/browse/KAFKA-13622 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Minor > Labels: IQv2 > > Currently, state store implementers have a significant burden to track > position correctly. They have to: > * update the position during all puts > * implement the RecordBatchingStateRestoreCallback and use the > ChangelogRecordDeserializationHelper to update the position > based on record headers > * implement some mechanism to restore the position after a > restart if the store is persistent (such as supply a CommitCallback to write > the position to a local file and then read the file during init) > [~guozhang] pointed out during review that this is probably > too much responsibility (and certainly too much opportunity for error). We > should see what we can do to simplify these responsibilities, if not > eliminate them entirely from the store implementer's scope of concern. > > See > https://github.com/apache/kafka/pull/11676#discussion_r790358058 -- This message was sent by Atlassian Jira (v8.20.1#820001)
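To make the implementer's burden concrete, here is a dependency-free sketch of the bookkeeping a store effectively has to maintain. The merge rule (keep the maximum offset per topic-partition) is my reading of the Position contract and should be treated as an assumption; the class name is illustrative.

```java
import java.util.HashMap;
import java.util.Map;

public class StorePosition {
    // (topic -> (partition -> highest offset seen)). Per the ticket, the
    // store must update this on every put AND during restoration (driven
    // by changelog record headers), and persist/reload it across restarts
    // for persistent stores.
    private final Map<String, Map<Integer, Long>> components = new HashMap<>();

    void update(final String topic, final int partition, final long offset) {
        components.computeIfAbsent(topic, t -> new HashMap<>())
                  .merge(partition, offset, Math::max);
    }

    long offset(final String topic, final int partition) {
        return components.getOrDefault(topic, Map.of())
                         .getOrDefault(partition, -1L);
    }
}
```

Even this toy version has to be threaded through three separate code paths (put, restore, init), which is the complexity the ticket proposes to pull out of the store implementer's scope.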
[jira] [Updated] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-13548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13548: - Parent: (was: KAFKA-13479) Issue Type: Improvement (was: Sub-task) > IQv2: revisit WindowKeyQuery and WindowRangeQuery > - > > Key: KAFKA-13548 > URL: https://issues.apache.org/jira/browse/KAFKA-13548 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Minor > Labels: IQv2 > > During discussion of KIP-806, there was a suggestion to refactor the queries > following a builder pattern so that we can compactly and flexibly specify > lower and upper bounds on the keys, window start times, and window end times. > We should circle back and try to generalize the queries' interfaces before > the first release of IQv2. -- This message was sent by Atlassian Jira (v8.20.1#820001)
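The builder idea from the KIP-806 discussion might look like the following sketch (class and method names are hypothetical, not an API from the ticket): each bound is independent and optional, so a single builder covers all the combinations of key range, window-start range, and window-end range that would otherwise each need their own factory method.

```java
import java.time.Instant;
import java.util.Optional;

public class WindowQueryBuilder {
    private Optional<String> lowerKey = Optional.empty();
    private Optional<String> upperKey = Optional.empty();
    private Optional<Instant> earliestStart = Optional.empty();
    private Optional<Instant> latestStart = Optional.empty();

    // Each bound is set independently; unset bounds mean "unbounded".
    WindowQueryBuilder keyRange(final String from, final String to) {
        lowerKey = Optional.of(from);
        upperKey = Optional.of(to);
        return this;
    }

    WindowQueryBuilder windowStartRange(final Instant from, final Instant to) {
        earliestStart = Optional.of(from);
        latestStart = Optional.of(to);
        return this;
    }

    // Renders the effective bounds, for illustration.
    String describe() {
        return "keys=[" + lowerKey.orElse("-inf") + "," + upperKey.orElse("+inf")
                + "] windowStart=[" + earliestStart.map(Instant::toString).orElse("-inf")
                + "," + latestStart.map(Instant::toString).orElse("+inf") + "]";
    }
}
```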
[jira] [Updated] (KAFKA-13622) Revisit the complexity of position tracking in state stores
[ https://issues.apache.org/jira/browse/KAFKA-13622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13622: - Labels: IQv2 (was: ) > Revisit the complexity of position tracking in state stores > --- > > Key: KAFKA-13622 > URL: https://issues.apache.org/jira/browse/KAFKA-13622 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Priority: Minor > Labels: IQv2 > > Currently, state store implementers have a significant burden to track > position correctly. They have to: > * update the position during all puts > * implement the RecordBatchingStateRestoreCallback and use the > ChangelogRecordDeserializationHelper to update the position > based on record headers > * implement some mechanism to restore the position after a > restart if the store is persistent (such as supply a CommitCallback to write > the position to a local file and then read the file during init) > [~guozhang] pointed out during review that this is probably > too much responsibility (and certainly too much opportunity for error). We > should see what we can do to simplify these responsibilities, if not > eliminate them entirely from the store implementer's scope of concern. > > See > https://github.com/apache/kafka/pull/11676#discussion_r790358058 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13622) Revisit the complexity of position tracking in state stores
[ https://issues.apache.org/jira/browse/KAFKA-13622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13622: - Component/s: streams > Revisit the complexity of position tracking in state stores > --- > > Key: KAFKA-13622 > URL: https://issues.apache.org/jira/browse/KAFKA-13622 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Priority: Minor > Labels: IQv2 > > Currently, state store implementers have a significant burden to track > position correctly. They have to: > * update the position during all puts > * implement the RecordBatchingStateRestoreCallback and use the > ChangelogRecordDeserializationHelper to update the position > based on record headers > * implement some mechanism to restore the position after a > restart if the store is persistent (such as supply a CommitCallback to write > the position to a local file and then read the file during init) > [~guozhang] pointed out during review that this is probably > too much responsibility (and certainly too much opportunity for error). We > should see what we can do to simplify these responsibilities, if not > eliminate them entirely from the store implementer's scope of concern. > > See > https://github.com/apache/kafka/pull/11676#discussion_r790358058 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-13548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13548: - Component/s: streams > IQv2: revisit WindowKeyQuery and WindowRangeQuery > - > > Key: KAFKA-13548 > URL: https://issues.apache.org/jira/browse/KAFKA-13548 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Priority: Minor > Labels: IQv2 > > During discussion of KIP-806, there was a suggestion to refactor the queries > following a builder pattern so that we can compactly and flexibly specify > lower and upper bounds on the keys, window start times, and window end times. > We should circle back and try to generalize the queries' interfaces before > the first release of IQv2. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13541) Make IQv2 query/store interface type safe
[ https://issues.apache.org/jira/browse/KAFKA-13541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13541: - Parent: (was: KAFKA-13479) Issue Type: Improvement (was: Sub-task) > Make IQv2 query/store interface type safe > - > > Key: KAFKA-13541 > URL: https://issues.apache.org/jira/browse/KAFKA-13541 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Patrick Stuedi >Assignee: Patrick Stuedi >Priority: Minor > Labels: IQv2 > > Currently the new IQv2 interface allows applications to query state stores > using subclasses of the Query type. Unfortunately there is currently no > way to check that the template type of the query matches the type of the > relevant store the query is executed on. As a consequence stores have to do a > set of unsafe casts. > This ticket is to explore ways to make the query interface type safe where > only type mismatches are detected at compile time. -- This message was sent by Atlassian Jira (v8.20.1#820001)
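The casting problem and one possible typed alternative can be shown with plain generics (a sketch of the design space, not a proposal from the ticket): if the store's value type appears in the query's type parameter, executing the query needs no unchecked casts.

```java
import java.util.HashMap;
import java.util.Map;

public class TypedQueries {
    // A query is parameterized by its result type, as in IQv2's Query<R>.
    interface Query<R> { }

    static final class ValueQuery<V> implements Query<V> {
        final String key;
        ValueQuery(final String key) { this.key = key; }
    }

    // Because the store is generic in V and only accepts ValueQuery<V>,
    // the compiler checks that the query and store types line up; there is
    // no "accept Query<?> then cast" step, which is where today's stores
    // are forced into unsafe casts.
    static final class Store<V> {
        private final Map<String, V> data = new HashMap<>();
        void put(final String k, final V v) { data.put(k, v); }
        V execute(final ValueQuery<V> query) { return data.get(query.key); }
    }
}
```

The open design question the ticket raises is how to get this compile-time guarantee while still dispatching arbitrary Query subclasses through a single generic store interface.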
[jira] [Updated] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-13548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13548: - Labels: IQv2 (was: ) > IQv2: revisit WindowKeyQuery and WindowRangeQuery > - > > Key: KAFKA-13548 > URL: https://issues.apache.org/jira/browse/KAFKA-13548 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Priority: Minor > Labels: IQv2 > > During discussion of KIP-806, there was a suggestion to refactor the queries > following a builder pattern so that we can compactly and flexibly specify > lower and upper bounds on the keys, window start times, and window end times. > We should circle back and try to generalize the queries' interfaces before > the first release of IQv2. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13479) Interactive Query v2
[ https://issues.apache.org/jira/browse/KAFKA-13479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17518567#comment-17518567 ] John Roesler commented on KAFKA-13479: -- Sorry about that, [~cadonna] . I'll do it now. All the follow-on IQv2 work will be available under the label `IQv2`: https://issues.apache.org/jira/issues/?jql=labels%20%3D%20IQv2 > Interactive Query v2 > > > Key: KAFKA-13479 > URL: https://issues.apache.org/jira/browse/KAFKA-13479 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > > Kafka Streams supports an interesting and innovative API for "peeking" into > the internal state of running stateful stream processors from outside of the > application, called Interactive Query (IQ). This functionality has proven > invaluable to users over the years for everything from debugging running > applications to serving low latency queries straight from the Streams runtime. > However, the actual interfaces for IQ were designed in the very early days of > Kafka Streams, before the project had gained significant adoption, and in the > absence of much precedent for this kind of API in peer projects. With the > benefit of hindsight, we can observe several problems with the original > design that we hope to address in a revised framework that will serve Streams > users well for many years to come. > > This ticket tracks the implementation of KIP-796: > https://cwiki.apache.org/confluence/x/34xnCw -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13541) Make IQv2 query/store interface type safe
[ https://issues.apache.org/jira/browse/KAFKA-13541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13541:
Component/s: streams

> Make IQv2 query/store interface type safe
>
> Key: KAFKA-13541
> URL: https://issues.apache.org/jira/browse/KAFKA-13541
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Patrick Stuedi
> Assignee: Patrick Stuedi
> Priority: Minor
> Labels: IQv2
>
> Currently, the new IQv2 interface allows applications to query state stores using subclasses of the Query type. Unfortunately, there is currently no way to check that the type parameter of the query matches the type of the relevant store the query is executed on. As a consequence, stores have to do a set of unsafe casts.
>
> This ticket is to explore ways to make the query interface type safe, so that type mismatches are detected at compile time.
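One plausible direction, sketched below with plain Java generics: tie the query's result type to the store's value type, so a mismatched query fails to compile at the call site and the one remaining unchecked cast is confined to the store internals. These interfaces are illustrative stand-ins, not the real `org.apache.kafka.streams.query` API:

```java
// Sketch: linking query result types to store value types at compile time.
// TypedQuery, GetQuery, and TypedStore are hypothetical names.
import java.util.HashMap;
import java.util.Map;

// V: the store's value type; R: the query's result type.
interface TypedQuery<V, R> {}

// A point lookup whose result type is exactly the store's value type.
final class GetQuery<V> implements TypedQuery<V, V> {
    final String key;
    GetQuery(String key) { this.key = key; }
}

final class TypedStore<V> {
    private final Map<String, V> data = new HashMap<>();

    void put(String key, V value) { data.put(key, value); }

    // The compiler rejects a TypedQuery whose V doesn't match this store's V,
    // so the unchecked cast below is the only one left, hidden inside the store.
    <R> R execute(TypedQuery<V, R> query) {
        if (query instanceof GetQuery) {
            @SuppressWarnings("unchecked")
            R result = (R) data.get(((GetQuery<V>) query).key);
            return result;
        }
        throw new UnsupportedOperationException("unknown query type");
    }
}
```

With this shape, `store.execute(new GetQuery<Integer>("a"))` type-checks against a `TypedStore<Integer>` but not against a `TypedStore<String>`, which is the compile-time check the ticket asks about.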
[jira] [Updated] (KAFKA-13541) Make IQv2 query/store interface type safe
[ https://issues.apache.org/jira/browse/KAFKA-13541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13541: Labels: IQv2 (was: )
[jira] [Updated] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries
[ https://issues.apache.org/jira/browse/KAFKA-13526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13526:
Labels: IQv2 (was: )

> IQv2: Consider more generic logic for mapping between binary and typed queries
>
> Key: KAFKA-13526
> URL: https://issues.apache.org/jira/browse/KAFKA-13526
> Project: Kafka
> Issue Type: Sub-task
> Reporter: John Roesler
> Priority: Minor
> Labels: IQv2
>
> Right now, typed queries (like KeyQuery) need to be specially handled and translated to their binary counterparts (like RawKeyQuery). This happens in the Metered store layers, where the serdes are known. It is necessary because lower store layers are only able to handle binary data (they don't know the serdes).
>
> This situation is not ideal: the Metered store layers will grow to host quite a bit of query handling and translation logic, the relationship between typed queries and their binary counterparts is not obvious, and we can only automatically translate known query types. User-supplied queries and stores will have to work things out using their a priori knowledge of the serdes.
>
> One suggestion (from [~mjsax]) is to come up with some kind of generic "query mapping" API, which the Metered stores would use to translate back and forth between typed and raw keys and values. Users would be able to supply their own mappings along with their custom queries.
>
> Another option would be to have the Metered stores attach the serdes to the query on the way down and then to the result on the way up. Then, the serdes would be available in the bytes store (as part of the request) and to the users when they get their results (as part of the response).
>
> Other options may also surface once we start playing with ideas.
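The translation the ticket describes can be pictured with a small sketch: a metered layer, which alone knows the serdes, converts typed keys to raw bytes on the way down and raw values back to typed values on the way up. `BytesStore` and `MeteredStore` here are simplified stand-ins for the real store layers, not Kafka Streams classes:

```java
// Sketch of typed <-> raw translation in a metered layer. Illustrative only.
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class QueryMappingSketch {
    // A minimal bytes store: it understands only raw keys and values.
    // (For simplicity it indexes byte[] keys by their printable form,
    // since byte arrays don't hash by content.)
    static final class BytesStore {
        private final Map<String, byte[]> data = new HashMap<>();
        void put(byte[] key, byte[] value) { data.put(Arrays.toString(key), value); }
        byte[] get(byte[] rawKey) { return data.get(Arrays.toString(rawKey)); }
    }

    // The metered layer: the only layer that knows the serdes.
    static final class MeteredStore<K, V> {
        private final BytesStore inner = new BytesStore();
        private final Function<K, byte[]> keySerializer;
        private final Function<V, byte[]> valueSerializer;
        private final Function<byte[], V> valueDeserializer;

        MeteredStore(Function<K, byte[]> keySer, Function<V, byte[]> valSer,
                     Function<byte[], V> valDeser) {
            this.keySerializer = keySer;
            this.valueSerializer = valSer;
            this.valueDeserializer = valDeser;
        }

        void put(K key, V value) {
            inner.put(keySerializer.apply(key), valueSerializer.apply(value));
        }

        // Typed -> raw on the way down; raw -> typed on the way up.
        V query(K typedKey) {
            byte[] raw = inner.get(keySerializer.apply(typedKey));
            return raw == null ? null : valueDeserializer.apply(raw);
        }
    }
}
```

Both options in the ticket are about where this serde knowledge lives: a generic "query mapping" API would let users plug their own `keySerializer`/`valueDeserializer` pair in for custom query types, while the "attach the serdes" option would carry them along with the query and result objects themselves.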
[jira] [Updated] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries
[ https://issues.apache.org/jira/browse/KAFKA-13526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13526: Parent: (was: KAFKA-13479); Issue Type: Improvement (was: Sub-task)
[jira] [Updated] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries
[ https://issues.apache.org/jira/browse/KAFKA-13526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13526: Component/s: streams
[jira] [Updated] (KAFKA-13523) Implement IQv2 support in global stores
[ https://issues.apache.org/jira/browse/KAFKA-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13523:
Priority: Major (was: Blocker)

> Implement IQv2 support in global stores
>
> Key: KAFKA-13523
> URL: https://issues.apache.org/jira/browse/KAFKA-13523
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 3.2.0
> Reporter: John Roesler
> Priority: Major
> Labels: IQv2
>
> Global stores pose one significant problem for IQv2: when they start up, they skip the regular ingest pipeline and instead use the "restoration" pipeline to read up until the current end offset. Then, they switch over to the regular ingest pipeline.
>
> IQv2 position tracking expects to track the position of each record from the input topic through the ingest pipeline and then get the position headers through the restoration pipeline via the changelog topic. The fact that global stores "restore" the input topic instead of ingesting it violates our expectations.
>
> It has also caused other problems, so we may want to consider switching the global store processing to use the normal paradigm rather than adding special-case logic to track positions in global stores.
>
> Note: there are two primary reasons that global stores behave this way:
> 1. We can write in batches during restoration, so the I/O may be more efficient.
> 2. The global thread does not transition to RUNNING state until it reaches the (current) end of the input topic, which blocks other threads from joining against it, thereby improving the time synchronization of global KTable joins.
>
> If we want to propose changing the bootstrapping pipeline for global threads, we should have some kind of answer to these concerns.
[jira] [Updated] (KAFKA-13523) Implement IQv2 support in global stores
[ https://issues.apache.org/jira/browse/KAFKA-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13523: Affects Version/s: (was: 3.2.0)
[jira] [Updated] (KAFKA-13523) Implement IQv2 support in global stores
[ https://issues.apache.org/jira/browse/KAFKA-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13523: Component/s: streams
[jira] [Updated] (KAFKA-13523) Implement IQv2 support in global stores
[ https://issues.apache.org/jira/browse/KAFKA-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13523: Parent: (was: KAFKA-13479); Issue Type: Improvement (was: Sub-task)
[jira] [Updated] (KAFKA-13523) Implement IQv2 support in global stores
[ https://issues.apache.org/jira/browse/KAFKA-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13523: Labels: IQv2 (was: )
[jira] [Resolved] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-13554.
Resolution: Won't Fix

> Rename RangeQuery to KeyRangeQuery
>
> Key: KAFKA-13554
> URL: https://issues.apache.org/jira/browse/KAFKA-13554
> Project: Kafka
> Issue Type: Sub-task
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Minor
>
> Just to avoid confusion wrt WindowRangeQuery
[jira] [Closed] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler closed KAFKA-13554.
[jira] [Commented] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17512096#comment-17512096 ] John Roesler commented on KAFKA-13554:
I thought that I filed this because it had been insisted upon as a follow-on in the discussion thread or the PR, but I don't see any evidence of that, so I think we're better off leaving it alone.
[https://lists.apache.org/thread/4clhz43yy9nk6kkggbcn0y3v61b05sp1]
[https://github.com/apache/kafka/pull/11598]
[jira] [Updated] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13554: Affects Version/s: (was: 3.2.0)
[jira] [Updated] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13554: Priority: Minor (was: Blocker)
[jira] [Updated] (KAFKA-13622) Revisit the complexity of position tracking in state stores
[ https://issues.apache.org/jira/browse/KAFKA-13622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13622:
Priority: Minor (was: Blocker)

> Revisit the complexity of position tracking in state stores
>
> Key: KAFKA-13622
> URL: https://issues.apache.org/jira/browse/KAFKA-13622
> Project: Kafka
> Issue Type: Sub-task
> Reporter: John Roesler
> Priority: Minor
>
> Currently, state store implementers have a significant burden to track position correctly. They have to:
> * update the position during all puts
> * implement the RecordBatchingStateRestoreCallback and use the ChangelogRecordDeserializationHelper to update the position based on record headers
> * implement some mechanism to restore the position after a restart if the store is persistent (such as supplying a CommitCallback to write the position to a local file and then reading the file during init)
>
> [~guozhang] pointed out during review that this is probably too much responsibility (and certainly too much opportunity for error). We should see what we can do to simplify these responsibilities, if not eliminate them entirely from the store implementer's scope of concern.
>
> See https://github.com/apache/kafka/pull/11676#discussion_r790358058
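The first of the responsibilities listed above (updating the position during every put) can be modeled with a minimal sketch. The `Position` class below is a simplified stand-in for `org.apache.kafka.streams.query.Position`, and `TrackedStore` is a hypothetical store wrapper, not Kafka Streams code:

```java
// Simplified model of per-put position tracking. Illustrative only.
import java.util.HashMap;
import java.util.Map;

final class PositionSketch {
    // Stand-in for org.apache.kafka.streams.query.Position:
    // topic -> partition -> latest offset seen.
    static final class Position {
        private final Map<String, Map<Integer, Long>> components = new HashMap<>();

        void update(String topic, int partition, long offset) {
            // Positions are monotonic: keep the max offset seen per partition.
            components.computeIfAbsent(topic, t -> new HashMap<>())
                      .merge(partition, offset, Math::max);
        }

        long offset(String topic, int partition) {
            return components.getOrDefault(topic, Map.of())
                             .getOrDefault(partition, -1L);
        }
    }

    // A store that carries the burden described in the ticket: every write
    // must also record where in the input topic the record came from.
    static final class TrackedStore {
        private final Map<String, String> data = new HashMap<>();
        private final Position position = new Position();

        void put(String key, String value, String topic, int partition, long offset) {
            data.put(key, value);
            position.update(topic, partition, offset);
        }

        Position position() { return position; }
    }
}
```

The other two responsibilities (updating the position from changelog record headers during restoration, and persisting/reloading it across restarts) would hang off the same `Position` object, which is why the ticket argues the framework should own this bookkeeping rather than each store implementer.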
[jira] [Updated] (KAFKA-13622) Revisit the complexity of position tracking in state stores
[ https://issues.apache.org/jira/browse/KAFKA-13622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13622: Affects Version/s: (was: 3.2.0)
[jira] [Updated] (KAFKA-13541) Make IQv2 query/store interface type safe
[ https://issues.apache.org/jira/browse/KAFKA-13541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13541: Affects Version/s: (was: 3.2.0)
[jira] [Assigned] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-13554: Assignee: John Roesler (was: Vicky Papavasileiou)
[jira] [Updated] (KAFKA-13541) Make IQv2 query/store interface type safe
[ https://issues.apache.org/jira/browse/KAFKA-13541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13541: Priority: Minor (was: Blocker)
[jira] [Updated] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries
[ https://issues.apache.org/jira/browse/KAFKA-13526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13526: Priority: Minor (was: Blocker)
[jira] [Updated] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries
[ https://issues.apache.org/jira/browse/KAFKA-13526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13526: Affects Version/s: (was: 3.2.0)
[jira] [Updated] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-13548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13548: Priority: Minor (was: Blocker)
[jira] [Updated] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-13548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13548: Affects Version/s: (was: 3.2.0)
[jira] [Updated] (KAFKA-13479) Interactive Query v2
[ https://issues.apache.org/jira/browse/KAFKA-13479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13479:
- Fix Version/s: (was: 3.2.0)

> Interactive Query v2
>
> Key: KAFKA-13479
> URL: https://issues.apache.org/jira/browse/KAFKA-13479
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Major
>
> Kafka Streams supports an interesting and innovative API for "peeking" into the internal state of running stateful stream processors from outside of the application, called Interactive Query (IQ). This functionality has proven invaluable to users over the years for everything from debugging running applications to serving low-latency queries straight from the Streams runtime.
>
> However, the actual interfaces for IQ were designed in the very early days of Kafka Streams, before the project had gained significant adoption and in the absence of much precedent for this kind of API in peer projects. With the benefit of hindsight, we can observe several problems with the original design that we hope to address in a revised framework that will serve Streams users well for many years to come.
>
> This ticket tracks the implementation of KIP-796: https://cwiki.apache.org/confluence/x/34xnCw
[jira] [Resolved] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-13714.
- Fix Version/s: 3.2.0
- Assignee: John Roesler
- Resolution: Fixed

> Flaky test IQv2StoreIntegrationTest
>
> Key: KAFKA-13714
> URL: https://issues.apache.org/jira/browse/KAFKA-13714
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.2.0
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Blocker
> Fix For: 3.2.0
>
> I have observed multiple consistency violations in the IQv2StoreIntegrationTest. Since this is the first release of IQv2, and it's apparently a major flaw in the feature, we should not release with this bug outstanding. Depending on the timetable, we may want to block the release or pull the feature until the next release.
>
> The first observation I have is from 23 Feb 2022. So far, all observations point to the range query in particular, and all observations have been for RocksDB stores, including RocksDBStore, TimestampedRocksDBStore, and the windowed store built on RocksDB segments.
>
> For reference, range queries were implemented on 16 Feb 2022: [https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884]
>
> The window-specific range query test has also failed once that I have seen.
> That feature was implemented on 2 Jan 2022: > [https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c] > > Here are some stack traces I have seen: > {code:java} > verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI] > java.lang.AssertionError: > Expected: is <[1, 2, 3]> > but: was <[1, 2]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125) > at > org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803) > at > org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776) > {code} > {code:java} > verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI] > java.lang.AssertionError: > Expected: is <[1, 2, 3]> > but: was <[1, 3]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1131) > at > org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:809) > at > org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:778) >{code} > {code:java} > verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI] > java.lang.AssertionError: > Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@35025a0a, > executionInfo=[], position=Position{position={input-topic={0=1, > 
1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@38732364, > executionInfo=[], position=Position{position={input-topic={1=1}, > globalResult=null} > Expected: is <[1, 2, 3]> > but: was <[1, 2]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at > org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129) > at > org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807) > at > org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:780) > {code} > {code:java} > verifyStore[cache=true, log=false, supplier=ROCKS_WINDOW, kind=DSL] > java.lang.AssertionError: > Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@2a32fb6, > executionInfo=[], position=Position{position={input-topic={0=1, > 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@6107165, > executionInfo=[], position=Position{position={input-topic={1=1}, > globalResult=null} > Expected: is <[0, 1, 2, 3]> > but: was <[0, 2, 3]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) >
[jira] [Commented] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17509543#comment-17509543 ] John Roesler commented on KAFKA-13714: -- Oh, before I forget, here's how I'm getting repros: {code:java} I=0; while ./gradlew :streams:test -Prerun-tests --tests org.apache.kafka.streams.integration.IQv2StoreIntegrationTest --fail-fast; do (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done{code} It usually takes about 500 iterations before it shows up. This depends on a gradle change that I'm planning to commit to support this use case.
[jira] [Commented] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17509541#comment-17509541 ] John Roesler commented on KAFKA-13714: -- Added some more logs, and I think I'm onto something {code:java} verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI] java.lang.AssertionError: Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@1f193686, executionInfo=[Handled in class org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 2302842ns, Handled in class org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore via WrappedStateStore in 3051842ns, Handled in class org.apache.kafka.streams.state.internals.CachingKeyValueStore in 3074902ns, Handled in class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with serdes org.apache.kafka.streams.state.StateSerdes@58294867 in 3956557ns], position=Position{position={input-topic={0=1, 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@31e72cbc, executionInfo=[Handled in class org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 415148ns, Handled in class org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore via WrappedStateStore in 1033935ns, Handled in class org.apache.kafka.streams.state.internals.CachingKeyValueStore in 1053899ns, Handled in class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with serdes org.apache.kafka.streams.state.StateSerdes@67c277a0 in 1106865ns], position=Position{position={input-topic={1=1}, globalResult=null} Expected: is <[1, 2, 3]> but: was <[1, 2]> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1140) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:818) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:782) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runners.Suite.runChild(Suite.java:128) at 
org.junit.runners.Suite.runChild(Suite.java:27) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at
[jira] [Commented] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17509379#comment-17509379 ] John Roesler commented on KAFKA-13714: -- Another: {code:java} verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI] java.lang.AssertionError: Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@72e789cb, executionInfo=[Handled in class org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 87900ns, Handled in class org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore via WrappedStateStore in 366097ns, Handled in class org.apache.kafka.streams.state.internals.CachingKeyValueStore in 373038ns, Handled in class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with serdes org.apache.kafka.streams.state.StateSerdes@627d8516 in 400408ns], position=Position{position={input-topic={0=1, 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@7c1812b3, executionInfo=[Handled in class org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 27551ns, Handled in class org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore via WrappedStateStore in 406916ns, Handled in class org.apache.kafka.streams.state.internals.CachingKeyValueStore in 413227ns, Handled in class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with serdes org.apache.kafka.streams.state.StateSerdes@5c10285a in 427044ns], position=Position{position={input-topic={1=1}, globalResult=null} Expected: is <[1, 2, 3]> but: was <[1, 3]> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1140) at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:818) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:782) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runners.Suite.runChild(Suite.java:128) at org.junit.runners.Suite.runChild(Suite.java:27) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at
[jira] [Commented] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17509366#comment-17509366 ] John Roesler commented on KAFKA-13714: -- Continuing debugging: h3. verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI] {code:java} java.lang.AssertionError: Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@1f193686, executionInfo=[Handled in class org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 2335793ns, Handled in class org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore via WrappedStateStore in 3045186ns, Handled in class org.apache.kafka.streams.state.internals.CachingKeyValueStore in 3068465ns, Handled in class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with serdes org.apache.kafka.streams.state.StateSerdes@58294867 in 3974765ns], position=Position{position={input-topic={0=1, 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@31e72cbc, executionInfo=[Handled in class org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 112183ns, Handled in class org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore via WrappedStateStore in 775416ns, Handled in class org.apache.kafka.streams.state.internals.CachingKeyValueStore in 796244ns, Handled in class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with serdes org.apache.kafka.streams.state.StateSerdes@67c277a0 in 849835ns], position=Position{position={input-topic={1=1}, globalResult=null} Expected: is <[1, 2, 3]> but: was <[1, 2]> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1140) 
at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:818) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:782) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.junit.runners.Suite.runChild(Suite.java:128) at org.junit.runners.Suite.runChild(Suite.java:27) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at
[jira] [Comment Edited] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508386#comment-17508386 ] John Roesler edited comment on KAFKA-13714 at 3/17/22, 7:32 PM: Another local repro: {code:java} org.apache.kafka.streams.integration.IQv2StoreIntegrationTest > verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI] FAILED java.lang.AssertionError: Result:StateQueryResult{partitionResults={ 0=SucceededQueryResult{ result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@3f702946, executionInfo=[ Handled in class org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 1153925ns, Handled in class org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore via WrappedStateStore in 1165952ns, Handled in class org.apache.kafka.streams.state.internals.CachingKeyValueStore in 1181616ns, Handled in class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with serdes org.apache.kafka.streams.state.StateSerdes@278667fd in 1260365ns ], position=Position{position={input-topic={0=1, 1=SucceededQueryResult{ result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@42b6d0cc, executionInfo=[ Handled in class org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 109311ns, Handled in class org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore via WrappedStateStore in 116767ns, Handled in class org.apache.kafka.streams.state.internals.CachingKeyValueStore in 128961ns, Handled in class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with serdes org.apache.kafka.streams.state.StateSerdes@684b31de in 185521ns ], position=Position{position={input-topic={1=1}, globalResult=null} Expected: is <[1, 2, 3]> but: was <[1, 2]> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776) {code} logs: {code:java} [2022-03-17 07:31:56,286] INFO stream-client [app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138] Kafka Streams version: test-version (org.apache.kafka.streams.KafkaStreams:912) [2022-03-17 07:31:56,286] INFO stream-client [app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138] Kafka Streams commit ID: test-commit-ID (org.apache.kafka.streams.KafkaStreams:913) [2022-03-17 07:31:56,288] INFO stream-thread [app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1] Creating restore consumer client (org.apache.kafka.streams.processor.internals.StreamThread:346) [2022-03-17 07:31:56,295] INFO stream-thread [app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1] Creating thread producer client (org.apache.kafka.streams.processor.internals.StreamThread:105) [2022-03-17 07:31:56,297] INFO [Producer clientId=app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1-producer] Instantiated an idempotent producer. 
(org.apache.kafka.clients.producer.KafkaProducer:532) [2022-03-17 07:31:56,304] INFO stream-thread [app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1] Creating consumer client (org.apache.kafka.streams.processor.internals.StreamThread:397) [2022-03-17 07:31:56,308] INFO stream-thread [app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1-consumer] Cooperative rebalancing protocol is enabled now (org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration:126) [2022-03-17 07:31:56,308] INFO [Producer clientId=app-org.apache.kafka.streams.integration.IQv2StoreIntegrationTest-true-true-TIME_ROCKS_KV-PAPI-1159953973-83d014b6-61fa-4ba5-a7b3-f035b42c2138-StreamThread-1-producer] Cluster ID: iZBZzURBQr6rMZEB6oxg7g (org.apache.kafka.clients.Metadata:287) [2022-03-17 07:31:56,309] INFO
[jira] [Commented] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508386#comment-17508386 ] John Roesler commented on KAFKA-13714: -- Another local repro: {code:java} org.apache.kafka.streams.integration.IQv2StoreIntegrationTest > verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI] FAILED java.lang.AssertionError: Result:StateQueryResult{partitionResults={ 0=SucceededQueryResult{ result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@3f702946, executionInfo=[ Handled in class org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 1153925ns, Handled in class org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore via WrappedStateStore in 1165952ns, Handled in class org.apache.kafka.streams.state.internals.CachingKeyValueStore in 1181616ns, Handled in class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with serdes org.apache.kafka.streams.state.StateSerdes@278667fd in 1260365ns ], position=Position{position={input-topic={0=1, 1=SucceededQueryResult{ result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@42b6d0cc, executionInfo=[ Handled in class org.apache.kafka.streams.state.internals.RocksDBTimestampedStore in 109311ns, Handled in class org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore via WrappedStateStore in 116767ns, Handled in class org.apache.kafka.streams.state.internals.CachingKeyValueStore in 128961ns, Handled in class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore with serdes org.apache.kafka.streams.state.StateSerdes@684b31de in 185521ns ], position=Position{position={input-topic={1=1}, globalResult=null} Expected: is <[1, 2, 3]> but: was <[1, 2]> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776) {code}
[jira] [Created] (KAFKA-13714) Flaky test IQv2StoreIntegrationTest
John Roesler created KAFKA-13714: Summary: Flaky test IQv2StoreIntegrationTest Key: KAFKA-13714 URL: https://issues.apache.org/jira/browse/KAFKA-13714 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.2.0 Reporter: John Roesler I have observed multiple consistency violations in the IQv2StoreIntegrationTest. Since this is the first release of IQv2, and it's apparently a major flaw in the feature, we should not release with this bug outstanding. Depending on the time-table, we may want to block the release or pull the feature until the next release. The first observation I have is from 23 Feb 2022. So far all observations point to the range query in particular, and all observations have been for RocksDB stores, including RocksDBStore, TimestampedRocksDBStore, and the windowed store built on RocksDB segments. For reference, range queries were implemented on 16 Feb 2022: [https://github.com/apache/kafka/commit/b38f6ba5cc989702180f5d5f8e55ba20444ea884] The window-specific range query test has also failed once that I have seen. 
That feature was implemented on 2 Jan 2022: [https://github.com/apache/kafka/commit/b8f1cf14c396ab04b8968a8fa04d8cf67dd3254c] Here are some stack traces I have seen: {code:java} verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI] java.lang.AssertionError: Expected: is <[1, 2, 3]> but: was <[1, 2]> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1125) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:803) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:776) {code} {code:java} verifyStore[cache=true, log=true, supplier=TIME_ROCKS_KV, kind=PAPI] java.lang.AssertionError: Expected: is <[1, 2, 3]> but: was <[1, 3]> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1131) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:809) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:778) {code} {code:java} verifyStore[cache=true, log=true, supplier=ROCKS_KV, kind=PAPI] java.lang.AssertionError: Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@35025a0a, executionInfo=[], position=Position{position={input-topic={0=1, 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@38732364, executionInfo=[], position=Position{position={input-topic={1=1}, 
globalResult=null} Expected: is <[1, 2, 3]> but: was <[1, 2]> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQuery(IQv2StoreIntegrationTest.java:1129) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleRangeQueries(IQv2StoreIntegrationTest.java:807) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:780) {code} {code:java} verifyStore[cache=true, log=false, supplier=ROCKS_WINDOW, kind=DSL] java.lang.AssertionError: Result:StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@2a32fb6, executionInfo=[], position=Position{position={input-topic={0=1, 1=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredWindowedKeyValueIterator@6107165, executionInfo=[], position=Position{position={input-topic={1=1}, globalResult=null} Expected: is <[0, 1, 2, 3]> but: was <[0, 2, 3]> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQuery(IQv2StoreIntegrationTest.java:1234) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.shouldHandleWindowRangeQueries(IQv2StoreIntegrationTest.java:880) at org.apache.kafka.streams.integration.IQv2StoreIntegrationTest.verifyStore(IQv2StoreIntegrationTest.java:793) {code} Some observations: * After I added the
[jira] [Commented] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch
[ https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17486714#comment-17486714 ] John Roesler commented on KAFKA-13600: -- Hi [~tim.patterson] , thanks for the report and the patch! It sounds like you're reporting two things here: # a bug around the acceptable recovery lag. # an improvement on assignment balance If we can discuss those things independently, then we can definitely merge the bugfix immediately. Depending on the impact of the improvement, it might also fall into the category of a simple ticket, or it might be more appropriate to have a KIP as [~cadonna] suggested. Regarding the bug, I find it completely plausible that we have a bug, but I have to confess that I'm not 100% sure I understand the report. Is the situation that there's an active that happens to be processing quite a bit ahead of the replicas, such that when the active goes offline, there's no "caught-up" node, and instead of failing the task over to the least-lagging node, we just assign it to a fresh node? If that's it, then it is certainly not the desired behavior. The notion of acceptableRecoveryLag was introduced because follower replicas will always lag the active task, by definition. We want task ownership to be able to swap over from the active to a warm-up when it's caught up, but it will never be 100% caught up (because it is a follower until it takes over). acceptableRecoveryLag is a way to define a small amount of lag that is "acceptable" so that when a warm-up is only lagging by that amount, we can consider it to be effectively caught up and move the active to the warm-up node. As you can see, this has nothing at all to do with which nodes are eligible to take over when an active exits the cluster. In that case, it was always the intent that the most-caught-up node should take over active processing, regardless of its lag. 
I've been squinting at our existing code, and also your patch ([https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1]). It looks to me like the flaw in the existing implementation is essentially just here: [https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-83a301514ee18b410df40a91595f6f1afd51f6152ff813b5789516cf5c3605baL92-L96] {code:java} // if the desired client is not caught up, and there is another client that _is_ caught up, then // we schedule a movement, so we can move the active task to the caught-up client. We'll try to // assign a warm-up to the desired client so that we can move it later on.{code} which should indeed be just like what you described: {code:java} // if the desired client is not caught up, and there is another client that _is_ more caught up, // then we schedule a movement [to] move the active task to the [most] caught-up client. // We'll try to assign a warm-up to the desired client so that we can move it later on.{code} On the other hand, we should not lose this important predicate to determine whether a task is considered "caught up": [https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-e50a755ba2a4d2f7306d1016d079018cba22f9f32993ef5dd64408d1a94d79acL245] {code:java} activeRunning(taskLag) || unbounded(acceptableRecoveryLag) || acceptable(acceptableRecoveryLag, taskLag) {code} This captures a couple of subtleties in addition to the obvious "a task is caught up if it's under the acceptable recovery lag": # A running, active task doesn't have a real lag at all, but instead its "lag" is the sentinel value `-2` # You can disable the "warm up" phase completely by setting acceptableRecoveryLag to `Long.MAX_VALUE`, in which case, we ignore lags completely and consider all nodes to be caught up, even if they didn't report a lag at all. 
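The predicate above, with its two subtleties, can be sketched as a small self-contained check. The method names mirror the snippet quoted from the assignor, but the bodies here are illustrative assumptions for discussion, not the actual Kafka Streams implementation:

```java
// Illustrative sketch of the "caught up" predicate discussed above.
// The sentinel value and method bodies are assumptions, not the real
// Kafka Streams code.
public class CaughtUpCheck {
    // A running active task reports this sentinel instead of a real lag.
    static final long ACTIVE_RUNNING_SENTINEL = -2L;

    static boolean activeRunning(long taskLag) {
        return taskLag == ACTIVE_RUNNING_SENTINEL;
    }

    // acceptableRecoveryLag = Long.MAX_VALUE disables lag checks entirely,
    // so every node counts as caught up.
    static boolean unbounded(long acceptableRecoveryLag) {
        return acceptableRecoveryLag == Long.MAX_VALUE;
    }

    static boolean acceptable(long acceptableRecoveryLag, long taskLag) {
        return taskLag >= 0 && taskLag <= acceptableRecoveryLag;
    }

    static boolean caughtUp(long acceptableRecoveryLag, long taskLag) {
        return activeRunning(taskLag)
            || unbounded(acceptableRecoveryLag)
            || acceptable(acceptableRecoveryLag, taskLag);
    }
}
```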
One extra thing I like about your patch is this: [https://github.com/apache/kafka/commit/a4b622685423fbfd68b1291dad85cc1f44b086f1#diff-83a301514ee18b410df40a91595f6f1afd51f6152ff813b5789516cf5c3605baR54-R56] {code:java} // Even if there is a more caught up client, as long as we're within allowable lag then // its best just to stick with what we've got {code} I agree that, if two nodes are within the acceptableRecoveryLag of each other, we should consider their lags to be effectively the same. That's something I wanted to do when we wrote this code, but couldn't figure out a good way to do it. One thing I'd need more time on is the TaskMovementTest. At first glance, it looks like those changes are just about the slightly different method signature, but I'd want to be very sure that we're still testing the same invariants that we wanted to test. Would you be willing to submit this bugfix as a PR so that we can formally review and merge it? 
[jira] [Assigned] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch
[ https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-13600: Assignee: (was: John Roesler) > Rebalances while streams is in degraded state can cause stores to be > reassigned and restore from scratch > > > Key: KAFKA-13600 > URL: https://issues.apache.org/jira/browse/KAFKA-13600 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.1.0, 2.8.1, 3.0.0 >Reporter: Tim Patterson >Priority: Major > > Consider this scenario: > # A node is lost from the cluster. > # A rebalance is kicked off with a new "target assignment"'s(ie the > rebalance is attempting to move a lot of tasks - see > https://issues.apache.org/jira/browse/KAFKA-10121). > # The kafka cluster is now a bit more sluggish from the increased load. > # A Rolling Deploy happens triggering rebalances, during the rebalance > processing continues but offsets can't be committed(Or nodes are restarted > but fail to commit offsets) > # The most caught up nodes now aren't within `acceptableRecoveryLag` and so > the task is started in it's "target assignment" location, restoring all state > from scratch and delaying further processing instead of using the "almost > caught up" node. > We've hit this a few times and having lots of state (~25TB worth) and being > heavy users of IQ this is not ideal for us. > While we can increase `acceptableRecoveryLag` to larger values to try get > around this that causes other issues (ie a warmup becoming active when its > still quite far behind) > The solution seems to be to balance "balanced assignment" with "most caught > up nodes". > We've got a fork where we do just this and it's made a huge difference to the > reliability of our cluster. > Our change is to simply use the most caught up node if the "target node" is > more than `acceptableRecoveryLag` behind. 
> This gives up some of the load balancing type behaviour of the existing code > but in practise doesn't seem to matter too much. > I guess maybe an algorithm that identified candidate nodes as those being > within `acceptableRecoveryLag` of the most caught up node might allow the > best of both worlds. > > Our fork is > [https://github.com/apache/kafka/compare/trunk...tim-patterson:fix_balance_uncaughtup?expand=1] > (We also moved the capacity constraint code to happen after all the stateful > assignment to prioritise standby tasks over warmup tasks) > Ideally we don't want to maintain a fork of kafka streams going forward so > are hoping to get a bit of discussion / agreement on the best way to handle > this. > More than happy to contribute code/test different algo's in production system > or anything else to help with this issue -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch
[ https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-13600: Assignee: John Roesler
[jira] [Updated] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch
[ https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13600: - Affects Version/s: 3.1.0
[jira] [Updated] (KAFKA-13523) Implement IQv2 support in global stores
[ https://issues.apache.org/jira/browse/KAFKA-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13523: - Affects Version/s: 3.2.0 > Implement IQv2 support in global stores > --- > > Key: KAFKA-13523 > URL: https://issues.apache.org/jira/browse/KAFKA-13523 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.2.0 >Reporter: John Roesler >Priority: Blocker > > Global stores pose one significant problem for IQv2: when they start up, they > skip the regular ingest pipeline and instead use the "restoration" pipeline > to read up until the current end offset. Then, they switch over to the > regular ingest pipeline. > IQv2 position tracking expects to track the position of each record from the > input topic through the ingest pipeline and then get the position headers > through the restoration pipeline via the changelog topic. The fact that > global stores "restore" the input topic instead of ingesting it violates our > expectations. > It has also caused other problems, so we may want to consider switching the > global store processing to use the normal paradigm rather than adding > special-case logic to track positions in global stores. > > Note: there are two primary reasons that global stores behave this way: > # We can write in batches during restoration, so the I/O may be more > efficient > # The global thread does not transition to RUNNING state until it reaches > the (current) end of the input topic, which blocks other threads from joining > against it, thereby improving the time synchronization of global KTable joins. > If we want to propose changing the bootstrapping pipeline for global threads, > we should have some kind of answer to these concerns. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries
[ https://issues.apache.org/jira/browse/KAFKA-13526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13526: - Priority: Blocker (was: Major) > IQv2: Consider more generic logic for mapping between binary and typed queries > -- > > Key: KAFKA-13526 > URL: https://issues.apache.org/jira/browse/KAFKA-13526 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Priority: Blocker > > Right now, typed queries (like KeyQuery) need to be specially handled and > translated to their binary counterparts (like RawKeyQuery). This happens in > the Metered store layers, where the serdes are known. It is necessary because > lower store layers are only able to handle binary data (because they don't > know the serdes). > This situation is not ideal, since the Metered store layers will grow to host > quite a bit of query handling and translation logic, because the relationship > between typed queries and binary counterparts is not obvious, and because we > can only automatically translate known query types. User-supplied queries and > stores will have to work things out using their a-priori knowledge of the > serdes. > > One suggestion (from [~mjsax] ) is to come up with some kind of generic > "query mapping" API, which the Metered stores would use to translate back and > forth between typed and raw keys and values. Users would be able to supply > their own mappings along with their custom queries. > Another option would be to have the Metered stores attach the serdes to the > query on the way down and then to the result on the way up. Then, the serdes > would be available in the bytes store (as part of the request) and to the > users when they get their results (as part of the response). > Other options may also surface once we start playing with ideas. -- This message was sent by Atlassian Jira (v8.20.1#820001)
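The second option above (the Metered layer attaching serdes to translate a typed query into its binary counterpart on the way down) can be sketched roughly as follows. The types here (`KeyQuery`, `RawKeyQuery`) are hypothetical stand-ins named after the ticket's examples, not the actual IQv2 classes:

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical sketch of "metered layer translates typed queries into raw
// queries" as discussed in KAFKA-13526. These records are illustrative
// stand-ins, not the real IQv2 API.
public class QueryMappingSketch {
    record KeyQuery<K>(K key) {}
    record RawKeyQuery(byte[] rawKey) {}

    // The Metered store layer is the only layer that knows the serdes, so it
    // can map a typed query onto the binary query the bytes store understands.
    static <K> RawKeyQuery toRaw(KeyQuery<K> query, Function<K, byte[]> keySerializer) {
        return new RawKeyQuery(keySerializer.apply(query.key()));
    }

    static RawKeyQuery example() {
        KeyQuery<String> typed = new KeyQuery<>("user-42");
        return toRaw(typed, s -> s.getBytes(StandardCharsets.UTF_8));
    }
}
```

The pain point the ticket describes is that this mapping is currently hard-coded per known query type; a generic "query mapping" API would let user-supplied queries plug in their own `toRaw`-style translation.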
[jira] [Updated] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-13548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13548: - Priority: Blocker (was: Major) > IQv2: revisit WindowKeyQuery and WindowRangeQuery > - > > Key: KAFKA-13548 > URL: https://issues.apache.org/jira/browse/KAFKA-13548 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Priority: Blocker > > During discussion of KIP-806, there was a suggestion to refactor the queries > following a builder pattern so that we can compactly and flexibly specify > lower and upper bounds on the keys, window start times, and window end times. > We should circle back and try to generalize the queries' interfaces before > the first release of IQv2. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13622) Revisit the complexity of position tracking in state stores
[ https://issues.apache.org/jira/browse/KAFKA-13622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13622: - Affects Version/s: 3.2.0 > Revisit the complexity of position tracking in state stores > --- > > Key: KAFKA-13622 > URL: https://issues.apache.org/jira/browse/KAFKA-13622 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.2.0 >Reporter: John Roesler >Priority: Blocker > > Currently, state store implementers have a significant burden to track > position correctly. They have to: > * update the position during all puts > * implement the RecordBatchingStateRestoreCallback and use the > {color:#00}ChangelogRecordDeserializationHelper to update the position > based on record headers{color} > * {color:#00}implement some mechanism to restore the position after a > restart if the store is persistent (such as supply a CommitCallback to write > the position to a local file and then read the file during init){color} > {color:#00}[~guozhang] pointed out during review that this is probably > too much responsibility (and certainly too much opportunity for error). We > should see what we can do to simplify these responsibilities, if not > eliminate them entirely from the store implementer's scope of concern. > {color} > > {color:#00}See > https://github.com/apache/kafka/pull/11676#discussion_r790358058{color} -- This message was sent by Atlassian Jira (v8.20.1#820001)
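The bookkeeping burden described above can be illustrated with a self-contained sketch: every put on the ingest path must advance the position, and restoration must merge positions recovered from changelog record headers. The class and method names below are hypothetical; the real implementation uses `org.apache.kafka.streams.query.Position` and the `ChangelogRecordDeserializationHelper`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the position bookkeeping a store implementer
// currently has to do (KAFKA-13622). Illustrative only, not the actual
// Kafka Streams API.
public class PositionTrackingStore {
    private final Map<String, byte[]> data = new HashMap<>();
    // topic -> partition -> highest offset seen so far
    private final Map<String, Map<Integer, Long>> position = new HashMap<>();

    // Normal ingest path: every put must also advance the position.
    public void put(String key, byte[] value, String topic, int partition, long offset) {
        data.put(key, value);
        merge(topic, partition, offset);
    }

    // Restoration path: the source position comes from changelog record
    // headers, and must be merged into the local position.
    public void restore(String key, byte[] value, String srcTopic, int srcPartition, long srcOffset) {
        data.put(key, value);
        merge(srcTopic, srcPartition, srcOffset);
    }

    private void merge(String topic, int partition, long offset) {
        position.computeIfAbsent(topic, t -> new HashMap<>())
                .merge(partition, offset, Math::max);
    }

    public long positionOf(String topic, int partition) {
        return position.getOrDefault(topic, Map.of()).getOrDefault(partition, -1L);
    }
}
```

A persistent store additionally has to checkpoint this map (e.g. via a commit callback) and reload it during init, which is the part of the burden the ticket suggests pulling out of the implementer's scope.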
[jira] [Updated] (KAFKA-13541) Make IQv2 query/store interface type safe
[ https://issues.apache.org/jira/browse/KAFKA-13541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13541: - Affects Version/s: 3.2.0 > Make IQv2 query/store interface type safe > - > > Key: KAFKA-13541 > URL: https://issues.apache.org/jira/browse/KAFKA-13541 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.2.0 >Reporter: Patrick Stuedi >Assignee: Patrick Stuedi >Priority: Blocker > > Currently the new IQv2 interface allows applications to query state stores > using subclasses of the Query type. Unfortunately there is currently no > way to check that the template type of the query matches the type of the > relevant store the query is executed on. As a consequence stores have to do a > set of unsafe casts. > This ticket is to explore ways to make the query interface type safe where > only type mismatches are detected at compile time. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13541) Make IQv2 query/store interface type safe
[ https://issues.apache.org/jira/browse/KAFKA-13541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13541: - Priority: Blocker (was: Major)
[jira] [Updated] (KAFKA-13523) Implement IQv2 support in global stores
[ https://issues.apache.org/jira/browse/KAFKA-13523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13523: - Priority: Blocker (was: Major)
[jira] [Updated] (KAFKA-13548) IQv2: revisit WindowKeyQuery and WindowRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-13548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13548: - Affects Version/s: 3.2.0
[jira] [Updated] (KAFKA-13526) IQv2: Consider more generic logic for mapping between binary and typed queries
[ https://issues.apache.org/jira/browse/KAFKA-13526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13526: - Affects Version/s: 3.2.0
[jira] [Updated] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13554: - Affects Version/s: 3.2.0 > Rename RangeQuery to KeyRangeQuery > -- > > Key: KAFKA-13554 > URL: https://issues.apache.org/jira/browse/KAFKA-13554 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.2.0 >Reporter: John Roesler >Assignee: Vicky Papavasileiou >Priority: Major > > Just to avoid confusion wrt WindowRangeQuery -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13554: - Priority: Blocker (was: Major) > Rename RangeQuery to KeyRangeQuery > -- > > Key: KAFKA-13554 > URL: https://issues.apache.org/jira/browse/KAFKA-13554 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 3.2.0 >Reporter: John Roesler >Assignee: Vicky Papavasileiou >Priority: Blocker > > Just to avoid confusion wrt WindowRangeQuery -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13622) Revisit the complexity of position tracking in state stores
[ https://issues.apache.org/jira/browse/KAFKA-13622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13622: - Priority: Blocker (was: Major) > Revisit the complexity of position tracking in state stores > --- > > Key: KAFKA-13622 > URL: https://issues.apache.org/jira/browse/KAFKA-13622 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Priority: Blocker > > Currently, state store implementers have a significant burden to track > position correctly. They have to: > * update the position during all puts > * implement the RecordBatchingStateRestoreCallback and use the > ChangelogRecordDeserializationHelper to update the position > based on record headers > * implement some mechanism to restore the position after a > restart if the store is persistent (such as supply a CommitCallback to write > the position to a local file and then read the file during init) > [~guozhang] pointed out during review that this is probably > too much responsibility (and certainly too much opportunity for error). We > should see what we can do to simplify these responsibilities, if not > eliminate them entirely from the store implementer's scope of concern. > > See > https://github.com/apache/kafka/pull/11676#discussion_r790358058 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13622) Revisit the complexity of position tracking in state stores
John Roesler created KAFKA-13622: Summary: Revisit the complexity of position tracking in state stores Key: KAFKA-13622 URL: https://issues.apache.org/jira/browse/KAFKA-13622 Project: Kafka Issue Type: Sub-task Reporter: John Roesler Currently, state store implementers have a significant burden to track position correctly. They have to: * update the position during all puts * implement the RecordBatchingStateRestoreCallback and use the ChangelogRecordDeserializationHelper to update the position based on record headers * implement some mechanism to restore the position after a restart if the store is persistent (such as supply a CommitCallback to write the position to a local file and then read the file during init) [~guozhang] pointed out during review that this is probably too much responsibility (and certainly too much opportunity for error). We should see what we can do to simplify these responsibilities, if not eliminate them entirely from the store implementer's scope of concern. See https://github.com/apache/kafka/pull/11676#discussion_r790358058 -- This message was sent by Atlassian Jira (v8.20.1#820001)
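A rough sketch of the bookkeeping the ticket describes, with hypothetical types (not the real Kafka Streams `Position` class): a position is a map of topic to partition to last-seen offset, advanced on every put during normal writes and rebuilt from changelog record headers (or a checkpoint) during restore. A max-merge makes the update safe to apply in either path.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the position bookkeeping a store implementer
// currently owns: topic -> partition -> highest offset seen so far.
class PositionTrackingSketch {
    static class Position {
        final Map<String, Map<Integer, Long>> offsets = new HashMap<>();

        // Keep the max offset per topic-partition (monotonic merge), so
        // applying a stale restore record is a harmless no-op.
        void update(String topic, int partition, long offset) {
            offsets.computeIfAbsent(topic, t -> new HashMap<>())
                   .merge(partition, offset, Math::max);
        }

        long get(String topic, int partition) {
            return offsets.getOrDefault(topic, Map.of())
                          .getOrDefault(partition, -1L);
        }
    }

    public static void main(String[] args) {
        Position pos = new Position();
        // 1) Normal writes: the store updates the position during every put.
        pos.update("changelog", 0, 5L);
        pos.update("changelog", 0, 9L);
        // 2) Restoration: an offset recovered from a record header is merged
        //    the same way; this stale one is ignored by the max-merge.
        pos.update("changelog", 0, 7L);
        System.out.println(pos.get("changelog", 0)); // prints 9
    }
}
```

Having every put and every restore callback funnel through an update like this is exactly the per-implementer burden the ticket argues the framework should absorb.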
[jira] [Resolved] (KAFKA-13608) Implement Position restoration for all in-memory state stores
[ https://issues.apache.org/jira/browse/KAFKA-13608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-13608. -- Resolution: Duplicate > Implement Position restoration for all in-memory state stores > - > > Key: KAFKA-13608 > URL: https://issues.apache.org/jira/browse/KAFKA-13608 > Project: Kafka > Issue Type: Sub-task >Reporter: Vicky Papavasileiou >Priority: Major > > In-memory state stores restore their state from the changelog (as opposed to > RocksDB stores that restore from disk). In-memory stores currently don't > handle restoring of the Position -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13608) Implement Position restoration for all in-memory state stores
[ https://issues.apache.org/jira/browse/KAFKA-13608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482866#comment-17482866 ] John Roesler commented on KAFKA-13608: -- Ah, I didn't realize there was a ticket for this. I just happened to take care of it while making sure that the testing for [https://github.com/apache/kafka/pull/11676] was complete. > Implement Position restoration for all in-memory state stores > - > > Key: KAFKA-13608 > URL: https://issues.apache.org/jira/browse/KAFKA-13608 > Project: Kafka > Issue Type: Sub-task >Reporter: Vicky Papavasileiou >Priority: Major > > In-memory state stores restore their state from the changelog (as opposed to > RocksDB stores that restore from disk). In-memory stores currently don't > handle restoring of the Position -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-13524) IQv2: Implement KeyQuery from the RecordCache
[ https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-13524. -- Resolution: Fixed > IQv2: Implement KeyQuery from the RecordCache > - > > Key: KAFKA-13524 > URL: https://issues.apache.org/jira/browse/KAFKA-13524 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Assignee: Vicky Papavasileiou >Priority: Major > > The Record Cache in Kafka Streams is more properly termed a write buffer, > since it only caches writes, not reads, and its intent is to buffer the > writes before flushing them in bulk into lower store layers. > Unlike scan-type queries, which require scanning both the record cache and > the underlying store and collating the results, the KeyQuery (and any other > point lookup) can straightforwardly be served from the record cache if it is > buffered or fall through to the underlying store if not. > In contrast to scan-type operations, benchmarks reveal that key-based cache > reads are faster than always skipping the cache as well. > Therefore, it makes sense to implement a handler in the CachingKeyValueStore > for the KeyQuery specifically in order to serve fresher key-based lookups. > Scan queries may also be useful, but their less flattering performance > profile makes it reasonable to leave them for follow-on work. > We could add an option to disable cache reads on the KeyQuery, but since they > seem to be always better, I'm leaning toward just unilaterally serving cached > records if they exist. > > I did a quick POC of this: > [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries] > > The internal code of the caching stores should be refactored to share logic > with the regular store methods. Scan queries will be more complicated, since > they require merging the cache with the wrapped result. > There is a bug related to that non-timestamped-store-serde hack (see the > failing test when you run IQv2StoreIntegrationTest). 
Even though the inner > store is not timestamped, the cache returns a timestamped value. We'll have > to discuss options to fix it. -- This message was sent by Atlassian Jira (v8.20.1#820001)
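The point-lookup path described above can be sketched in a few lines (hypothetical maps standing in for the real record cache and wrapped store): a key-based query checks the write buffer first and only falls through to the underlying store on a miss, with no collation needed because a buffered write is by definition fresher than anything below it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the caching layer's point-lookup path: serve a
// KeyQuery from the write buffer ("record cache") if the key has a buffered
// write, otherwise fall through to the underlying store.
class CachingKeyQuerySketch {
    final Map<String, String> recordCache = new HashMap<>();     // dirty writes
    final Map<String, String> underlyingStore = new HashMap<>(); // flushed data

    String query(String key) {
        // Unlike a scan, a point lookup needs no merging of the two layers:
        // a cache hit always shadows the older value in the store.
        String cached = recordCache.get(key);
        return cached != null ? cached : underlyingStore.get(key);
    }
}
```

Scan-type queries are the harder follow-on case precisely because they cannot take this shortcut: they must iterate both layers and collate, which is why the ticket leaves them out of scope.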
[jira] [Assigned] (KAFKA-13524) IQv2: Implement KeyQuery from the RecordCache
[ https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-13524: Assignee: Vicky Papavasileiou > IQv2: Implement KeyQuery from the RecordCache > - > > Key: KAFKA-13524 > URL: https://issues.apache.org/jira/browse/KAFKA-13524 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Assignee: Vicky Papavasileiou >Priority: Major > > The Record Cache in Kafka Streams is more properly termed a write buffer, > since it only caches writes, not reads, and its intent is to buffer the > writes before flushing them in bulk into lower store layers. > Unlike scan-type queries, which require scanning both the record cache and > the underlying store and collating the results, the KeyQuery (and any other > point lookup) can straightforwardly be served from the record cache if it is > buffered or fall through to the underlying store if not. > In contrast to scan-type operations, benchmarks reveal that key-based cache > reads are faster than always skipping the cache as well. > Therefore, it makes sense to implement a handler in the CachingKeyValueStore > for the KeyQuery specifically in order to serve fresher key-based lookups. > Scan queries may also be useful, but their less flattering performance > profile makes it reasonable to leave them for follow-on work. > We could add an option to disable cache reads on the KeyQuery, but since they > seem to be always better, I'm leaning toward just unilaterally serving cached > records if they exist. > > I did a quick POC of this: > [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries] > > The internal code of the caching stores should be refactored to share logic > with the regular store methods. Scan queries will be more complicated, since > they require merging the cache with the wrapped result. > There is a bug related to that non-timestamped-store-serde hack (see the > failing test when you run IQv2StoreIntegrationTest). 
Even though the inner > store is not timestamped, the cache returns a timestamped value. We'll have > to discuss options to fix it. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13605) Checkpoint position in state stores
[ https://issues.apache.org/jira/browse/KAFKA-13605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13605: - Parent: KAFKA-13479 Issue Type: Sub-task (was: Improvement) > Checkpoint position in state stores > --- > > Key: KAFKA-13605 > URL: https://issues.apache.org/jira/browse/KAFKA-13605 > Project: Kafka > Issue Type: Sub-task >Reporter: Patrick Stuedi >Assignee: Patrick Stuedi >Priority: Critical > > There are cases in which a state store neither has an in-memory position > built up nor has it gone through the state restoration process. If a store is > persistent (i.e., RocksDB), and we stop and restart Streams, we will have > neither of those continuity mechanisms available. This ticket is to fill in > that gap. -- This message was sent by Atlassian Jira (v8.20.1#820001)
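One way to fill the gap described in KAFKA-13605 is to write the position to a local file on commit and read it back during init, so a persistent store that restarts without replaying the changelog still knows where it left off. A minimal sketch under that assumption (file format and helper names are illustrative, not the actual Streams checkpoint implementation):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical position checkpoint: "topic-partition=offset" entries written
// on commit and loaded during init for persistent stores.
class PositionCheckpointSketch {
    static void write(Path file, Map<String, Long> position) throws IOException {
        Properties props = new Properties();
        position.forEach((tp, off) -> props.setProperty(tp, Long.toString(off)));
        try (var out = Files.newOutputStream(file)) {
            props.store(out, "position checkpoint");
        }
    }

    static Map<String, Long> read(Path file) throws IOException {
        Map<String, Long> position = new HashMap<>();
        if (Files.exists(file)) { // no checkpoint yet -> empty position
            Properties props = new Properties();
            try (var in = Files.newInputStream(file)) {
                props.load(in);
            }
            props.forEach((k, v) -> position.put((String) k, Long.parseLong((String) v)));
        }
        return position;
    }
}
```

This is essentially the "supply a CommitCallback to write the position to a local file and then read the file during init" mechanism mentioned in KAFKA-13622, promoted from a per-implementer chore to framework behavior.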
[jira] [Updated] (KAFKA-13524) IQv2: Implement KeyQuery from the RecordCache
[ https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13524: - Description: The Record Cache in Kafka Streams is more properly termed a write buffer, since it only caches writes, not reads, and its intent is to buffer the writes before flushing them in bulk into lower store layers. Unlike scan-type queries, which require scanning both the record cache and the underlying store and collating the results, the KeyQuery (and any other point lookup) can straightforwardly be served from the record cache if it is buffered or fall through to the underlying store if not. In contrast to scan-type operations, benchmarks reveal that key-based cache reads are faster than always skipping the cache as well. Therefore, it makes sense to implement a handler in the CachingKeyValueStore for the KeyQuery specifically in order to serve fresher key-based lookups. Scan queries may also be useful, but their less flattering performance profile makes it reasonable to leave them for follow-on work. We could add an option to disable cache reads on the KeyQuery, but since they seem to be always better, I'm leaning toward just unilaterally serving cached records if they exist. I did a quick POC of this: [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries] The internal code of the caching stores should be refactored to share logic with the regular store methods. Scan queries will be more complicated, since they require merging the cache with the wrapped result. There is a bug related to that non-timestamped-store-serde hack (see the failing test when you run IQv2StoreIntegrationTest). Even though the inner store is not timestamped, the cache returns a timestamped value. We'll have to discuss options to fix it. 
was: I did a quick POC of this: [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries] The internal code of the caching stores should be refactored to share logic with the regular store methods. Scan queries will be more complicated, since they require merging the cache with the wrapped result. There is a bug related to that non-timestamped-store-serde hack (see the failing test when you run IQv2StoreIntegrationTest). Even though the inner store is not timestamped, the cache returns a timestamped value. We'll have to discuss options to fix it. > IQv2: Implement KeyQuery from the RecordCache > - > > Key: KAFKA-13524 > URL: https://issues.apache.org/jira/browse/KAFKA-13524 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Priority: Major > > The Record Cache in Kafka Streams is more properly termed a write buffer, > since it only caches writes, not reads, and its intent is to buffer the > writes before flushing them in bulk into lower store layers. > Unlike scan-type queries, which require scanning both the record cache and > the underlying store and collating the results, the KeyQuery (and any other > point lookup) can straightforwardly be served from the record cache if it is > buffered or fall through to the underlying store if not. > In contrast to scan-type operations, benchmarks reveal that key-based cache > reads are faster than always skipping the cache as well. > Therefore, it makes sense to implement a handler in the CachingKeyValueStore > for the KeyQuery specifically in order to serve fresher key-based lookups. > Scan queries may also be useful, but their less flattering performance > profile makes it reasonable to leave them for follow-on work. > We could add an option to disable cache reads on the KeyQuery, but since they > seem to be always better, I'm leaning toward just unilaterally serving cached > records if they exist. 
> > I did a quick POC of this: > [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries] > > The internal code of the caching stores should be refactored to share logic > with the regular store methods. Scan queries will be more complicated, since > they require merging the cache with the wrapped result. > There is a bug related to that non-timestamped-store-serde hack (see the > failing test when you run IQv2StoreIntegrationTest). Even though the inner > store is not timestamped, the cache returns a timestamped value. We'll have > to discuss options to fix it. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13524) IQv2: Add option for KeyQuery from the RecordCache
[ https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13524: - Summary: IQv2: Add option for KeyQuery from the RecordCache (was: IQv2: Add option for KeyQuery from caches) > IQv2: Add option for KeyQuery from the RecordCache > -- > > Key: KAFKA-13524 > URL: https://issues.apache.org/jira/browse/KAFKA-13524 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Priority: Major > > I did a quick POC of this: > [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries] > > > The internal code of the caching stores should be refactored to share logic > with the regular store methods. Scan queries will be more complicated, since > they require merging the cache with the wrapped result. > There is a bug related to that non-timestamped-store-serde hack (see the > failing test when you run IQv2StoreIntegrationTest). Even though the inner > store is not timestamped, the cache returns a timestamped value. We'll have > to discuss options to fix it. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13524) IQv2: Add option for KeyQuery from caches
[ https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13524: - Summary: IQv2: Add option for KeyQuery from caches (was: IQv2: Add option to query KV Stores from caches) > IQv2: Add option for KeyQuery from caches > - > > Key: KAFKA-13524 > URL: https://issues.apache.org/jira/browse/KAFKA-13524 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Priority: Major > > I did a quick POC of this: > [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries] > > The internal code of the caching stores should be refactored to share logic > with the regular store methods. Scan queries will be more complicated, since > they require merging the cache with the wrapped result. > There is a bug related to that non-timestamped-store-serde hack (see the > failing test when you run IQv2StoreIntegrationTest). Even though the inner > store is not timestamped, the cache returns a timestamped value. We'll have > to discuss options to fix it. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13524) IQv2: Implement KeyQuery from the RecordCache
[ https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13524: - Summary: IQv2: Implement KeyQuery from the RecordCache (was: IQv2: Add option for KeyQuery from the RecordCache) > IQv2: Implement KeyQuery from the RecordCache > - > > Key: KAFKA-13524 > URL: https://issues.apache.org/jira/browse/KAFKA-13524 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Priority: Major > > I did a quick POC of this: > [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries] > > > The internal code of the caching stores should be refactored to share logic > with the regular store methods. Scan queries will be more complicated, since > they require merging the cache with the wrapped result. > There is a bug related to that non-timestamped-store-serde hack (see the > failing test when you run IQv2StoreIntegrationTest). Even though the inner > store is not timestamped, the cache returns a timestamped value. We'll have > to discuss options to fix it. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13524) IQv2: Add option for KeyQuery from caches
[ https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13524: - Description: I did a quick POC of this: [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries] The internal code of the caching stores should be refactored to share logic with the regular store methods. Scan queries will be more complicated, since they require merging the cache with the wrapped result. There is a bug related to that non-timestamped-store-serde hack (see the failing test when you run IQv2StoreIntegrationTest). Even though the inner store is not timestamped, the cache returns a timestamped value. We'll have to discuss options to fix it. was: I did a quick POC of this: [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries] The internal code of the caching stores should be refactored to share logic with the regular store methods. Scan queries will be more complicated, since they require merging the cache with the wrapped result. There is a bug related to that non-timestamped-store-serde hack (see the failing test when you run IQv2StoreIntegrationTest). Even though the inner store is not timestamped, the cache returns a timestamped value. We'll have to discuss options to fix it. > IQv2: Add option for KeyQuery from caches > - > > Key: KAFKA-13524 > URL: https://issues.apache.org/jira/browse/KAFKA-13524 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Priority: Major > > I did a quick POC of this: > [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries] > > > The internal code of the caching stores should be refactored to share logic > with the regular store methods. Scan queries will be more complicated, since > they require merging the cache with the wrapped result. > There is a bug related to that non-timestamped-store-serde hack (see the > failing test when you run IQv2StoreIntegrationTest). 
Even though the inner > store is not timestamped, the cache returns a timestamped value. We'll have > to discuss options to fix it. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13524) IQv2: Add option to query KV Stores from caches
[ https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13524: - Description: I did a quick POC of this: [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries] The internal code of the caching stores should be refactored to share logic with the regular store methods. Scan queries will be more complicated, since they require merging the cache with the wrapped result. There is a bug related to that non-timestamped-store-serde hack (see the failing test when you run IQv2StoreIntegrationTest). Even though the inner store is not timestamped, the cache returns a timestamped value. We'll have to discuss options to fix it. > IQv2: Add option to query KV Stores from caches > --- > > Key: KAFKA-13524 > URL: https://issues.apache.org/jira/browse/KAFKA-13524 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Priority: Major > > I did a quick POC of this: > [https://github.com/vvcephei/kafka/pull/new/iqv2-poc-cache-queries] > > The internal code of the caching stores should be refactored to share logic > with the regular store methods. Scan queries will be more complicated, since > they require merging the cache with the wrapped result. > There is a bug related to that non-timestamped-store-serde hack (see the > failing test when you run IQv2StoreIntegrationTest). Even though the inner > store is not timestamped, the cache returns a timestamped value. We'll have > to discuss options to fix it. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13524) IQv2: Add option to query KV Stores from caches
[ https://issues.apache.org/jira/browse/KAFKA-13524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13524: - Summary: IQv2: Add option to query KV Stores from caches (was: IQv2: Add option to query from caches) > IQv2: Add option to query KV Stores from caches > --- > > Key: KAFKA-13524 > URL: https://issues.apache.org/jira/browse/KAFKA-13524 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13553) Add PAPI stores to IQv2StoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-13553: - Summary: Add PAPI stores to IQv2StoreIntegrationTest (was: Add DSL stores to IQv2StoreIntegrationTest) > Add PAPI stores to IQv2StoreIntegrationTest > --- > > Key: KAFKA-13553 > URL: https://issues.apache.org/jira/browse/KAFKA-13553 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Priority: Major > > Right now, we only test stores registered via the DSL. To be truly > comprehensive, we must also test stores registered via the PAPI. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (KAFKA-13553) Add PAPI stores to IQv2StoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-13553: Assignee: John Roesler > Add PAPI stores to IQv2StoreIntegrationTest > --- > > Key: KAFKA-13553 > URL: https://issues.apache.org/jira/browse/KAFKA-13553 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > > Right now, we only test stores registered via the DSL. To be truly > comprehensive, we must also test stores registered via the PAPI. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (KAFKA-13554) Rename RangeQuery to KeyRangeQuery
[ https://issues.apache.org/jira/browse/KAFKA-13554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-13554: Assignee: Vicky Papavasileiou > Rename RangeQuery to KeyRangeQuery > -- > > Key: KAFKA-13554 > URL: https://issues.apache.org/jira/browse/KAFKA-13554 > Project: Kafka > Issue Type: Sub-task >Reporter: John Roesler >Assignee: Vicky Papavasileiou >Priority: Major > > Just to avoid confusion wrt WindowRangeQuery -- This message was sent by Atlassian Jira (v8.20.1#820001)