I think I understand how Kafka Streams handles time behind the scenes and
would like someone to verify it for me.  The reason I ask is that I'm
running into behavior where I can only join two streams for a short while
before the join stops working.

Assuming a topology like this:

FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
countKStream.
FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
sumKStream.

countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
reduce() = avgKTable

Given that FEED is written to Kafka with the event time as the record
timestamp (and, just to be safe, I also have a TimestampExtractor pulling
the event time out again), I believe time processing happens like this
(ET = Event Time, PT = Processing Time):

FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
countKStream.
ET -> ET -> ET -> PT

FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
sumKStream.
ET -> ET -> ET -> PT

countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
reduce() = avgKTable
PT -> PT -> PT
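To make my hypothesis concrete, here is a toy sketch (plain Python, not real Kafka Streams code; the helper names and the re-stamping behavior are my assumptions) of what I think happens to the timestamps on each branch:

```python
# Toy model of my hypothesized timestamp handling; not Kafka Streams code.
WINDOW_MS = 60_000  # the 1-minute aggregation window

def windowed_aggregate_ts(event_ts_ms):
    # Through groupByKey()/window()/count() I believe the record keeps
    # event time: the result is stamped with its window's start.
    return (event_ts_ms // WINDOW_MS) * WINDOW_MS

def to_stream_ts(wall_clock_ms):
    # ...but my suspicion is that toStream() re-stamps the record with
    # processing time (wall clock), discarding event time.
    return wall_clock_ms

# Same event-time window on both branches before toStream()...
count_ts = windowed_aggregate_ts(90_000)
sum_ts = windowed_aggregate_ts(95_000)
assert count_ts == sum_ts == 60_000

# ...yet after toStream() each branch carries whatever wall-clock time it
# happened to be processed at, so the two sides can diverge:
assert to_stream_ts(5_000_000) != to_stream_ts(5_200_000)
```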

If that's right, my join is really attempting to match records based on
Kafka's processing time from the previous aggregations, not on event time
like I want.  When the streams start, things seem to work well and the avg
topic/stores populate.  After a few minutes, count gets way ahead of sum
and avg stops populating anything at all.  My hunch is that the
processing-time difference grows beyond the 1-minute join window, so
records no longer join; increasing the until retention to any value (I
tried 1 year) has no effect either.
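Here is the hunch as a toy illustration (plain Python, not Kafka Streams code; the helper is mine): a stream-stream join only matches records whose timestamps differ by at most the join window, while "until" only controls how long old windows are retained, not which records match.

```python
# Toy model of my hunch about the stream-stream join; not Kafka code.
JOIN_WINDOW_MS = 60_000  # the 1-minute join window

def joins(left_ts_ms, right_ts_ms):
    # Records from the two streams match only if their timestamps are
    # within the join window of each other.
    return abs(left_ts_ms - right_ts_ms) <= JOIN_WINDOW_MS

# Early on, both branches carry similar timestamps, so joins succeed.
assert joins(10_000, 40_000)

# Once count runs more than a minute ahead of sum, nothing joins anymore,
# and no retention ("until") setting can change that.
assert not joins(10_000_000, 10_000_000 + 120_000)
```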

Is this the correct way to calculate an average over a 1-minute event-time
window with, say, a 14-day lag (to load old data)?

Thank you all!
