[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16832543#comment-16832543
 ] 

Andrew edited comment on KAFKA-8315 at 5/3/19 2:38 PM:
-------------------------------------------------------

[~vvcephei] I've been struggling with this today.
 * I have logging on every join, and I can see that joins do not occur before 
the 6-day period, except in a few cases (see below).
 * I've checked the data and I can definitely see that the vast majority of 
records should have joins throughout the whole period.
 * I have added `windowstore.changelog.additional.retention.ms` so that the 
auto-generated JOINTHIS/JOINOTHER intermediate topics now have a retention.ms 
of 840 hours.
 * I have removed my previous call to `until()` and only set the join window 
size to 2 days, and increased the join grace to 3 days.
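
As a sanity check on the units, the durations in the list above convert to milliseconds as follows. This is only an arithmetic sketch: the variable names are made up for illustration and nothing here calls Kafka.

```python
from datetime import timedelta


def to_ms(d: timedelta) -> int:
    """Convert a timedelta to whole milliseconds."""
    return d // timedelta(milliseconds=1)


# Values taken from the list above; the names are illustrative only.
changelog_retention_ms = to_ms(timedelta(hours=840))  # retention.ms seen on the JOINTHIS/JOINOTHER topics
join_window_ms = to_ms(timedelta(days=2))             # join window size
join_grace_ms = to_ms(timedelta(days=3))              # join grace

print(changelog_retention_ms, join_window_ms, join_grace_ms)
```

840 hours is 35 days, so the topic retention is well above the 5 days covered by window size plus grace.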

None of the above seems to make any difference.

What I have observed is that I do get a few joins before the 6-day window for a 
single partition (13), and this partition is the first to complete by far, as it 
has the fewest records.

Both topics are partitioned using murmur2 into 20 partitions (I've checked we 
have the same keys in the corresponding partitions of the left and right 
topics). We are running 4 instances of the streams application, and we do not 
explicitly set `num.stream.threads`.

My understanding is that a task is created for each partition (i.e. 20 tasks) 
and the work for these tasks is distributed across the stream threads in our 4 
streams applications. My assumption is that stream time is per-task (i.e. per 
partition). Is this correct? Is there _any_ possibility that the stream time of 
partition 13 is somehow shared with any of the other tasks, such that windows 
might be closed before the join-able data is read on the other partitions?
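
The assumption in the paragraph above can be written down as a toy model. This is a sketch of the assumed behaviour, not of Kafka Streams internals: the class `PerTaskStreamTime` and its methods are hypothetical, and the expiry rule (timestamp + window + grace < stream time) is a simplification chosen for the sketch.

```python
from collections import defaultdict

DAY_MS = 86_400_000


class PerTaskStreamTime:
    """Toy model: every task tracks its own stream time independently."""

    def __init__(self):
        self._time = defaultdict(lambda: -1)

    def observe(self, task: int, timestamp_ms: int) -> int:
        # Stream time never moves backwards within a task.
        self._time[task] = max(self._time[task], timestamp_ms)
        return self._time[task]

    def is_expired(self, task: int, timestamp_ms: int,
                   window_ms: int, grace_ms: int) -> bool:
        # Assumed rule: a record is too late once the task's own
        # stream time has passed timestamp + window + grace.
        return timestamp_ms + window_ms + grace_ms < self._time[task]


clock = PerTaskStreamTime()
clock.observe(13, 10 * DAY_MS)  # partition 13 races ahead
clock.observe(0, 1 * DAY_MS)    # partition 0 is still near the start

# A record stamped at day 0, with a 2-day window and 3-day grace:
expired_on_13 = clock.is_expired(13, 0, 2 * DAY_MS, 3 * DAY_MS)
expired_on_0 = clock.is_expired(0, 0, 2 * DAY_MS, 3 * DAY_MS)
print(expired_on_13, expired_on_0)
```

If stream time really is per-task, the fast partition 13 cannot expire records on partition 0. If it were shared, both calls would report the record as expired.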

I'd like to understand some more about how stream time increases. I imagine 
(probably naively) that within a task, stream time advances to the latest 
timestamp read from a partition, and that both left and right streams have 
their own stream time. I also assume that during a join, the left stream is 
read up until just after the current right stream time, then the right stream 
is read up until the latest left stream time, so that data is pulled off both 
streams to minimize the difference in times between the latest records read off 
the topics. Is this near the mark?
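
The reading order described above might look roughly like this. It is a minimal sketch of the assumption only, not a statement of what Kafka Streams does: the function `interleave` is hypothetical, and records are reduced to bare timestamps.

```python
from collections import deque


def interleave(left, right):
    """Yield (side, timestamp), always taking the head record with the
    smaller timestamp so the two sides stay close together in time."""
    left, right = deque(left), deque(right)
    while left and right:
        if left[0] <= right[0]:
            yield ("left", left.popleft())
        else:
            yield ("right", right.popleft())
    # One side is exhausted; drain whatever remains on the other.
    for ts in left:
        yield ("left", ts)
    for ts in right:
        yield ("right", ts)


order = list(interleave([1, 4, 5, 9], [2, 3, 8]))
print(order)
```

Under this model the gap between the latest left and right timestamps stays small, so a record on one side should rarely find its join window already closed by the other side.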

I will next try the join using a very large grace period to see if it makes a 
difference. One other thing I might try is to cap my end time using a stream 
filter to see if I manage to join to earlier records.
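
The end-time cap mentioned above amounts to a timestamp predicate. The sketch below shows the idea on plain tuples; the record layout `(key, timestamp_ms, value)` and the function `cap_end_time` are assumptions made for illustration, not the actual application code.

```python
DAY_MS = 86_400_000


def cap_end_time(records, end_ms):
    """Keep only records whose timestamp is at or before end_ms."""
    return [r for r in records if r[1] <= end_ms]


# Hypothetical records: (key, timestamp_ms, value)
records = [("a", 1 * DAY_MS, "x"), ("b", 3 * DAY_MS, "y"), ("a", 7 * DAY_MS, "z")]
capped = cap_end_time(records, 3 * DAY_MS)
print(capped)
```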


> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8315
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Andrew
>            Assignee: John Roesler
>            Priority: Major
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems only the group operation supports it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
