[ 
https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17271660#comment-17271660
 ] 

Sergio Peña edited comment on KAFKA-10847 at 1/28/21, 2:33 PM:
---------------------------------------------------------------

Hey [~mjsax], here's a proposal for how to implement this delay. Let me know what 
you think.

*Implementation*

I'll use another persistent state store to temporarily hold records that do not 
have a join. These records won't be processed until the window ends or a join 
is found. Joined records will be removed from the state store. If no join 
is found by the end of a record's window, the record will be removed from the 
state store and forwarded to the next topology node for further processing.
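
As a rough illustration of the buffering described above, here is a minimal plain-Java sketch. It does not use the Kafka Streams API: the class PendingJoinBuffer, its method names, and the in-memory TreeMap standing in for the persistent store are all hypothetical, and the grace period is ignored for simplicity.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of the extra store: holds records that have no join yet. */
class PendingJoinBuffer {
    // Hypothetical stand-in for the persistent store:
    // record timestamp -> (record key -> record value).
    private final TreeMap<Long, Map<String, String>> pending = new TreeMap<>();
    private final long windowSizeMs;

    PendingJoinBuffer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** A record arrived and found no join partner: hold it instead of emitting. */
    void hold(long timestamp, String key, String value) {
        pending.computeIfAbsent(timestamp, t -> new TreeMap<>()).put(key, value);
    }

    /** A join was found for a held record: drop it so no left/outer result is emitted. */
    void joined(long timestamp, String key) {
        Map<String, String> atTimestamp = pending.get(timestamp);
        if (atTimestamp != null) {
            atTimestamp.remove(key);
            if (atTimestamp.isEmpty()) {
                pending.remove(timestamp);
            }
        }
    }

    /** Remove and return ("key=value") every held record whose window has ended by now. */
    List<String> expire(long now) {
        List<String> emitted = new ArrayList<>();
        // A record's window ends at timestamp + windowSizeMs, so it has expired
        // when timestamp <= now - windowSizeMs.
        Iterator<Map<String, String>> it =
                pending.headMap(now - windowSizeMs, true).values().iterator();
        while (it.hasNext()) {
            it.next().forEach((key, value) -> emitted.add(key + "=" + value));
            it.remove(); // removes the entry from the backing store as well
        }
        return emitted;
    }
}
```

Keying the toy store by timestamp lets expire() touch only records whose window has already ended; whether the real store can be laid out this way is an assumption of the sketch, not something the proposal states.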

I'll use a punctuation to process the non-joined records at the end of the 
window. A punctuation will be created only when a new record arrives and no 
other punctuation has been created yet. If a record arrives out-of-order 
(with a timestamp older than the current processing timestamp), then the 
current punctuation will be cancelled and a new punctuation will be created (or 
the record will be processed if it arrives after the window has ended). The 
punctuation will use PunctuationType.WALL_CLOCK_TIME so that time advances and the 
punctuation is called even when no records are being streamed. Once the 
punctuation has been called, it will be cancelled. A new punctuation will be created 
only if there are other records whose window hasn't ended yet.
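
The punctuation lifecycle described above can be modelled roughly as follows. This is a plain-Java sketch under stated assumptions, not Kafka Streams code: WindowEndTimer and its methods are hypothetical names, record timestamps and wall-clock time are treated as a single clock, and "cancel and re-create" is modelled as moving the next fire time earlier when an out-of-order record's window ends sooner.

```java
import java.util.OptionalLong;

/** Toy model of "at most one pending punctuation" for window-end processing. */
class WindowEndTimer {
    private final long windowSizeMs;
    // Empty means no punctuation is currently scheduled.
    private OptionalLong nextFireAt = OptionalLong.empty();

    WindowEndTimer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /**
     * Called for every non-joined record.
     * Returns true if the record's window has already ended, meaning the
     * caller should process the record now instead of waiting for a punctuation.
     */
    boolean onRecord(long recordTimestamp, long now) {
        long windowEnd = recordTimestamp + windowSizeMs;
        if (windowEnd <= now) {
            return true;
        }
        // Schedule if nothing is pending; for an out-of-order record whose
        // window ends sooner, replace (cancel and re-create) the pending one.
        if (!nextFireAt.isPresent() || windowEnd < nextFireAt.getAsLong()) {
            nextFireAt = OptionalLong.of(windowEnd);
        }
        return false;
    }

    /**
     * Called as the clock advances. Returns true if the punctuation fired.
     * After firing, the punctuation is cancelled and re-created only if the
     * caller reports another record whose window has not ended yet.
     */
    boolean tick(long now, OptionalLong earliestRemainingWindowEnd) {
        if (!nextFireAt.isPresent() || now < nextFireAt.getAsLong()) {
            return false;
        }
        nextFireAt = earliestRemainingWindowEnd;
        return true;
    }

    OptionalLong nextFireAt() {
        return nextFireAt;
    }
}
```

In an actual processor, the scheduling and cancelling would presumably go through ProcessorContext.schedule(...) and the Cancellable it returns; the sketch only captures the bookkeeping around it.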

Performance of left/outer joins might be impacted, but I don't know by how 
much yet. The new state store will be called only once per record: either when 
a non-joined record arrives (one put(key, value) call), or when a join is 
detected (one put(key, null) call). Because the state store is persistent, this 
could add some overhead from writing to disk. I don't want to use an in-memory 
state store, to avoid consuming too much memory if many records are saved in 
the store. Another performance consideration is what happens when a punctuation 
is called. It will walk through all the records whose window has expired so that 
they are processed. For each record processed, another put(key, null) is called 
to remove it from the store. If there are too many non-joined records during a 
window, then this punctuation will take some time to process all those records, 
thus holding up the processing of new records in the next window. Could this 
cause rebalancing?



> Avoid spurious left/outer join results in stream-stream join 
> -------------------------------------------------------------
>
>                 Key: KAFKA-10847
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10847
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Sergio Peña
>            Priority: Major
>
> KafkaStreams follows an eager execution model, i.e., it never buffers input 
> records but processes them right away. For left/outer stream-stream joins, 
> this implies that a left/outer join result might be emitted before the window 
> end (or window close) time is reached. Thus, a record that will be part of an 
> inner-join result might produce an eager (and spurious) left/outer join 
> result.
> We should change the implementation of the join to not emit eager left/outer 
> join results, but instead delay the emission of such results until after the 
> window grace period has passed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
