[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838878#comment-16838878 ] John Roesler commented on KAFKA-8315:

[~the4thamigo_uk], the Python tests are something different. I was just talking about the Java classes with names like "WhateverWhateverIntegrationTest". You should be able to run those right from the IDE. If you want to run all the Streams integration tests, it's `./gradlew clean :streams:test`. It takes about 7 minutes.

> Cannot pass Materialized into a join operation - hence cant set retention
> period independent of grace
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Andrew
> Assignee: John Roesler
> Priority: Major
> Attachments: code.java
>
> The documentation says to use `Materialized` not `JoinWindows.until()`
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is no where to pass a `Materialized` instance to the join
> operation, only to the group operation is supported it seems.
>
> Slack conversation here :
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the
> grace period, so I think this is more than a documentation fix (see comments
> below)

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838783#comment-16838783 ] Andrew commented on KAFKA-8315:

Thanks [~vvcephei]. I am running the unit tests in the Kafka project. I confess I haven't worked out the integration tests yet, but I saw the Python scripts for them. I agree the ordering from the RecordQueue/PartitionGroup looks sound, and that something odd is happening in how data is pushed into the RecordQueues from the source topics. [~ableegoldman], any clues on this would be greatly appreciated. Thanks again for the responses; hopefully we will get to the bottom of this.
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838720#comment-16838720 ] John Roesler commented on KAFKA-8315:

Hi [~the4thamigo_uk],

Unfortunately, the TopologyTestDriver is going to be insufficient for exercising the behavior you want. It processes events synchronously as soon as you call `pipeInput`, but the problem you're having appears to be in the logic that chooses among records polled from Kafka, which only KafkaStreams does.

As the fastest way to nail this down, I'd suggest pulling down the Kafka project (we have integration tests that use real brokers and run a "real" KafkaStreams instance) and modifying one of the join integration tests to reproduce your use case.

This still sounds like a bug to me, even though it might not be the one that [~ableegoldman] reported.

Regarding the ticket, it'd be better not to split the history of this investigation, so I recommend editing the title and description of this ticket instead of creating a new one.

Thanks,
-John
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838565#comment-16838565 ] Andrew commented on KAFKA-8315:

This is the test enhanced to use timestamp extraction, and it works: https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838550#comment-16838550 ] Andrew commented on KAFKA-8315:

This test appears to work OK: [https://github.com/the4thamigo-uk/kafka/commit/560432e7daae217a2255161787dd55ca56845794]
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838471#comment-16838471 ] Andrew commented on KAFKA-8315:

[~vvcephei] [~ableegoldman] I was just looking at the unit tests for PartitionGroup and noticed that the comment refers to timestamps, but the ConsumerRecord constructor that is used passes the value in the offset parameter: [https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java#L92]

Is this correct?
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838012#comment-16838012 ] Andrew commented on KAFKA-8315:

[~ableegoldman] Right, I see what you mean: this loop only goes until the head is found: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java#L158]

We do have out-of-order data in our real data streams. However, it looks like you are right that it shouldn't affect my {{join-example}} demo, which reproduces the issue with only ordered data. Any further ideas on why the {{join-example}} doesn't work?
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837765#comment-16837765 ] Sophie Blee-Goldman commented on KAFKA-8315:

[~the4thamigo_uk] After I directed you to check out RecordQueue, I went back to look over it and filed the ticket John linked: you're right, it actually was going by partition time rather than by the timestamp of the head record. I've opened a simple PR with the fix [here|https://github.com/apache/kafka/pull/6719].

That said, I'm not sure this actually affects your use case. If all the data is in order, partition time should be the same as the head record's timestamp, so this should only come into play when processing out-of-order data. In your example above, partition A would have stream time = 1 when first choosing the next record, as it will not have seen the record with timestamp 4 yet.
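The distinction between partition time and the head record's timestamp can be illustrated with a toy model. The `QueueTimeSketch` class below is hypothetical and much simpler than Kafka's `RecordQueue`: it tracks only timestamps, and it assumes, as described in the comment, that partition time only advances as records reach the head of the buffer. With in-order data the two values agree; with out-of-order data they can diverge.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified per-partition buffer; NOT Kafka's RecordQueue.
// It tracks only timestamps, to contrast "partition time" with the head record's timestamp.
final class QueueTimeSketch {
    private final Deque<Long> buffered = new ArrayDeque<>();
    // Highest timestamp among records that have reached the head so far.
    private long partitionTime = Long.MIN_VALUE;

    void add(long ts) {
        buffered.addLast(ts);
        advancePartitionTime(); // only the current head can raise partitionTime
    }

    // Removes the head record and returns its timestamp; the next record becomes the head.
    long poll() {
        long ts = buffered.removeFirst();
        advancePartitionTime();
        return ts;
    }

    // Caller must ensure the queue is non-empty.
    long headTimestamp() {
        return buffered.peekFirst();
    }

    long partitionTime() {
        return partitionTime;
    }

    private void advancePartitionTime() {
        Long head = buffered.peekFirst();
        if (head != null) {
            partitionTime = Math.max(partitionTime, head);
        }
    }

    public static void main(String[] args) {
        // In-order data: partition time equals the head timestamp.
        QueueTimeSketch a = new QueueTimeSketch();
        a.add(1);
        a.add(4);
        System.out.println(a.headTimestamp() + " " + a.partitionTime()); // prints "1 1"

        // Out-of-order data: after polling 5, the head is 1 but partition time stays at 5.
        QueueTimeSketch b = new QueueTimeSketch();
        b.add(5);
        b.add(1);
        b.poll();
        System.out.println(b.headTimestamp() + " " + b.partitionTime()); // prints "1 5"
    }
}
```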
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837303#comment-16837303 ] John Roesler commented on KAFKA-8315:

Ah, yes. My apologies; I was mistaken about the PartitionGroup logic. I think you're right, and [~ableegoldman] has filed https://issues.apache.org/jira/browse/KAFKA-8347, so I guess she agrees, too.

I think it's a pretty straightforward change, and I don't think it needs a KIP either. Should we try to get this change in before the 15 May feature freeze for 2.3? Since you've taken the time to get familiar with the code, do you want to send a PR, [~the4thamigo_uk]?

Offhand, I think we can just redefine RecordQueue.timestamp() to be the head timestamp instead of the high-water-mark time, and, of course, write or update the relevant tests.
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16837086#comment-16837086 ] Andrew commented on KAFKA-8315:

[~ableegoldman] Can I sanity-check my understanding here? From what I can tell, {{nonEmptyQueuesByTime}} orders the {{RecordQueue}} instances by {{RecordQueue.partitionTime}}, and {{RecordQueue.partitionTime}} is the most recent timestamp that has been read into {{RecordQueue.fifoQueue}}.

If I have a {{RecordQueue}} A with two records with timestamps 1 and 4, and another {{RecordQueue}} B with two records with timestamps 2 and 3, then A will have {{RecordQueue.partitionTime = 4}} and B will have {{RecordQueue.partitionTime = 3}}. So B will be selected, and the next record will be 2, not 1. Is that right?
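The scenario in this question can be replayed with a small pure-Java simulation. `ChooserSketch` is hypothetical and is not Kafka's `PartitionGroup`: it drains two queues by repeatedly taking the head of the queue with the smallest key, and it compares two candidate keys, the head record's timestamp and the highest timestamp buffered anywhere in the queue (the reading proposed in the question). Per Sophie's reply in this thread, Kafka's partition time only covers records up to the head, so for in-order data like this it behaves like the head key.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical simulation of choosing the next record across partitions; NOT Kafka's PartitionGroup.
final class ChooserSketch {

    // Drains all queues, always taking the head of the non-empty queue with the smallest key.
    static List<Long> drain(List<Deque<Long>> queues, ToLongFunction<Deque<Long>> key) {
        List<Long> out = new ArrayList<>();
        while (true) {
            Deque<Long> next = null;
            for (Deque<Long> q : queues) {
                if (!q.isEmpty() && (next == null || key.applyAsLong(q) < key.applyAsLong(next))) {
                    next = q;
                }
            }
            if (next == null) {
                return out; // every queue is empty
            }
            out.add(next.removeFirst());
        }
    }

    // Key proposed as the fix: timestamp of the record at the head of the queue.
    static long headTimestamp(Deque<Long> q) {
        return q.peekFirst();
    }

    // Key from the reading in the question: highest timestamp among all buffered records.
    static long maxBufferedTimestamp(Deque<Long> q) {
        long max = Long.MIN_VALUE;
        for (long ts : q) {
            max = Math.max(max, ts);
        }
        return max;
    }

    // Fresh copies of the example queues (drain() consumes them).
    static List<Deque<Long>> example() {
        Deque<Long> a = new ArrayDeque<>(List.of(1L, 4L)); // partition A
        Deque<Long> b = new ArrayDeque<>(List.of(2L, 3L)); // partition B
        return List.of(a, b);
    }

    public static void main(String[] args) {
        System.out.println(drain(example(), ChooserSketch::headTimestamp));        // [1, 2, 3, 4]
        System.out.println(drain(example(), ChooserSketch::maxBufferedTimestamp)); // [2, 3, 1, 4]
    }
}
```

With the head key the records come out in timestamp order; with the max-buffered key, record 2 is processed before record 1, which is exactly the ordering the question is worried about.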
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836735#comment-16836735 ] Andrew commented on KAFKA-8315:

Thanks [~ableegoldman] and [~vvcephei].

Today I tried to isolate the issue we are facing in a simple demo app. I have two streams of data, x and y: x is high frequency (https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y is low frequency (https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat). I set the join window sizes and the grace in launch.sh (https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh).

There is logging as records are read off the streams via a transformer (https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54), and there is logging on the join (https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93).

I only see joins from this point onwards: https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553

Am I doing something wrong here?
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836606#comment-16836606 ] Sophie Blee-Goldman commented on KAFKA-8315:

[~the4thamigo_uk] The code you're looking for, which decides the next record to process, is in org.apache.kafka.streams.processor.internals.PartitionGroup. It contains a priority queue, "nonEmptyQueuesByTime", that serves up the partition with the lowest timestamp when polled (unless max.task.idle.ms has passed, as checked in StreamTask#isProcessable).
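The selection logic described here can be sketched as follows. This is a hypothetical simplification, not the code of `PartitionGroup` or `StreamTask#isProcessable`: it rebuilds a priority queue of non-empty partitions ordered by head timestamp on every call, and it models the idle timeout as a plain `maxIdleMs` parameter standing in for the `max.task.idle.ms` setting. When some partitions are empty, it only allows processing once that timeout has elapsed since it started waiting.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, simplified model of picking the next partition; NOT Kafka's PartitionGroup/StreamTask.
final class IdleChooserSketch {
    static final long UNKNOWN = -1L;

    private final List<Deque<Long>> partitions; // buffered record timestamps per partition
    private final long maxIdleMs;               // stands in for max.task.idle.ms
    private long idleStartMs = UNKNOWN;

    IdleChooserSketch(List<Deque<Long>> partitions, long maxIdleMs) {
        this.partitions = partitions;
        this.maxIdleMs = maxIdleMs;
    }

    // Processable if every partition has buffered data, or if some data is buffered
    // and we have already waited at least maxIdleMs for the empty partitions.
    boolean isProcessable(long nowMs) {
        boolean allBuffered = partitions.stream().noneMatch(Deque::isEmpty);
        if (allBuffered) {
            idleStartMs = UNKNOWN; // stop waiting
            return true;
        }
        boolean anyBuffered = partitions.stream().anyMatch(q -> !q.isEmpty());
        if (!anyBuffered) {
            return false;
        }
        if (idleStartMs == UNKNOWN) {
            idleStartMs = nowMs; // start waiting for the empty partitions
        }
        return nowMs - idleStartMs >= maxIdleMs;
    }

    // Among non-empty partitions, returns the one whose head record has the lowest timestamp.
    Deque<Long> nextPartition() {
        PriorityQueue<Deque<Long>> nonEmptyByTime =
            new PriorityQueue<>(Comparator.comparingLong((Deque<Long> q) -> q.peekFirst()));
        for (Deque<Long> q : partitions) {
            if (!q.isEmpty()) {
                nonEmptyByTime.add(q);
            }
        }
        return nonEmptyByTime.poll(); // null if nothing is buffered
    }

    public static void main(String[] args) {
        Deque<Long> left = new ArrayDeque<>(List.of(10L, 20L));
        Deque<Long> right = new ArrayDeque<>();
        IdleChooserSketch task = new IdleChooserSketch(List.of(left, right), 100L);
        System.out.println(task.isProcessable(0L));        // false: right is empty, waiting starts
        System.out.println(task.isProcessable(100L));      // true: waited maxIdleMs, process left alone
        right.add(5L);
        System.out.println(task.nextPartition() == right); // true: head 5 < head 10
    }
}
```

Rebuilding the queue on every call keeps the sketch short; a real implementation would more likely maintain it incrementally as records are added and removed.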
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836278#comment-16836278 ] Andrew commented on KAFKA-8315:

OK, so it should work as I had previously hoped, which means our working hypothesis may be incorrect. As you suggest, I am currently re-running a test using the following code to determine the lag between the left and right topics.

We think it might be due to the right stream getting ahead because this would also explain why we manage to perform some initial joins at the start of the ingestion period for about two months (while the streams are presumed to be in line), then nothing for most of the middle period, and then a few days of joins at the end.

As for the retention period, I think I understand it now from all your explanations. The problem here is not retention; it looks like it is something to do with either the specifics of our dataset or the way in which the streams are read. We delved into the RocksDB store code and think we understand how it works now. What I didn't manage to find is the code for the logic you describe in your first paragraph ('Streams should choose to ...', etc.).
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835832#comment-16835832 ] John Roesler commented on KAFKA-8315:

Hi [~the4thamigo_uk],

Sorry for leaving you hanging a bit. I'm glad your investigation is progressing.

Last question first: Streams should choose to consume from the left or the right based on which one has the lower timestamp in its next record, so I would not expect one side to "run ahead" of the other. There's one caveat: when one side is being produced more slowly, Streams won't wait indefinitely for the next data, but will instead process the side that does have data. This is controlled by the max.task.idle.ms config, but since you're processing historical data, this shouldn't be your problem. It might still be worth a look.

For debugging purposes, you could print out the key, value, and timestamp for each of the sides as well as in the joiner, so you can identify which side is triggering the join and evaluate whether or not it's correctly time-ordered. If it is in fact running ahead on one side, despite what it should be doing, that would explain why you see better results with a larger grace period.

To confirm, the grace period should only matter up to the maximum time skew in your stream. So, as you said, if you have two producers that each produce a full 24 hours of data, sequentially, then you should see stream time advance while the first producer writes its data, and then "freeze" while the second producer writes its (out-of-order) data. Thus, you'll want to set the grace period to keep old windows around for at least 24 hours, since you know you have to wait for the second producer's data.

Finally, to answer your earlier questions: yes, each task handles just one partition of both input topics (the same partition on the left and right).
Stream time is maintained independently for each task/partition, and it is computed simply as the highest timestamp yet observed for that partition. If you want to look at it in detail, it's tracked in org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore. You can also set that class's logger to DEBUG, and it will print a message every time it skips a record that is outside of retention.

A minor point: you should not need to touch the retention of the changelog topic. Streams sets it appropriately to preserve the same data as the store, but this only matters when restoring the store. The actual results of the join are served out of the state store, so only the state store's retention matters, and that is what you're setting with the grace period.

I hope this helps!
-John
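The stream-time and retention rules in this comment can be written down in a few lines. `StreamTimeSketch` is a hypothetical model, not Kafka's `AbstractRocksDBSegmentedBytesStore` (which expires whole segments rather than single records): stream time is the highest timestamp observed so far, a record is skipped if it falls further behind stream time than the retention allows, and `maxLateness` computes how far behind stream time the worst record arrives. For the two-producer case described above, with hourly records the worst lag is 23 hours, so a grace period on the order of a day is needed.

```java
// Hypothetical model of per-partition stream time and a retention check. NOT Kafka's
// AbstractRocksDBSegmentedBytesStore, which expires whole segments rather than single records.
final class StreamTimeSketch {
    private long streamTime = Long.MIN_VALUE; // highest timestamp observed so far

    // Observes a record and reports whether it is still inside retention
    // (true = kept, false = would be skipped as too old).
    boolean observe(long ts, long retentionMs) {
        streamTime = Math.max(streamTime, ts);
        return ts >= streamTime - retentionMs;
    }

    // Largest gap between stream time and a record's timestamp at arrival: roughly the
    // smallest grace period that would keep every record, ignoring the window size itself.
    static long maxLateness(long[] timestampsInArrivalOrder) {
        long streamTime = Long.MIN_VALUE;
        long worst = 0;
        for (long ts : timestampsInArrivalOrder) {
            streamTime = Math.max(streamTime, ts);
            worst = Math.max(worst, streamTime - ts);
        }
        return worst;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        // Two producers each write 24 hourly records for the same day, one after the other.
        long[] arrivals = new long[48];
        for (int i = 0; i < 24; i++) {
            arrivals[i] = i * hour;      // first producer
            arrivals[24 + i] = i * hour; // second producer, same day again
        }
        System.out.println(maxLateness(arrivals) / hour + " hours"); // prints "23 hours"
    }
}
```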
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835784#comment-16835784 ] Andrew commented on KAFKA-8315:

After a lot of investigation, we think this issue is down to the fact that we have a left stream with minute-by-minute data and a right topic with daily data. It is not clear what logic controls the rate at which records are read from the left and right streams, but we believe the right topic is being read at a rate that quickly takes it too far ahead of the left stream (in terms of event time), so the right stream's windows are expired before the left stream's data has been read.

[~vvcephei] What controls the rate at which records are read from the left and right streams? Is there any guarantee that the timestamps of the records read from the left and right streams are kept more or less in line with each other? If not, is there any way we can delay the right stream?

Thanks for your help above.
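The run-ahead hypothesis can be checked in a toy model before touching a real cluster. `RunAheadSketch` is hypothetical and deliberately simplified: a single partition, a left stream with one record per minute, a right stream with one record per day, a window in which each left record matches exactly the daily right record it falls into, and retention assumed to be one day plus grace, checked per record rather than per segment. Right records are always processed before the left records they match in both orders, so counting left-triggered matches is enough.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model (NOT Kafka's join implementation); all timestamps are in minutes.
final class RunAheadSketch {
    static final long DAY = 1440; // minutes per day

    static final class Rec {
        final boolean right; // true for the daily right-hand record
        final long ts;       // event time in minutes
        Rec(boolean right, long ts) { this.right = right; this.ts = ts; }
    }

    // Records consumed in timestamp order (the right record first on ties).
    static List<Rec> timeOrdered(int days) {
        List<Rec> out = new ArrayList<>();
        for (long t = 0; t < days * DAY; t++) {
            if (t % DAY == 0) out.add(new Rec(true, t));
            out.add(new Rec(false, t));
        }
        return out;
    }

    // The whole right stream is consumed before the left stream ("runs ahead").
    static List<Rec> rightRunsAhead(int days) {
        List<Rec> out = new ArrayList<>();
        for (long d = 0; d < days; d++) out.add(new Rec(true, d * DAY));
        for (long t = 0; t < days * DAY; t++) out.add(new Rec(false, t));
        return out;
    }

    // Counts left records that find their daily right record. Assumptions: a left record at t
    // matches a right record at r when r <= t < r + DAY, and a stored right record is only
    // visible while r >= streamTime - (DAY + graceMin).
    static int countJoins(List<Rec> order, long graceMin) {
        long retention = DAY + graceMin;
        long streamTime = Long.MIN_VALUE;
        List<Long> rightStore = new ArrayList<>();
        int joins = 0;
        for (Rec rec : order) {
            streamTime = Math.max(streamTime, rec.ts); // stream time = highest timestamp seen
            if (rec.right) {
                rightStore.add(rec.ts);
                continue;
            }
            for (long r : rightStore) {
                boolean inWindow = r <= rec.ts && rec.ts < r + DAY;
                boolean retained = r >= streamTime - retention;
                if (inWindow && retained) joins++;
            }
        }
        return joins;
    }

    public static void main(String[] args) {
        System.out.println(countJoins(timeOrdered(3), 0));      // 4320: every left record joins
        System.out.println(countJoins(rightRunsAhead(3), 0));   // 2880: the first day's joins are lost
        System.out.println(countJoins(rightRunsAhead(3), DAY)); // 4320: one day of grace covers the skew
    }
}
```

In this model, consuming the right stream first pushes stream time to the last day and expires the first day's right record before any left record can match it, unless the grace period covers the skew. It only shows that run-ahead would lose joins here, not that it is what happens in the real topology.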
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833161#comment-16833161 ] Andrew commented on KAFKA-8315: --- I have run a join with the timestamp transform and the aggregation removed, and I still get the same behaviour, i.e. with the following topology:
{Code}
private static Topology joinStreamStream(final JoinerProperties props) {
    final JoinedRecordFactory joinedRecordFactory =
        JoinedRecordFactory.create(props.leftTopic().getSchema(), props.rightTopic().getSchema());
    final FieldMapper leftFieldMapper = FieldMapper.create(props.leftTopic().getFields());
    final FieldMapper rightFieldMapper = FieldMapper.create(props.rightTopic().getFields());
    final JoinValueMapper joinValueMapper =
        JoinValueMapper.create(joinedRecordFactory, leftFieldMapper, rightFieldMapper, props.joinSchema());

    // extractors: event time is read from an Avro field on each side
    final TimestampExtractor leftTsExtractor = AvroTimestampExtractor.create(props.leftTopic().getTimestampField());
    final TimestampExtractor rightTsExtractor = AvroTimestampExtractor.create(props.rightTopic().getTimestampField());

    final StreamsBuilder builder = new StreamsBuilder();

    final Consumed leftConsumed = Consumed.with(leftTsExtractor);
    final KStream leftStream = AvroMinMaxTimestampTransformer.wrap(
        builder.stream(props.leftTopic().getName(), leftConsumed),
        props.minStreamTimestamp(), props.maxStreamTimestamp());

    final Consumed rightConsumed = Consumed.with(rightTsExtractor);
    final KStream rightStream = AvroMinMaxTimestampTransformer.wrap(
        builder.stream(props.rightTopic().getName(), rightConsumed),
        props.minStreamTimestamp(), props.maxStreamTimestamp());

    // setup the join
    final ValueJoiner joiner = AvroFieldsValueJoiner.create(joinedRecordFactory);
    final JoinWindows joinWindow = JoinWindows.of(Duration.ZERO)
        .after(props.joinWindowAfterSize())
        .before(props.joinWindowBeforeSize())
        .grace(props.joinWindowGrace());
    // inner or left join, depending on configuration
    final KStreamKStreamJoinFunction join = props.joinType() == JoinerProperties.JoinType.INNER
        ? leftStream::join
        : leftStream::leftJoin;
    final KStream joinStream = join.execute(rightStream, joiner, joinWindow);

    // write the change-log stream to the topic
    joinStream
        .mapValues(joinValueMapper::apply)
        .to(props.joinTopic());

    return builder.build();
}
{Code}
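The window in the topology above is built as `JoinWindows.of(Duration.ZERO).after(...).before(...)`. My reading of the `JoinWindows` javadoc is that a right-hand record joins a left-hand record with the same key when its timestamp is at most `before` earlier and at most `after` later than the left record's timestamp, so the window spans `before + after` (which, if I read the source correctly, is also what `JoinWindows.size()` returns). The plain-Java sketch below illustrates only that membership test; `JoinWindowCheck` and its methods are hypothetical names, and grace and retention are deliberately left out.

```java
import java.time.Duration;

// Hypothetical illustration of the JoinWindows before/after semantics; not Kafka code.
final class JoinWindowCheck {
    private JoinWindowCheck() {}

    /**
     * True if a right-hand record at rightTs falls inside the join window around a
     * left-hand record at leftTs, i.e. leftTs - before <= rightTs <= leftTs + after.
     * Grace period and store retention are not modelled here.
     */
    static boolean joinable(long leftTs, long rightTs, Duration before, Duration after) {
        long lower = leftTs - before.toMillis();
        long upper = leftTs + after.toMillis();
        return rightTs >= lower && rightTs <= upper;
    }

    /** Total span of the window: before + after. */
    static Duration span(Duration before, Duration after) {
        return before.plus(after);
    }

    public static void main(String[] args) {
        long hour = Duration.ofHours(1).toMillis();
        long left = 100 * hour;
        Duration before = Duration.ofDays(1);
        Duration after = Duration.ofDays(1);
        // 23 hours earlier is inside the 24h "before" bound; 25 hours later is outside "after".
        System.out.println(joinable(left, left - 23 * hour, before, after));
        System.out.println(joinable(left, left + 25 * hour, before, after));
        System.out.println(span(before, after).toDays());
    }
}
```

Keeping `before` and `after` separate mirrors how the topology configures them independently, which matters when the two streams are not symmetric.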
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833088#comment-16833088 ] Andrew commented on KAFKA-8315: --- I've attached my code:
* I have logging in the ValueJoiner, so I can see that joins do not occur before the period mentioned.
* The ValueJoiner simply sets the entire left and right GenericRecord objects on a parent GenericRecord, e.g.
{Code}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record;
{Code}
* I have a perhaps unusual transform on line 33, which ensures that the joined record always has the timestamp of the left record. This is important for the aggregation phase.
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832577#comment-16832577 ] Andrew commented on KAFKA-8315: --- [~vvcephei] Update: I ran with grace = 20 days, and I see records from around 23 days prior to the latest data in the topics. So it seems that we definitely can join the records, but that the grace period is the critical factor in doing this for a historical ingest. So, for a two-year ingestion, do I have to set my grace period to two years? That seems a bit strange, and possibly inefficient.
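The cut-offs reported in this thread line up with window size + grace + one extra day, assuming a 2-day window throughout: 2 + 2 + 1 = 5 days (120 hours), 2 + 3 + 1 = 6 days, and 2 + 20 + 1 = 23 days. The extra day matches the 24h default of `windowstore.changelog.additional.retention.ms` discussed in this thread, although raising that setting reportedly did not move the 6-day cut-off, so treat this as an observed pattern rather than an explanation of the internals. The sketch only checks the arithmetic; `CutoffArithmetic` and its members are hypothetical names.

```java
import java.time.Duration;

// Hypothetical arithmetic check of the cut-offs reported in this thread; not Kafka code.
final class CutoffArithmetic {
    private CutoffArithmetic() {}

    // Default value of windowstore.changelog.additional.retention.ms (24 hours).
    static final Duration DEFAULT_ADDITIONAL_RETENTION = Duration.ofHours(24);

    /** size + grace + extra: the pattern the observed join cut-offs seem to follow. */
    static Duration horizon(Duration size, Duration grace, Duration extra) {
        return size.plus(grace).plus(extra);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofDays(2);
        // Grace values mentioned in the comments: 2, 3 and 20 days.
        long[] graceDays = {2, 3, 20};
        for (long g : graceDays) {
            Duration h = horizon(size, Duration.ofDays(g), DEFAULT_ADDITIONAL_RETENTION);
            System.out.println("grace=" + g + "d -> " + h.toDays() + " days (" + h.toHours() + " hours)");
        }
    }
}
```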
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832543#comment-16832543 ] Andrew commented on KAFKA-8315: --- [~vvcephei] I've been struggling with this today.
* I have logging on every join, and I can see that joins do not occur before the 6-day period, except in a few cases (see below).
* I've checked the data, and I can definitely see that the vast majority of records should have joins throughout the whole period.
* I have added `windowstore.changelog.additional.retention.ms` so that the auto-generated JOINTHIS/JOINOTHER intermediate topics now have a retention.ms of 840 hours.
* I have removed my previous call to `until()`, so I now only set the window size (2 days) and the grace, which I increased to 3 days.

None of the above seems to make any difference. What I have observed is that I get a few joins before the 6-day window for a single partition, and this partition is the first to complete, as it has the fewest records. Both topics are partitioned using murmur2 into 20 partitions (I've checked that we have the same keys in the corresponding partitions of the left and right topics). We are running 4 instances of the streams application, and we do not explicitly set `num.stream.threads`. My understanding is that a task is created for each partition (i.e. 20 tasks), and the work for these tasks is distributed across the stream threads in our 4 streams applications. My assumption is that stream time is per task (i.e. per partition). Is this correct? Is there _any_ possibility that the stream time of partition 13 is somehow shared with any of the other tasks, such that windows might be closed before the joinable data is read on the other partitions? I'd like to understand more about how stream time increases. I imagine (probably naively) that within a task, stream time increases as the latest timestamp read from a partition, and that the left and right streams each have their own stream time.
I also assume that during a join, the left stream is read up until just after the current right stream time, then the right stream is read up until the latest left stream time, so that data is pulled off both streams in a way that minimizes the difference in time between the latest records read from each topic. Is this near the mark? I will next try the join with a very large grace period to see if it makes a difference. One other thing I might try is capping my end time with a stream filter, to see whether I manage to join earlier records.
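To make the questions above concrete, here is a toy model of a single task reading one left and one right partition. It assumes, as a simplification, that the task always processes the buffered record with the smallest timestamp next, and that the task's stream time is the largest timestamp processed so far. This is a hypothetical sketch of the model being asked about, not Kafka's actual `PartitionGroup`/`RecordQueue` code, and `ToyTask` and its methods are made-up names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of one task reading two partitions; hypothetical, not Kafka internals.
final class ToyTask {
    private final Deque<Long> left = new ArrayDeque<>();
    private final Deque<Long> right = new ArrayDeque<>();
    private long streamTime = Long.MIN_VALUE;

    ToyTask(List<Long> leftTs, List<Long> rightTs) {
        left.addAll(leftTs);
        right.addAll(rightTs);
    }

    long streamTime() {
        return streamTime;
    }

    /** Processes every buffered record and returns the processing order as "L@ts"/"R@ts". */
    List<String> drain() {
        List<String> order = new ArrayList<>();
        while (!left.isEmpty() || !right.isEmpty()) {
            // Pick the queue whose head record has the smallest timestamp (ties go left).
            boolean takeLeft = right.isEmpty()
                || (!left.isEmpty() && left.peekFirst() <= right.peekFirst());
            long ts = takeLeft ? left.pollFirst() : right.pollFirst();
            order.add((takeLeft ? "L@" : "R@") + ts);
            // Stream time never moves backwards: it is the max timestamp seen so far.
            streamTime = Math.max(streamTime, ts);
        }
        return order;
    }

    public static void main(String[] args) {
        ToyTask task = new ToyTask(List.of(1L, 2L, 10L), List.of(3L, 4L, 5L));
        System.out.println(task.drain());
        System.out.println(task.streamTime());
    }
}
```

In this example the right-hand records at 3, 4 and 5 are processed before the left-hand record at 10, even though that record was already buffered. Note that the model keeps a single stream time for the whole task rather than one per input stream; that is an assumption of the sketch, not a confirmed answer to the question above.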
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832325#comment-16832325 ] Andrew commented on KAFKA-8315: --- The fudge factor is `windowstore.changelog.additional.retention.ms`, which is set to 24h by default, so that seems to add up to 5 days (120 hours). You understand my use case well. I chose to limit the grace period to reduce the performance overhead during the large historical processing. I definitely have lots of data beyond this that should join, and the joins and aggregation within the latest 120 hours seem to be correct and complete. This seems to be related only to retention. I will experiment with increasing `windowstore.changelog.additional.retention.ms` to see if it brings in more data. One oddity of our left stream is that it contains records in batches from different devices. Each batch is about 1000 records and is contiguous within the stream. Within a batch, the records are in increasing timestamp order. Subsequent batches from different devices will be within 1-2 days of the previous batch (we don't have, say, a batch for 1/1/2019 followed by a batch for 4/1/2019 or a batch for 24/12/2019). The right stream consists of single records, not batches, with similar date ordering (i.e. subsequent records should be within 1-2 days of each other). My understanding is that a window is closed when `streamtime - windowstart > windowsize + grace`. So, as stream time increases with newer batches arriving, joins/aggregations should continue until a batch arrives more than windowsize + grace after the window start. However, we are not seeing this.
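The closing rule stated above can be written as a small predicate. This sketch takes the rule exactly as phrased in the comment (`streamtime - windowstart > windowsize + grace`), uses plain millisecond timestamps, and has hypothetical names; it is a way to reason about the batch layout described, not a copy of Kafka's implementation.

```java
import java.time.Duration;

// Hypothetical predicate for the window-closing rule quoted above; not Kafka code.
final class WindowClose {
    private WindowClose() {}

    /** Closed once stream time has moved more than size + grace past the window start. */
    static boolean isClosed(long streamTime, long windowStart, Duration size, Duration grace) {
        return streamTime - windowStart > size.toMillis() + grace.toMillis();
    }

    public static void main(String[] args) {
        long day = Duration.ofDays(1).toMillis();
        Duration size = Duration.ofDays(2);
        Duration grace = Duration.ofDays(2);
        long windowStart = 0L;
        // Batches arriving 1-2 days apart advance stream time gradually.
        long[] streamTimes = {1 * day, 3 * day, 4 * day, 5 * day};
        for (long streamTime : streamTimes) {
            System.out.println("stream time day " + streamTime / day + ": closed="
                + isClosed(streamTime, windowStart, size, grace));
        }
    }
}
```

Under this rule, whether a window is still open depends only on how far stream time has advanced past the window start, so steadily advancing batches should only close windows that are already more than size + grace old.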
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831927#comment-16831927 ] John Roesler commented on KAFKA-8315: - Interesting. Thanks for the context. For some reason, the Slack link doesn't take me to the thread. Your understanding is almost spot-on. The grace period defines how long after the window ends it will accept late-arriving records and update the results. The retention time defines how long we keep the state of the window in storage. Clearly, the retention time must be at least big enough to support the updates that may happen during the grace period, but it could be much larger, to support Interactive Queries even after the window is closed to updates. Join windows are a little bit different, because they are not queryable. Therefore, there is no reason to have any retention beyond the grace period. This is also why there's no `Materialized` parameter. The state for the join is purely bookkeeping, not a "materialized view" in the data processing sense. Since you mention the apparent similarity with grouped aggregations: the fact that JoinWindows shares a class hierarchy with Windows, and that it uses a "normal" WindowStore internally, was mostly for implementation convenience. It's actually a little semantically abusive if you really get into it, and I've heard a few times that people would like to break joins out and clean the whole situation up. Things get really complicated when we need to compute defaults, though. The concepts of "grace" and "retention" used to be coupled into just "retention" (aka "until" aka "maintainMs"), and the default was set to 24h. So, if we pick any default grace period shorter than 24h, then some apps may start to drop late data that they didn't drop before.
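The distinction drawn above (retention must cover at least window size + grace, and may be longer only to keep closed windows queryable) can be expressed as a small check. This is a hypothetical illustration using plain `java.time.Duration`; `RetentionRules` and its methods are invented names, and the sketch does not reproduce the validation code in Kafka Streams.

```java
import java.time.Duration;

// Hypothetical rules paraphrasing the comment above; not Kafka's validation code.
final class RetentionRules {
    private RetentionRules() {}

    /** Smallest retention that still supports updates during the grace period. */
    static Duration minimumRetention(Duration windowSize, Duration grace) {
        return windowSize.plus(grace);
    }

    /**
     * Any retention at or above size + grace is acceptable for a queryable store;
     * anything larger only keeps closed windows around longer for Interactive Queries.
     */
    static void checkRetention(Duration retention, Duration windowSize, Duration grace) {
        Duration min = minimumRetention(windowSize, grace);
        if (retention.compareTo(min) < 0) {
            throw new IllegalArgumentException(
                "retention " + retention + " is smaller than window size + grace (" + min + ")");
        }
    }

    public static void main(String[] args) {
        Duration size = Duration.ofDays(2);
        Duration grace = Duration.ofDays(2);
        // A non-queryable join store has no use for more than this minimum.
        System.out.println("minimum retention: " + minimumRetention(size, grace).toDays() + " days");
        checkRetention(Duration.ofDays(30), size, grace); // accepted: extra time for queries
        try {
            checkRetention(Duration.ofDays(3), size, grace);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```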
Also, the "retention" configuration in Windows is only deprecated, not removed, so someone may set a retention time on the deprecated methods but not a grace period, and we also need to do the "right thing" in that case. This is just by way of explaining why the code is so complicated. Hopefully, we can drop the deprecated methods soonish and clean the whole thing up. So, back to your actual behavior: you *should* see that the window stores for join windows use `Duration.ofMillis(windows.size() + windows.gracePeriodMs())`, as you pointed out above. The deprecated `until` should be ignored. It's possible the topic retention doesn't get updated when you change your configs, which would be a bug. One thing I didn't understand is the arithmetic from your conversation. I'll take a shot, and maybe you can set me straight... You want to join 2 years of historical data. For each join candidate, you only want to look back 2 days, so you set the join window to size=2 days. You want to emit updated join results in the case of time-disordered records, but not indefinitely. Specifically, you only want to emit updated results up to 2 days after the fact, so you set the grace period to 2 days as well. With these configurations, you should see the retention time on the topic set to at least 4 days. 120 hours is 5 days, so this seems about right to me (there might be some fudge factor, I'm not sure). I guess the big problem is that your data fails to join. I'd start by identifying some pair of records that you think *should* join, and then identify why they don't (it could be that the join window is too small, or that the grace period is too small).
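The defaulting problem described above (a caller may set the deprecated retention via `until` but no grace, either, or both) can be sketched as follows. This is one plausible way to "do the right thing" under the stated assumptions: an explicit grace wins, otherwise grace is whatever the old retention (default 24h) left after the window size. It is a hypothetical illustration, not Kafka's actual implementation; `GraceDefaults` and its methods are invented names. The join-store retention is then size + grace, matching the `windows.size() + windows.gracePeriodMs()` expression quoted in the comment.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical sketch of grace defaulting with a deprecated retention setting; not Kafka code.
final class GraceDefaults {
    private GraceDefaults() {}

    // Assumed legacy default retention ("until" / "maintainMs") of 24h, per the comment above.
    static final Duration LEGACY_DEFAULT_RETENTION = Duration.ofHours(24);

    /** Grace used when the caller may have set grace, the deprecated until(), both, or neither. */
    static Duration effectiveGrace(Duration size, Optional<Duration> grace, Optional<Duration> until) {
        if (grace.isPresent()) {
            return grace.get(); // an explicit grace period takes precedence; until is ignored
        }
        // No grace given: keep the old behaviour, where late data was accepted until
        // retention expired. Retention is never treated as shorter than the window itself.
        Duration retention = until.orElse(LEGACY_DEFAULT_RETENTION);
        Duration atLeastSize = retention.compareTo(size) > 0 ? retention : size;
        return atLeastSize.minus(size);
    }

    /** Join-window store retention: size + effective grace, with nothing extra. */
    static Duration joinStoreRetention(Duration size, Optional<Duration> grace, Optional<Duration> until) {
        return size.plus(effectiveGrace(size, grace, until));
    }

    public static void main(String[] args) {
        Duration twoDays = Duration.ofDays(2);
        // Scenario from the comment: size = 2 days, grace = 2 days.
        System.out.println(joinStoreRetention(twoDays, Optional.of(twoDays), Optional.empty()).toDays());
        // Only the deprecated until(10 days) was set.
        System.out.println(joinStoreRetention(twoDays, Optional.empty(), Optional.of(Duration.ofDays(10))).toDays());
    }
}
```

With size = 2 days and grace = 2 days this gives the 4 days mentioned above; the additional day observed on the topic would have to come from somewhere else, such as the changelog's additional retention.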