[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16832543#comment-16832543 ]
Andrew edited comment on KAFKA-8315 at 5/3/19 2:38 PM:
-------------------------------------------------------

[~vvcephei] I've been struggling with this today.

* I have logging on every join, and I can see that joins do not occur before the 6 day period, except in a few cases (see below).
* I've checked the data and I can definitely see that the vast majority of records should have joins throughout the whole period.
* I have added `windowstore.changelog.additional.retention.ms` so that the auto-generated JOINTHIS/JOINOTHER intermediate topics now have a retention.ms of 840 hours.
* I have removed my previous call to `until()` and only set the join window size to 2 days, and increased the join grace to 3 days.

None of the above seems to make any difference.

What I have observed is that I do get a few joins before the 6 day window for a single partition (13), and this partition is the first to complete by far, as it has the fewest records.

Both topics are partitioned using murmur2 into 20 partitions (I've checked that we have the same keys in the corresponding partitions of the left and right topics). We are running 4 instances of the streams application, and we do not explicitly set `num.stream.threads`. My understanding is that a task is created for each partition (i.e. 20 tasks) and the work for these tasks is distributed out to the stream threads in our 4 streams applications.

My assumption is that stream time is per-task (i.e. per partition). Is this correct? Is there _any_ possibility that the stream time of partition 13 is somehow shared with any of the other tasks, such that windows might be closed before the join-able data is read on the other partitions?

I'd like to understand some more about how stream time increases. I imagine (probably naively) that within a task, stream time increases to the latest timestamp read from a partition, and that both left and right streams have their own stream time.
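To make the stream-time assumptions above concrete, here is a toy model in plain Java. It is not Kafka Streams code and makes no claim about how Kafka Streams is implemented: the class `StreamTimeSketch`, its methods, and the expiry rule (a record counts as expired once the task's clock has passed its timestamp plus window size plus grace) are all hypothetical. It also assumes one clock per task, which is one possible answer to the question being asked rather than a confirmed behaviour.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Toy model of per-task stream time. Hypothetical; not the Kafka Streams implementation. */
final class StreamTimeSketch {
    // Assumed per-task clock: the largest record timestamp the task has seen so far.
    private final Map<Integer, Long> streamTimeByTask = new HashMap<>();
    private final long windowMs;
    private final long graceMs;

    StreamTimeSketch(Duration window, Duration grace) {
        this.windowMs = window.toMillis();
        this.graceMs = grace.toMillis();
    }

    /** Records a timestamp for the task and returns the task's (never decreasing) stream time. */
    long observe(int task, long timestampMs) {
        return streamTimeByTask.merge(task, timestampMs, Math::max);
    }

    /** Assumed expiry rule: the task's clock has moved past timestamp + window + grace. */
    boolean isExpired(int task, long timestampMs) {
        Long streamTime = streamTimeByTask.get(task);
        if (streamTime == null) {
            return false; // this task has not read anything yet, so nothing can be expired
        }
        return streamTime > timestampMs + windowMs + graceMs;
    }

    public static void main(String[] args) {
        long day = Duration.ofDays(1).toMillis();
        // Window of 2 days and grace of 3 days, as in the comment above.
        StreamTimeSketch sketch = new StreamTimeSketch(Duration.ofDays(2), Duration.ofDays(3));

        // Task 13 reads a record stamped day 10, e.g. because one input was read far ahead.
        sketch.observe(13, 10 * day);

        // A day-1 record arriving later on the same task is past window + grace.
        System.out.println(sketch.isExpired(13, 1 * day));
        // Task 0 keeps its own clock in this model, so it is unaffected by task 13.
        System.out.println(sketch.isExpired(0, 1 * day));
    }
}
```

In this model, a task that reads one input far ahead of the other would treat older records from the lagging input as expired, while other tasks are unaffected. Whether either property holds in Kafka Streams is the open question here.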
I also assume that during a join, the left stream is read up until just after the current right stream time, then the right stream is read up until the latest left stream time, so that data is pulled off both streams in a way that minimizes the difference in times between the latest records read off the topics. Is this near the mark?

I will next try the join using a very large grace period to see if it makes a difference. One other thing I might try is to cap my end time using a stream filter to see if I manage to join to earlier records.

was (Author: the4thamigo_uk):

[~vvcephei] I've been struggling with this today.

* I have logging on every join, and I can see that joins do not occur before the 6 day period, except in a few cases (see below).
* I've checked the data and I can definitely see that the vast majority of records should have joins throughout the whole period.
* I have added `windowstore.changelog.additional.retention.ms` so that the auto-generated JOINTHIS/JOINOTHER intermediate topics now have a retention.ms of 840 hours.
* I have removed my previous call to `until()` and only set the window size to 2 days, and increased the grace to 3 days.

None of the above seems to make any difference.

What I have observed is that I get a few joins before the 6 day window for a single partition, and this partition is the first to complete as it has the fewest records.

Both topics are partitioned using murmur2 into 20 partitions (I've checked that we have the same keys in the corresponding partitions of the left and right topics). We are running 4 instances of the streams application, and we do not explicitly set `num.stream.threads`. My understanding is that a task is created for each partition (i.e. 20 tasks) and the work for these tasks is distributed out to the stream threads in our 4 streams applications.

My assumption is that stream time is per-task (i.e. per partition). Is this correct?
Is there _any_ possibility that the stream time of partition 13 is somehow shared with any of the other tasks, such that windows might be closed before the join-able data is read on the other partitions?

I'd like to understand some more about how stream time increases. I imagine (probably naively) that within a task, stream time increases to the latest timestamp read from a partition, and that both left and right streams have their own stream time. I also assume that during a join, the left stream is read up until just after the current right stream time, then the right stream is read up until the latest left stream time, so that data is pulled off both streams in a way that minimizes the difference in times between the latest records read off the topics. Is this near the mark?

I will next try the join using a very large grace period to see if it makes a difference. One other thing I might try is to cap my end time using a stream filter to see if I manage to join to earlier records.

> Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8315
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Andrew
>            Assignee: John Roesler
>            Priority: Major
>
> The documentation says to use `Materialized` not `JoinWindows.until()`
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join
> operation; it seems only the group operation supports it.
>
> Slack conversation here:
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
>
> [Additional]
> From what I understand, the retention period should be independent of the
> grace period, so I think this is more than a documentation fix (see comments
> below)

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)