[ 
https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286808#comment-17286808
 ] 

Guozhang Wang commented on KAFKA-10847:
---------------------------------------

[~spena][~vcrfxia][~mjsax] I'd like to dump some more thoughts here regarding 
the design:

1) Regarding `fetchAll`: intuitively, even if we call this function on the 
newly added bookkeeping stores for records that have not found a match yet, 
since the expiration frequency is high, the scan should only return a few 
records each time. Within RocksDB, a range scan is usually executed as 1) find 
the starting point based on the lower bound, 2) scan through the sst tables, 
stopping at the upper bound. Among these two steps, the time spent finding the 
starting point should be roughly the same as the time spent on a single-key 
lookup, and since the calling frequency is high, the `low` and `high` bounds 
on timestamp should be pretty close (maybe just a couple of milliseconds 
apart), which means step 2) should finish fairly quickly as well. We could 
verify whether this is indeed the case, e.g. when running the benchmark we 
first break the latency down into multiple stages: 1) time spent reading / 
writing in the window store, 2) time spent range-fetching in the expiration 
store, 3) time spent deleting records from the expiration store, 4) the join 
operator itself. Then within 2), we can plot a chart where the x-axis is the 
number of records returned, and the y-axis is the e2e latency of getting the 
iterator and looping through it. If it shows that, even with say zero or one 
record returned, there is still a large constant cost in latency, we may then 
consider 2) below.
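The per-stage timing idea above could be sketched roughly as follows, in plain Java with a TreeMap standing in for the timestamp-ordered RocksDB expiration store (the class and field names are hypothetical, purely for illustration of how to bucket latency by result count):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class RangeFetchBenchmarkSketch {
    // Stand-in for the timestamp-ordered expiration store.
    private final TreeMap<Long, String> expirationStore = new TreeMap<>();
    // x-axis: number of records returned; y-axis: accumulated e2e nanos.
    private final Map<Integer, Long> latencyByResultCount = new HashMap<>();

    public void put(long ts, String value) {
        expirationStore.put(ts, value);
    }

    // Times the "get the iterator and loop through it" cost we want to isolate.
    public int timedRangeFetch(long lowMs, long highMs) {
        long start = System.nanoTime();
        NavigableMap<Long, String> hits =
            expirationStore.subMap(lowMs, true, highMs, true);
        int count = 0;
        for (Map.Entry<Long, String> e : hits.entrySet()) {
            count++; // a real benchmark would also deserialize the value here
        }
        long elapsed = System.nanoTime() - start;
        latencyByResultCount.merge(count, elapsed, Long::sum);
        return count;
    }

    public Map<Integer, Long> results() {
        return latencyByResultCount;
    }
}
```

If the accumulated latency for the zero- and one-record buckets is comparable to the larger buckets, that would point at a constant per-scan cost.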

2) If 1) shows that range-fetch in RocksDB has a constant amortized cost even 
with very few records returned, then instead of trying to expire records from 
the expiration store on each input record, we only fetch-and-delete from the 
expiration store periodically. We should then be careful about the period: if 
it is too long, we may introduce a much longer output emission latency, plus 
the risk that a single large expiration pass blocks the thread from calling 
consumer.poll() in time.
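A minimal sketch of the periodic fetch-and-delete idea (again plain Java with a TreeMap stand-in; in Kafka Streams proper this would more naturally hang off a punctuation callback, and the interval knob here is the trade-off discussed above):

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class PeriodicExpirerSketch {
    private final TreeMap<Long, String> expirationStore = new TreeMap<>();
    private final long intervalMs; // how often we pay the range-fetch cost
    private Long lastRunMs = null;

    public PeriodicExpirerSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    public void put(long ts, String value) {
        expirationStore.put(ts, value);
    }

    // Called on every input record, but only scans when the interval has
    // elapsed, so most records pay no range-fetch cost at all.
    public int maybeExpire(long nowMs, long graceMs) {
        if (lastRunMs != null && nowMs - lastRunMs < intervalMs) {
            return 0;
        }
        lastRunMs = nowMs;
        int emitted = 0;
        Iterator<Map.Entry<Long, String>> it =
            expirationStore.headMap(nowMs - graceMs, true).entrySet().iterator();
        while (it.hasNext()) {
            it.next(); // emit the left/outer join result for this record here
            it.remove(); // fetch-and-delete in one pass
            emitted++;
        }
        return emitted;
    }
}
```

The longer `intervalMs` is, the bigger each batch gets, which is exactly the poll-blocking risk mentioned above.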

3) Another idea I have in mind is that, for the expiration store, instead of 
using two physical stores, one for each side, we can consider just adding a 
single store for both sides, with the key prefixed by the logical stream (e.g. 
a single bit prefix, "0" for left, and "1" for right). By doing that we have 
one fewer physical store, and since the records from each side are clustered 
in the underlying layout, the range query should not be impacted much.
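One possible shape of such a prefixed key, sketched with a one-byte prefix (the real key would also carry the record key and more; the layout and names here are assumptions). Since ByteBuffer writes big-endian, unsigned lexicographic byte order matches (side, timestamp) order, so all left-side records cluster before all right-side records and per-side timestamp ranges remain contiguous:

```java
import java.nio.ByteBuffer;

public class PrefixedExpirationKey {
    public static final byte LEFT = 0;
    public static final byte RIGHT = 1;

    // [side (1 byte)][timestamp (8 bytes, big-endian)] -- hypothetical layout.
    public static byte[] encode(byte side, long timestampMs) {
        return ByteBuffer.allocate(9).put(side).putLong(timestampMs).array();
    }

    // Range bounds for one side: all timestamps in [lowMs, highMs] under
    // that side's prefix.
    public static byte[] lowerBound(byte side, long lowMs) {
        return encode(side, lowMs);
    }

    public static byte[] upperBound(byte side, long highMs) {
        return encode(side, highMs);
    }
}
```

A sorted map keyed by these bytes (with an unsigned byte comparator) then answers per-side range queries without touching the other side's records.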

4) One more idea I have for the expiration store again: instead of using the 
sequence id to disable deduplication, we can consider removing it from the key 
to save 4 bytes per record, and instead serialize the value schema as a 
list<value>, assuming that in practice most values would be singleton lists, 
which we can serialize with only a very small byte overhead. The downside, 
though, is that instead of being able to do a blind write to append, we need 
to call a get and then a write, though the get should return null most of the 
time. I'm hoping that with RocksDB's bloom filter, such a get should be 
efficient and worth the tradeoff.
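A sketch of the list<value> encoding and the get-then-write append (HashMap standing in for RocksDB; the `[count][len][bytes]...` framing is one possible choice, under which a singleton list costs only 8 extra bytes over the raw value):

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ListValueStoreSketch {
    private final Map<String, byte[]> store = new HashMap<>(); // RocksDB stand-in

    // Serialize a list of values as [count][len v1][v1][len v2][v2]...
    static byte[] serialize(List<byte[]> values) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeInt(out, values.size());
        for (byte[] v : values) {
            writeInt(out, v.length);
            out.write(v, 0, v.length);
        }
        return out.toByteArray();
    }

    static List<byte[]> deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int count = buf.getInt();
        List<byte[]> values = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] v = new byte[buf.getInt()];
            buf.get(v);
            values.add(v);
        }
        return values;
    }

    private static void writeInt(ByteArrayOutputStream out, int i) {
        out.write(ByteBuffer.allocate(4).putInt(i).array(), 0, 4);
    }

    // Get-then-write instead of a blind append; the get returns null most of
    // the time, which is where the bloom filter is expected to help.
    public void append(String key, byte[] value) {
        byte[] existing = store.get(key);
        List<byte[]> values =
            existing == null ? new ArrayList<>() : deserialize(existing);
        values.add(value);
        store.put(key, serialize(values));
    }

    public List<byte[]> get(String key) {
        byte[] bytes = store.get(key);
        return bytes == null ? List.of() : deserialize(bytes);
    }
}
```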

> Avoid spurious left/outer join results in stream-stream join 
> -------------------------------------------------------------
>
>                 Key: KAFKA-10847
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10847
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Sergio Peña
>            Priority: Major
>
> KafkaStreams follows an eager execution model, ie, it never buffers input 
> records but processes them right away. For left/outer stream-stream join, 
> this implies that left/outer join result might be emitted before the window 
> end (or window close) time is reached. Thus, a record that will be an 
> inner-join result might produce an eager (and spurious) left/outer join 
> result.
> We should change the implementation of the join to not emit eager left/outer 
> join results, but instead delay the emission of such results until after the 
> window grace period has passed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
