Hi,

I am not sure if I understand correctly: if you use the default TimestampExtractor, the whole pipeline will be event-time based.
However, as you want to compute the AVG, I would recommend a different pattern anyway:

FEED -> groupByKey() -> window() -> aggregate() -> mapValues() = avgKTable

In aggregate(), you compute both count and sum and emit <k,(cnt,sum)> records (i.e., a custom data type for the value), and in mapValues() you compute <k,avg>.

Hope this helps.

-Matthias

On 5/4/17 7:36 PM, Garrett Barton wrote:
> I think I have an understanding of how Kafka Streams is handling time
> behind the scenes and would like someone to verify it for me. The actual
> reason is I am running into behavior where I only can join two streams for
> a little, then it stops working.
>
> Assuming a topology like this:
>
> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
> countKStream.
> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
> sumKStream.
>
> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
> reduce() = avgKTable
>
> Given that FEED is populated into kafka with the event time for the
> timestamp (and just to make sure I have a TimeExtractor extracting the time
> again), I believe time processing happens like this (ET = Event Time, PT =
> Process Time):
>
> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
> countKStream.
> ET -> ET -> ET -> PT
>
> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
> sumKStream.
> ET -> ET -> ET -> PT
>
> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
> reduce() = avgKTable
> PT -> PT -> PT
>
> Thus my join has really attempted to join records based on kafka's
> processing time from the previous aggregations and not by event time like I
> want. When the streams start things seem to work well, avg topic/stores
> populate. After a few minutes count gets way ahead of sum and then avg
> completely stops populating anything. My hunch is that the processing time
> gets outside that 1 minute join window and it no longer joins, increasing
> the until to any number (tried 1 year) has no effect either.
>
> Is this the correct way to calculate an average over a 1 minute event time
> window with say a 14 day lag time (to load old data)?
>
> Thank you all!
>
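
For illustration, the count-and-sum pattern recommended in the reply could be sketched in plain Java roughly as follows. This is not Kafka Streams code and uses no Kafka API: the names WindowedAverage, Event, CountAndSum, windowStart, aggregate and averages are all made up for this sketch, the "key@windowStart" string stands in for a windowed key, and tumbling windows with non-negative millisecond timestamps are assumed. It only shows why a single aggregation that carries (cnt,sum) per key and event-time window, followed by a value mapping, avoids the join entirely.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch; none of these names come from Kafka Streams.
final class WindowedAverage {

    /** One input record: key, event-time timestamp (ms), and value. */
    static final class Event {
        final String key;
        final long timestampMs;
        final double value;

        Event(String key, long timestampMs, double value) {
            this.key = key;
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    /** Custom value type holding both count and sum, i.e. the (cnt,sum) pair. */
    static final class CountAndSum {
        static final CountAndSum EMPTY = new CountAndSum(0L, 0.0);
        final long count;
        final double sum;

        CountAndSum(long count, double sum) {
            this.count = count;
            this.sum = sum;
        }

        /** Plays the role of the aggregator: fold one more value in. */
        CountAndSum add(double value) {
            return new CountAndSum(count + 1, sum + value);
        }

        double average() {
            return sum / count;
        }
    }

    /** Start of the tumbling window that contains the given event time. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** The aggregate() step: one (cnt,sum) per key and event-time window. */
    static Map<String, CountAndSum> aggregate(List<Event> events, long windowSizeMs) {
        Map<String, CountAndSum> table = new LinkedHashMap<>();
        for (Event e : events) {
            String windowedKey = e.key + "@" + windowStart(e.timestampMs, windowSizeMs);
            CountAndSum current = table.getOrDefault(windowedKey, CountAndSum.EMPTY);
            table.put(windowedKey, current.add(e.value));
        }
        return table;
    }

    /** The mapValues() step: turn each (cnt,sum) into an average. */
    static Map<String, Double> averages(Map<String, CountAndSum> table) {
        Map<String, Double> result = new LinkedHashMap<>();
        table.forEach((key, cs) -> result.put(key, cs.average()));
        return result;
    }

    public static void main(String[] args) {
        // Records arrive out of order; the window is chosen by event time only.
        List<Event> feed = List.of(
                new Event("a", 1_000L, 2.0),
                new Event("a", 61_000L, 10.0),
                new Event("b", 30_000L, 5.0),
                new Event("a", 59_000L, 4.0));
        System.out.println(averages(aggregate(feed, 60_000L)));
    }
}
```

Because count and sum are updated together from the same record, they cannot drift apart the way two separately computed streams can, and the late "a" record at 59 s still lands in the first one-minute window.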