[ https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17271660#comment-17271660 ]
Sergio Peña commented on KAFKA-10847:
-------------------------------------

Hey [~mjsax], here's a proposal for how to implement this delay. Let me know what you think.

*Implementation*

I'll use another persistent state store to temporarily hold records that do not have a join. These records won't be processed until the window ends or a join is found. Joined records will be removed from the state store. If no join is found by the end of a record's window, the record will be removed from the state store and forwarded to the next topology node for further processing.

I'll use a punctuation to process the non-joined records at the end of the window. A punctuation will be created only when a new record arrives and no punctuation has been created yet. If a record arrives late (with a timestamp older than the current processing timestamp), the current punctuation will be cancelled and a new one will be created (or the record will be processed right away if it arrives after its window has ended). The punctuation will use the WALL_CLOCK_TIME type so that time advances and the punctuation is called even when no records are being streamed. Once the punctuation is called, it will be cancelled. A new punctuation will be created only if there are other records whose windows haven't ended yet.

Performance of left/outer joins might be impacted, but I don't know by how much yet. The new state store will be called only once per record: either when a non-joined record arrives (one put(key, value) call), or when a join is detected (one put(key, null) call). Since this is a persistent state store, writing to disk could add some overhead. I don't want to use an in-memory state store, to avoid consuming too much memory if many records are held in the store.

Another performance point is when a punctuation is called. It will walk through all the records whose windows have expired so that they are processed.
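
The buffering and punctuation steps described so far can be sketched roughly as follows. This is a minimal plain-Java illustration under stated assumptions, not Kafka Streams code and not the actual implementation: the class PendingOuterJoinBuffer, its method names, the String key/value types, and the "timestamp + window size" expiry rule are all hypothetical. It also keeps at most one held record per key, which a real stream-stream join would not.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the proposed store of non-joined records.
// Plain in-memory Java; none of these names come from Kafka Streams.
final class PendingOuterJoinBuffer {

    // A held record: its value and the timestamp it arrived with.
    private static final class Pending {
        final String value;
        final long timestamp;

        Pending(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final long windowSizeMs;
    // Simplification: one held record per key, kept in insertion order.
    private final Map<String, Pending> pending = new LinkedHashMap<>();

    PendingOuterJoinBuffer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // A record arrived without a join partner: hold it (the put(key, value) step).
    void onUnmatched(String key, String value, long timestamp) {
        pending.put(key, new Pending(value, timestamp));
    }

    // A join was found for this key: drop the held record (the put(key, null) step).
    void onJoined(String key) {
        pending.remove(key);
    }

    // Stand-in for the punctuation callback. Emits a left/outer result for
    // every held record whose window has ended at time `now`, then deletes it.
    // Assumption: a window ends at timestamp + windowSizeMs.
    List<String> punctuate(long now) {
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Pending>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Pending> entry = it.next();
            Pending p = entry.getValue();
            if (p.timestamp + windowSizeMs <= now) {
                emitted.add(entry.getKey() + "=(" + p.value + ", null)");
                it.remove();
            }
        }
        return emitted;
    }

    // Number of records still waiting for a join or for their window to end.
    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingOuterJoinBuffer buffer = new PendingOuterJoinBuffer(10L);
        buffer.onUnmatched("a", "A1", 0L);
        buffer.onUnmatched("b", "B1", 3L);
        buffer.onJoined("a");                      // "a" found a partner, nothing to emit later
        System.out.println(buffer.punctuate(12L)); // window of "b" has not ended yet
        System.out.println(buffer.punctuate(13L)); // window of "b" has ended
    }
}
```

In this sketch every call to punctuate scans all held records, which mirrors the concern that a punctuation over many non-joined records could take a while.
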
For each record processed by the punctuation, another put(key, null) is called to remove it from the store. If there are too many non-joined records during a window, the punctuation will take some time to process all of them, holding off the processing of new records from the next window. Could this cause rebalancing?

> Avoid spurious left/outer join results in stream-stream join
> -------------------------------------------------------------
>
>                 Key: KAFKA-10847
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10847
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Sergio Peña
>            Priority: Major
>
> KafkaStreams follows an eager execution model, i.e., it never buffers input
> records but processes them right away. For left/outer stream-stream joins,
> this implies that a left/outer join result might be emitted before the window
> end (or window close) time is reached. Thus, a record that will be an
> inner-join result might produce an eager (and spurious) left/outer join
> result.
> We should change the implementation of the join to not emit eager left/outer
> join results, but instead delay the emission of such results until after the
> window grace period has passed.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)