[ 
https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290161#comment-17290161
 ] 

Sergio Peña commented on KAFKA-10847:
-------------------------------------

[~guozhang] [~mjsax] [~vcrfxia] I think I found an efficient proposal for the 
final design. The throughput of left/outer joins is around 10% lower than with 
the current left/outer join behavior. It is worth mentioning that these tests 
process topics in which 80% of the records are joined and all keys were 
generated sequentially. Other scenarios could give different results, but 
this is sufficient for the POC.

The proposal still uses a secondary store to temporarily keep all non-joined 
records.

The improvements I made are:
 * Use a time-ordered state store for efficient range queries. Outer joins got 
50% faster with it.
 * Use one secondary state store shared between the left and right joins. It 
uses a composite key of <timestamp, boolean, key, seq>. The boolean is true for 
the left join and false for the right join. This combination made range queries 
more efficient. On each side of the join, I walk through all records in a time 
range, which returns all expired records from both sides and for all keys. I 
make sure to reverse the record from the other side when emitting it.
 * I traverse the secondary store only from the last window that hasn't expired 
(startTime -> streamTime - windowSize). I then advance startTime to 
streamTime - windowSize. This ensures I only look at expired records and do not 
have to delete the emitted ones.
 ** I still need to figure out how to get the grace period. For now, I emit the 
result if no join is found. But adding a grace period to the equation 
shouldn't cause overhead (I suppose).
 * I do not delete any records when they are emitted. Instead, I rely on the 
WindowStore's internal deletion of expired segments, which saves a put() call 
in this case.
 * I still delete records from the secondary store when a join is found, but I 
added a check: I do a single lookup in the store to see whether the key is 
there. If it is not, processing continues; otherwise I call put(key, null) to 
delete it. I don't know whether bloom filters are active and help make single 
lookups faster. This worked and increased my throughput by 10k in left joins.
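
To make the key layout and the scan easier to discuss, here is a minimal sketch of the idea, not the actual POC code. It uses an in-memory TreeMap as a stand-in for the time-ordered store. All names (OuterJoinSketch, SideKey, putNonJoined, removeIfPresent, emitExpired) are hypothetical and not part of the Kafka Streams API. It also assumes the scan range is half-open, [startTime, streamTime - windowSize), and it ignores the grace period.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** In-memory stand-in for the shared secondary store (hypothetical, not Kafka Streams API). */
final class OuterJoinSketch {

    /** Hypothetical composite key <timestamp, isLeft, key, seq>, ordered by timestamp first. */
    static final class SideKey implements Comparable<SideKey> {
        final long timestamp;
        final boolean isLeft;
        final String key;
        final int seq;

        SideKey(long timestamp, boolean isLeft, String key, int seq) {
            this.timestamp = timestamp;
            this.isLeft = isLeft;
            this.key = key;
            this.seq = seq;
        }

        @Override
        public int compareTo(SideKey o) {
            int c = Long.compare(timestamp, o.timestamp);
            if (c == 0) c = Boolean.compare(isLeft, o.isLeft);
            if (c == 0) c = key.compareTo(o.key);
            if (c == 0) c = Integer.compare(seq, o.seq);
            return c;
        }
    }

    private final NavigableMap<SideKey, String> store = new TreeMap<>();
    private final long windowSize;
    private long startTime = 0L; // lower bound of the next scan
    private int nextSeq = 0;

    OuterJoinSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Buffers a record that has not joined yet and returns its composite key. */
    SideKey putNonJoined(long timestamp, boolean isLeft, String key, String value) {
        SideKey k = new SideKey(timestamp, isLeft, key, nextSeq++);
        store.put(k, value);
        return k;
    }

    /** Called when a join is found: one lookup, and a delete only if the key is present. */
    boolean removeIfPresent(SideKey k) {
        if (!store.containsKey(k)) {
            return false;
        }
        store.remove(k);
        return true;
    }

    /** Emits records with startTime <= timestamp < streamTime - windowSize, then advances startTime. */
    List<String> emitExpired(long streamTime) {
        List<String> results = new ArrayList<>();
        long endTime = streamTime - windowSize;
        if (endTime <= startTime) {
            return results; // nothing new has expired
        }
        // Smallest possible keys at each bound, so the range covers both sides and all keys.
        SideKey from = new SideKey(startTime, false, "", Integer.MIN_VALUE);
        SideKey to = new SideKey(endTime, false, "", Integer.MIN_VALUE);
        for (Map.Entry<SideKey, String> e : store.subMap(from, true, to, false).entrySet()) {
            SideKey k = e.getKey();
            // Output is always (left, right); the missing side is null.
            results.add(k.isLeft
                ? k.key + "=(" + e.getValue() + ",null)"
                : k.key + "=(null," + e.getValue() + ")");
        }
        startTime = endTime; // emitted records are never scanned again, so no delete is needed
        return results;
    }
}
```

In this sketch the emitted entries simply stay in the map; in the proposal they would be dropped later when the WindowStore deletes expired segments. Calling emitExpired twice with the same stream time returns nothing the second time, because startTime has already moved past those records.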

 

What do you think about this proposal? I can start cleaning up the code and 
write a PR if it is OK.

 

 

> Avoid spurious left/outer join results in stream-stream join 
> -------------------------------------------------------------
>
>                 Key: KAFKA-10847
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10847
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Sergio Peña
>            Priority: Major
>
> KafkaStreams follows an eager execution model, i.e., it never buffers input 
> records but processes them right away. For left/outer stream-stream joins, 
> this implies that a left/outer join result might be emitted before the window 
> end (or window close) time is reached. Thus, a record that will be an 
> inner-join result might produce an eager (and spurious) left/outer join 
> result.
> We should change the implementation of the join to not emit eager left/outer 
> join results, but instead delay the emission of such results until after the 
> window grace period has passed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
