[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839703#comment-16839703 ]
Andrew commented on KAFKA-8315:
-------------------------------

I constructed a super-ugly workaround for my {{join-example}} demo app. It adds a transformer onto each of the left and right streams, and the two transformers refer to each other's streamTime to decide whether to forward the messages. It is not the 'right' solution, but I might be able to fix this up to serve as a workaround. I need to ensure that the right transformers pair up depending on their assigned partition, and maybe use a state store. It's a hack, but it might just work for now.

https://github.com/the4thamigo-uk/join-example/pull/1/files

> Historical join issues
> ----------------------
>
>                 Key: KAFKA-8315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8315
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Andrew
>            Assignee: John Roesler
>            Priority: Major
>         Attachments: code.java
>
>
> The problem we are experiencing is that we cannot reliably perform simple
> joins over pre-populated Kafka topics. This seems more apparent where one
> topic has records at less frequent record timestamp intervals than the other.
> An example of the issue is provided in this repository :
> [https://github.com/the4thamigo-uk/join-example]
>
> The only way to increase the period of historically joined records is to
> increase the grace period for the join windows, and this has repercussions
> when you extend it to a large period e.g. 2 years of minute-by-minute records.
> Related Slack conversations :
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900]
>
> Research on this issue has gone through a few phases :
> 1) This issue was initially thought to be due to the inability to set the
> retention period for a join window via {{Materialized}}, i.e.
> the documentation says to use `Materialized` not `JoinWindows.until()`
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join
> operation; it seems only the group operation supports it.
> This was considered to be a problem with the documentation, not with the API,
> and is addressed in [https://github.com/apache/kafka/pull/6664]
> 2) We then found an apparent issue in the code which would affect the
> partition that is selected to deliver the next record to the join. This would
> only be a problem for data that is out-of-order, and join-example uses data
> that is in order of timestamp in both topics. So this fix is thought not to
> affect join-example.
> This was considered to be an issue and is being addressed in
> [https://github.com/apache/kafka/pull/6719]
> 3) Further investigation using a crafted unit test seems to show that the
> partition-selection and ordering (PartitionGroup/RecordQueue) works OK
> [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b]
> 4) The current assumption is that the issue is rooted in the way records are
> consumed from the topics :
> We have tried to set various options to suppress reads from the source topics
> but it doesn't seem to make any difference :
> [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63]
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
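
For illustration only, the idea in the comment above (each side consults the other side's streamTime before forwarding) can be modelled in plain Java roughly as follows. This is a simplified sketch and not the code from the linked pull request: it does not use the Kafka Streams API, the class name `StreamTimeGate` and its methods are hypothetical, records are reduced to their timestamps, and it assumes each side arrives in timestamp order (as in join-example). The partition pairing and state store mentioned in the comment are not modelled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of the workaround idea. This is NOT Kafka
// Streams API and NOT the code from the linked pull request.
final class StreamTimeGate {
    // Stream time per side: the highest record timestamp observed so far.
    private long leftStreamTime = Long.MIN_VALUE;
    private long rightStreamTime = Long.MIN_VALUE;

    // Records (represented only by their timestamps) waiting to be forwarded.
    private final Deque<Long> heldLeft = new ArrayDeque<>();
    private final Deque<Long> heldRight = new ArrayDeque<>();

    // Accept a left-side record and return whatever can now be forwarded.
    List<String> offerLeft(long timestamp) {
        leftStreamTime = Math.max(leftStreamTime, timestamp);
        heldLeft.addLast(timestamp);
        return drain();
    }

    // Accept a right-side record and return whatever can now be forwarded.
    List<String> offerRight(long timestamp) {
        rightStreamTime = Math.max(rightStreamTime, timestamp);
        heldRight.addLast(timestamp);
        return drain();
    }

    // Number of records still held back on both sides.
    int heldCount() {
        return heldLeft.size() + heldRight.size();
    }

    // Forward a held record only once the OTHER side's stream time has
    // reached that record's timestamp, so neither side can run far ahead.
    private List<String> drain() {
        List<String> forwarded = new ArrayList<>();
        while (!heldLeft.isEmpty() && heldLeft.peekFirst() <= rightStreamTime) {
            forwarded.add("L@" + heldLeft.pollFirst());
        }
        while (!heldRight.isEmpty() && heldRight.peekFirst() <= leftStreamTime) {
            forwarded.add("R@" + heldRight.pollFirst());
        }
        return forwarded;
    }

    public static void main(String[] args) {
        StreamTimeGate gate = new StreamTimeGate();
        // The left side is frequent, the right side is sparse.
        System.out.println(gate.offerLeft(1));
        System.out.println(gate.offerLeft(2));
        System.out.println(gate.offerLeft(3));
        System.out.println(gate.offerRight(2));
        System.out.println(gate.offerRight(10));
    }
}
```

In this model a record is held until the opposite side has been seen to reach its timestamp, which keeps either side from racing ahead of the other. Whether that is enough to keep records within the join window's grace period in a real topology is an assumption, not something this sketch demonstrates.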