[ https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290161#comment-17290161 ]
Sergio Peña commented on KAFKA-10847:
-------------------------------------

[~guozhang] [~mjsax] [~vcrfxia] I think I found an efficient proposal for the final design. The throughput of left/outer joins is around 10% lower than with the current left/outer join behavior. Note that these tests process topics in which 80% of the records are joined, and that all keys in the topics were generated sequentially. Other scenarios could give different results, but this is sufficient for the POC.

The proposal still uses a secondary store to temporarily keep all non-joined records. The improvements I made are:
* Use a time-ordered state store for efficient range queries. This gave 50% faster results on outer joins.
* Use one secondary state store shared between left/right joins. It uses a combined key of <timestamp, boolean, key, seq>. The boolean is true for the left join and false for the right join. This combination helped me do range queries more efficiently. On each side of the join, I walk through all records in a time range. The scan returns all expired records from both sides and for all keys. I make sure to reverse the record from the other side when emitting it.
* I traverse the secondary store only from the last window that hasn't expired (startTime -> streamTime - windowSize). I then advance startTime to streamTime - windowSize. This ensures I only look for expired records and do not delete the emitted ones.
** I still need to find out how to get the grace period. For now, I emit the result if no join is found. But adding the grace period to the equation shouldn't cause overhead (I suppose).
* I do not delete any records when they are emitted. For this case I rely on the internal WindowStore deletion of expired segments, which saves the put() call.
* I still delete records from the secondary store when a join is found. But I added a check.
I do a single lookup in the store to check whether the key is there. If it is not, processing continues; otherwise I call put(key, null) to delete it. I don't know whether bloom filters are active and help me get faster results with single lookups. This worked and increased my throughput by 10k in left joins.

What do you think about this proposal? I can start cleaning up the code and write a PR if it is OK.

> Avoid spurious left/outer join results in stream-stream join
> -------------------------------------------------------------
>
>          Key: KAFKA-10847
>          URL: https://issues.apache.org/jira/browse/KAFKA-10847
>      Project: Kafka
>   Issue Type: Improvement
>   Components: streams
>     Reporter: Matthias J. Sax
>     Assignee: Sergio Peña
>     Priority: Major
>
> KafkaStreams follows an eager execution model, i.e., it never buffers input
> records but processes them right away. For left/outer stream-stream join,
> this implies that a left/outer join result might be emitted before the window
> end (or window close) time is reached. Thus, a record that will be an
> inner-join result might produce an eager (and spurious) left/outer join
> result.
> We should change the implementation of the join to not emit eager left/outer
> join results, but instead delay the emission of such results until after the
> window grace period has passed.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
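
For reference, the shared-store idea from the comment above can be sketched in plain Java. This is only an illustration under stated assumptions, not the Kafka Streams implementation: a TreeMap stands in for the time-ordered state store, the class and method names (SharedOuterJoinStoreSketch, putNonJoined, removeJoined, emitExpired) are hypothetical, the scan range is read as the half-open interval [startTime, streamTime - windowSize), and "reversing" a record from the other side is interpreted as swapping the position of the null in the emitted pair. The grace period is left out, as in the comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for the shared secondary store. */
class SharedOuterJoinStoreSketch {

    /** Composite key <timestamp, boolean, key, seq>; leftSide is true for the left join. */
    static final class CompositeKey implements Comparable<CompositeKey> {
        final long timestamp;
        final boolean leftSide;
        final String key;
        final int seq;

        CompositeKey(long timestamp, boolean leftSide, String key, int seq) {
            this.timestamp = timestamp;
            this.leftSide = leftSide;
            this.key = key;
            this.seq = seq;
        }

        // Timestamp comes first, so a time range is one contiguous slice of the map.
        @Override
        public int compareTo(CompositeKey o) {
            int c = Long.compare(timestamp, o.timestamp);
            if (c == 0) c = Boolean.compare(leftSide, o.leftSide);
            if (c == 0) c = key.compareTo(o.key);
            if (c == 0) c = Integer.compare(seq, o.seq);
            return c;
        }
    }

    private final NavigableMap<CompositeKey, String> store = new TreeMap<>();
    private final long windowSize;
    private long startTime = 0L; // lower bound of the next scan
    private int nextSeq = 0;     // keeps duplicates of the same <timestamp, side, key> apart

    SharedOuterJoinStoreSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Smallest possible composite key for a given timestamp. */
    private static CompositeKey lowerBound(long timestamp) {
        return new CompositeKey(timestamp, false, "", Integer.MIN_VALUE);
    }

    /** Remember a record from either side that has not joined yet. */
    void putNonJoined(long timestamp, boolean leftSide, String key, String value) {
        store.put(new CompositeKey(timestamp, leftSide, key, nextSeq++), value);
    }

    /** A join was found: drop the matching non-joined entries, if there are any. */
    void removeJoined(long timestamp, boolean leftSide, String key) {
        store.subMap(new CompositeKey(timestamp, leftSide, key, Integer.MIN_VALUE), true,
                     new CompositeKey(timestamp, leftSide, key, Integer.MAX_VALUE), true)
             .clear();
    }

    /** Emit every non-joined record in [startTime, streamTime - windowSize), both sides. */
    List<String> emitExpired(long streamTime) {
        List<String> emitted = new ArrayList<>();
        long endTime = streamTime - windowSize;
        if (endTime <= startTime) {
            return emitted; // nothing new has expired
        }
        NavigableMap<CompositeKey, String> expired =
            store.subMap(lowerBound(startTime), true, lowerBound(endTime), false);
        for (Map.Entry<CompositeKey, String> e : expired.entrySet()) {
            CompositeKey k = e.getKey();
            // Left record -> (value, null); right record -> (null, value).
            emitted.add(k.leftSide
                ? k.key + ":" + e.getValue() + "+null"
                : k.key + ":null+" + e.getValue());
        }
        // Emitted entries are not deleted; advancing startTime skips them next time.
        startTime = endTime;
        return emitted;
    }
}
```

Because entries are never deleted on emission in this sketch, a second call to emitExpired with the same stream time returns an empty list; in the proposal itself the cleanup is left to the WindowStore's expiry of old segments.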