[ https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17286808#comment-17286808 ]
Guozhang Wang commented on KAFKA-10847:
---------------------------------------

[~spena] [~vcrfxia] [~mjsax] I'd like to dump some more thoughts here regarding the design:

1) Regarding `fetchAll`: intuitively, even if we call this function on the newly added bookkeeping stores for records that have not found a match yet, the scan should return only a few records each time, since the expiration frequency is high. Within RocksDB, a range scan is usually executed as (a) find the starting point based on the lower bound, then (b) scan through the SST tables and stop at the upper bound. Of these two steps, the time spent finding the starting point should be roughly the same as the time spent on a single-key lookup, and since the calling frequency is high, the `low` and `high` bounds on timestamp should be pretty close (maybe just a couple of milliseconds apart), which means step (b) should stop fairly quickly as well.

We could check whether this is indeed the case. E.g., when running the benchmark, we first break down the latency into multiple stages: (i) time spent reading / writing in the window store, (ii) time spent range-fetching in the expiration store, (iii) time spent deleting records in the expiration store, and (iv) the join operator itself. Then, within (ii), we can plot a chart where the x-axis is the number of records returned and the y-axis is the end-to-end latency of getting the iterator and looping through it. If it shows that, even with say zero or one record returned, there's still a huge constant cost in latency, we may then consider 2) below.

2) If 1) shows that range-fetch in RocksDB has a constant amortized cost even with very few records returned, then instead of trying to expire records from the expiration store on each input record, we only try to fetch-and-delete from the expiration store periodically.
We should then be careful about the period: if the fetch-and-delete is too infrequent, we may introduce a much longer output emission latency, and we also risk blocking a thread from calling consumer.poll().

3) Another idea I have in mind is that, for the expiration store, instead of using two physical stores, one for each side, we can consider adding a single store for both sides, with the key prefixed by the logical stream (e.g. a single-bit prefix, "0" for left and "1" for right). By doing that we have one fewer physical store, and since the records from either side are clustered in the underlying layout, the range query should not be impacted much.

4) One more idea for the expiration store: instead of using the sequence id to disable deduplication, we can consider removing it from the key to save 4 bytes per record, and instead serialize the value schema as a list<value>, assuming that in practice most values would be a singleton list, which we can serialize with very small byte overhead. The downside is that instead of being able to do a blind write to append, we need to call a get and then a write, though the get should return null most of the time. I'm hoping that with RocksDB's Bloom filter, such a get should be efficient and worth the tradeoff.

> Avoid spurious left/outer join results in stream-stream join
> -------------------------------------------------------------
>
>                 Key: KAFKA-10847
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10847
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Sergio Peña
>            Priority: Major
>
> KafkaStreams follows an eager execution model, i.e., it never buffers input
> records but processes them right away. For left/outer stream-stream join,
> this implies that a left/outer join result might be emitted before the window
> end (or window close) time is reached.
> Thus, a record that will be an inner-join result might produce an eager
> (and spurious) left/outer join result.
> We should change the implementation of the join to not emit eager left/outer
> join results, but instead delay the emission of such results until after the
> window grace period has passed.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
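
The single-store layout from point 3) of the comment can be illustrated with a minimal sketch. Everything named here is hypothetical and is not Kafka Streams code: the class `SharedExpirationStoreSketch`, the key layout (side prefix, then big-endian timestamp, then record key), and the use of a whole byte rather than a single bit for the prefix are assumptions made for illustration. A `TreeMap` ordered by unsigned byte comparison stands in for RocksDB's sorted key space, and timestamps are assumed to be non-negative so that byte order matches numeric order.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class SharedExpirationStoreSketch {
    // Hypothetical one-byte side prefixes (the comment suggests a single bit).
    static final byte LEFT = 0;
    static final byte RIGHT = 1;

    // Sorted in-memory map standing in for RocksDB; keys compare as unsigned bytes.
    private final TreeMap<byte[], String> store =
        new TreeMap<>((a, b) -> Arrays.compareUnsigned(a, b));

    // Assumed key layout: [side][8-byte big-endian timestamp][record key bytes].
    static byte[] key(byte side, long timestamp, String recordKey) {
        byte[] k = recordKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + Long.BYTES + k.length)
            .put(side).putLong(timestamp).put(k).array();
    }

    void put(byte side, long timestamp, String recordKey, String value) {
        store.put(key(side, timestamp, recordKey), value);
    }

    int size() {
        return store.size();
    }

    // Fetch-and-delete all records of one side with timestamp <= upTo.
    // Assumes 0 <= upTo < Long.MAX_VALUE. Because of the side prefix, the
    // scan never touches records belonging to the other side.
    List<String> expire(byte side, long upTo) {
        NavigableMap<byte[], String> range =
            store.subMap(key(side, 0L, ""), true, key(side, upTo + 1, ""), false);
        List<String> expired = new ArrayList<>(range.values());
        range.clear(); // clearing the view removes the entries from the backing map
        return expired;
    }

    public static void main(String[] args) {
        SharedExpirationStoreSketch s = new SharedExpirationStoreSketch();
        s.put(LEFT, 100L, "a", "left-a");
        s.put(LEFT, 200L, "b", "left-b");
        s.put(RIGHT, 50L, "c", "right-c");
        System.out.println(s.expire(LEFT, 150L));
        System.out.println(s.expire(RIGHT, 150L));
    }
}
```

Since each side's records occupy one contiguous key range, expiring one side is a single bounded range scan, which is the property the comment relies on when it says the range query should not be impacted much.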