Yes, basically I'm OK with how the join works, including window and retention periods, under normal circumstances. When events are consumed in real time, as they occur, an application joining the streams will get something like this:

T1 + 0 min  => topic_small (K1, V1)  => join result (None)
T1 + 1 min  => topic_large (K1, VT1) => join result (K1, V1, VT1)
T1 + 3 mins => topic_large (K1, VT2) => join result (K1, V1, VT2)
T1 + 7 mins => topic_small (K1, V2)  => join result (K1, V2, VT2)
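
For context, this is roughly the topology I have in mind (a minimal sketch using the 0.10.x-style API; topic names are from the example above, and I'm assuming default serdes are configured elsewhere):

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> small = builder.stream("topic_small");
    KStream<String, String> large = builder.stream("topic_large");

    // inner join within a 10-minute window; the joiner just pairs both values
    KStream<String, String> joined = small.join(
            large,
            (v, vt) -> "(" + v + ", " + vt + ")",
            JoinWindows.of(TimeUnit.MINUTES.toMillis(10)));
    joined.to("join_result");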

According to Windowed<K> and WindowedSerializer, only the start of the window is kept with the key when storing it to the state store. I'm assuming the window start time is the same for both topics/KStreams (not sure yet, still reading the source), but even if it is not the same, the state store actions of Kafka Streams will be like this:

join_left_side_store.put ( K1-W1, V1 )
join_right_side_store.put ( K1-W1, VT1 )
join_left_side_store.put ( K1-W1, V2 )
join_right_side_store.put ( K1-W1, VT2 )
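
Conceptually (my illustrative sketch, not the actual WindowedSerializer code), storing only the window start means the composite store key can be built from the key bytes plus an 8-byte timestamp:

    import java.nio.ByteBuffer;

    // illustrative only: build the state store key from the record key bytes
    // plus the window START timestamp (no window end is stored)
    static byte[] windowedKey(byte[] keyBytes, long windowStartMs) {
        ByteBuffer buf = ByteBuffer.allocate(keyBytes.length + 8);
        buf.put(keyBytes);
        buf.putLong(windowStartMs);
        return buf.array();
    }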

However, when the same application consumes the same topics from the beginning, from scratch (no local state stores), over a large period of time (greater than the window period, but less than the retention period), the join result for a 10-minute window will be different, like this:

join result (None)
join result (K1, V2, VT1)
join result (K1, V2, VT2)

Because topic_large's stream is being read more slowly, the value of topic_small in the window will change from V1 to V2 before Kafka Streams receives VT1.

I.e. the state store actions of Kafka Streams will be like this:

join_left_side_store.put ( K1-W1, V1 )
join_left_side_store.put ( K1-W1, V2 )
join_right_side_store.put ( K1-W1, VT1 )
join_right_side_store.put ( K1-W1, VT2 )

Is that right?
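
If that is the case, one thing I might try (purely my assumption, not something I've seen documented for this scenario) is enlarging the per-partition record buffer, so the timestamp-based record selection has more lookahead while replaying:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // my assumption: a larger per-partition buffer gives the smallest-timestamp
    // record selection more lookahead when replaying from the beginning
    props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 10000);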

On Wed, Apr 26, 2017 at 6:50 PM, Damian Guy <damian....@gmail.com> wrote:
Hi Murad,

On Wed, 26 Apr 2017 at 13:37 Murad Mamedov <m...@muradm.net> wrote:

Is there any global time synchronization between streams in the Kafka Streams API, so that it would not consume more events from one stream while the other is still behind in time? Or, to rephrase: is there global event ordering based on the timestamp of the event?


Yes. When streams are joined, the corresponding partitions from the joined streams are grouped together into a single Task. Each Task maintains a record buffer for all of the topics it is consuming from. When it is time to process a record, it will choose a record from the partition that has the smallest timestamp. In this way it makes a best effort to keep the streams in sync.
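
Roughly, that selection could be sketched like this (a paraphrase of the behaviour, not the actual internal code):

    import java.util.Deque;
    import java.util.List;

    // "Record" is a stand-in type for a buffered record carrying a timestamp
    final class Record { final long timestamp; Record(long ts) { timestamp = ts; } }

    // paraphrase: each Task buffers records per partition and next processes
    // the record from the buffer whose head has the smallest timestamp
    static Deque<Record> pickNext(List<Deque<Record>> partitionBuffers) {
        Deque<Record> best = null;
        for (Deque<Record> buffer : partitionBuffers) {
            if (buffer.isEmpty()) continue;
            if (best == null
                    || buffer.peekFirst().timestamp < best.peekFirst().timestamp) {
                best = buffer;
            }
        }
        return best; // the next record to process is best.pollFirst()
    }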



The other option could be to join the streams in a window; however, the same question arises: if one stream is days behind the other, will a join window of 15 minutes ever work?


If the data is arriving much later you can use
JoinWindows.until(SOME_TIME_PERIOD) to keep the data around. In this case the streams will still join. Once SOME_TIME_PERIOD has expired the streams
will no longer be able to join.
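
For example (a sketch; the 10-minute window and 7-day retention are made-up values):

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.streams.kstream.JoinWindows;

    // records within 10 minutes of each other can join; the window state is
    // retained for 7 days, so late/replayed records can still find a match
    JoinWindows windows = JoinWindows.of(TimeUnit.MINUTES.toMillis(10))
                                     .until(TimeUnit.DAYS.toMillis(7));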



I'm trying to grasp how to design replay of long periods of time for an application with multiple topics/streams, especially when combining it with low-level API processors and transformers which rely on each other via GlobalKTable or KTable stores on these streams. For instance, the smaller topic could have the following sequence of events:

 T1 - (k1, v1)
 T1 + 10 minutes - (k1, null)
 T1 + 20 minutes - (k1, v2)

 While the topic with larger events could have:

 T1 - (k1, vt1)
 T1 + 5 minutes - (k1, null)
 T1 + 15 minutes - (k1, vt2)

If one were to join or look up these streams in real time (the timestamp of the event is approximately equal to the wall clock time), the result would be:

 T1 - topic_small (k1, v1) - topic_large (k1, vt1)
 T1 + 5 minutes - topic_small (k1, v1) - topic_large (k1, null)
 T1 + 10 minutes - topic_small (k1, null) - topic_large (k1, null)
 T1 + 15 minutes - topic_small (k1, null) - topic_large (k1, vt2)
 T1 + 20 minutes - topic_small (k1, v2) - topic_large (k1, vt2)

However, when replaying the streams from the beginning, from the perspective of the topic with large events, it would see the topic with small events as (k1, v2), completely missing the v1 and null states in the case of a GlobalKTable/KTable presentation, or the corresponding events in the case of a KStream-KStream windowed join.


I don't really follow here. In the case of a GlobalKTable it will be
initialized with all of the existing data before the rest of the streams
start processing.
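
For reference, a minimal sketch of that lookup pattern (names taken from your examples; serdes assumed to come from the default config):

    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    KStreamBuilder builder = new KStreamBuilder();
    GlobalKTable<String, String> smallTable =
            builder.globalTable("topic_small", "topic_small_store");
    KStream<String, String> large = builder.stream("topic_large");

    // lookups hit the global table's fully-initialized local copy
    KStream<String, String> enriched = large.join(
            smallTable,
            (k, vt) -> k,                      // map stream record to table key
            (vt, v) -> "(" + v + ", " + vt + ")");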


Am I missing something here? Should the application be responsible for global synchronization between topics, or does / can Kafka Streams do that? If the application should, then what could be an approach to solve it?

 I hope I have explained myself clearly.

 Thanks in advance
