[
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835832#comment-16835832
]
John Roesler commented on KAFKA-8315:
-------------------------------------
Hi [~the4thamigo_uk], Sorry for leaving you hanging a bit. I'm glad your
investigation is progressing.
Last question first: Streams should choose to consume from the left or right
based on which one has the lower timestamp in the next record, so I would not
expect one side to "run ahead" of the other. There's one caveat, that when one
side is being produced more slowly, Streams won't just wait indefinitely for
the next data, but instead just process the side that does have data. This is
controlled by the "max.task.idle.ms" config, but since you're processing
historically, this shouldn't be your problem. Still might be worth a look.
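
To illustrate the selection rule described above, here is a minimal plain-Java
sketch (not Streams code). It assumes both sides already have all their records
buffered, breaks timestamp ties in favour of the left side (an arbitrary choice
for the illustration), and skips an empty side immediately rather than idling.
The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class SideSelectionSketch {
    /**
     * Returns the order in which the two sides are consumed, as "L@ts" / "R@ts",
     * always taking the side whose next record has the lower timestamp.
     */
    static List<String> consumeOrder(long[] left, long[] right) {
        Deque<Long> l = new ArrayDeque<>();
        Deque<Long> r = new ArrayDeque<>();
        for (long t : left) l.add(t);
        for (long t : right) r.add(t);

        List<String> order = new ArrayList<>();
        while (!l.isEmpty() || !r.isEmpty()) {
            boolean takeLeft;
            if (r.isEmpty()) {
                takeLeft = true;               // only the left side has data
            } else if (l.isEmpty()) {
                takeLeft = false;              // only the right side has data
            } else {
                takeLeft = l.peek() <= r.peek(); // lower head timestamp wins
            }
            long ts = takeLeft ? l.poll() : r.poll();
            order.add((takeLeft ? "L" : "R") + "@" + ts);
        }
        return order;
    }

    public static void main(String[] args) {
        // Neither side "runs ahead": the output is ordered by timestamp.
        System.out.println(consumeOrder(new long[]{1, 4, 5}, new long[]{2, 3, 6}));
    }
}
```

If both inputs are individually time-ordered, the interleaved output is
time-ordered too, which is the behaviour you would hope to see in your debug
output.
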
Maybe for debugging purposes, you can print out the key, value, and timestamp
for each of the sides as well as in the joiner, so you can identify which side
is triggering the join, and evaluate whether or not it's correctly time-ordered.
If it is in fact running ahead on one side, despite what it should be doing,
this would explain why you see better results with a larger grace period. To
confirm, the grace period should only matter up to the maximum time skew in
your stream. So, as you said, if you have two producers that each produce a
full 24 hours of data, sequentially, then you should see stream time advance
when the first producer writes its data, and then "freeze" while the second
producer writes its (out-of-order) data. Thus, you'll want to set the grace
period to keep old windows around for at least 24 hours, since you know you
have to wait for that second producer's data.
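
The two-producer scenario can be sketched in plain Java (again, not Streams
code). This is a simplified model under stated assumptions: stream time is
taken to be the highest timestamp seen so far, timestamps are in hours, window
size is ignored, and a record counts as "too late" when its timestamp is older
than stream time minus the grace period. The class and method names are
hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class GracePeriodSketch {
    /** Two producers that each write hours 0..hours-1, one after the other. */
    static long[] twoSequentialProducers(int hours) {
        long[] ts = new long[2 * hours];
        for (int i = 0; i < hours; i++) {
            ts[i] = i;           // first producer: advances stream time
            ts[hours + i] = i;   // second producer: out-of-order data
        }
        return ts;
    }

    /** Returns the timestamps that fall outside the grace period in this model. */
    static List<Long> droppedRecords(long[] timestamps, long grace) {
        long streamTime = Long.MIN_VALUE;
        List<Long> dropped = new ArrayList<>();
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts); // "freezes" on old data
            if (ts < streamTime - grace) {
                dropped.add(ts);                   // too late for this grace
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        long[] input = twoSequentialProducers(24);
        System.out.println("grace=1h:  dropped " + droppedRecords(input, 1).size());
        System.out.println("grace=24h: dropped " + droppedRecords(input, 24).size());
    }
}
```

In this model a 1-hour grace period drops most of the second producer's
records, while a 24-hour grace period drops none, which matches the reasoning
above about the maximum time skew.
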
Finally, to answer your earlier questions, yes, each task is handling just one
partition of both input topics (the same partition on the left and right).
Stream Time is independently maintained for each task/partition, and it is
computed simply as the highest timestamp yet observed for that partition. If
you want to look at it in detail, it's tracked in
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore .
Actually, you can set that class's logger to DEBUG mode and it'll print out
every time it skips a record that is outside of retention.
Minor point, you should not need to mess with the retention of the changelog
topic. Streams sets this appropriately to preserve the same data as the store,
but this is only apparent when restoring the store. The actual results of the
join are served out of the state store, so only the state store's retention
matters. This is what you're setting with the grace period.
I hope this helps!
-John
> Cannot pass Materialized into a join operation - hence cant set retention
> period independent of grace
> -----------------------------------------------------------------------------------------------------
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Andrew
> Assignee: John Roesler
> Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()`
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join
> operation; it seems only the group operation supports it.
>
> Slack conversation here :
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the
> grace period, so I think this is more than a documentation fix (see comments
> below)