That does help, actually; I never thought about a custom value object to hold the Count/Sum variables. Thank you!
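To make sure I follow the suggestion, something roughly like this is what I have in mind (a sketch against the newer DSL, so exact signatures vary across Streams releases; the CountSum class, the String/Double serdes, and the Serde<CountSum> parameter are placeholders of mine, not anything from this thread):

import java.time.Duration;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class AvgExample {

    // Custom value object holding the running count and sum for one window.
    public static class CountSum {
        public long count;
        public double sum;
    }

    static KTable<Windowed<String>, Double> buildAvg(StreamsBuilder builder,
                                                     Serde<CountSum> countSumSerde) {
        // Assumes FEED is keyed by String with Double values; adjust serdes as needed.
        KStream<String, Double> feed =
            builder.stream("FEED", Consumed.with(Serdes.String(), Serdes.Double()));

        return feed
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
            // aggregate() keeps <count, sum> per key and window ...
            .aggregate(
                CountSum::new,
                (key, value, agg) -> {
                    agg.count += 1;
                    agg.sum += value;
                    return agg;
                },
                Materialized.with(Serdes.String(), countSumSerde))
            // ... and mapValues() derives the average from it.
            .mapValues(cs -> cs.count == 0 ? 0.0 : cs.sum / cs.count);
    }
}

Keeping count and sum together in one aggregate value means the average is always derived from a consistent pair, instead of joining two separately updated streams.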
For the time semantics, here is where I got hung up, copied from the Kafka
Streams documentation:

Finally, whenever a Kafka Streams application writes records to Kafka, then
it will also assign timestamps to these new records. The way the timestamps
are assigned depends on the context:

- When new output records are generated via processing some input record,
  for example, context.forward() triggered in the process() function call,
  output record timestamps are inherited from input record timestamps
  directly.

*Given I set things to Event Time, this would output Event Time, correct?*

- When new output records are generated via periodic functions such as
  punctuate(), the output record timestamp is defined as the current
  internal time (obtained through context.timestamp()) of the stream task.

*This is where I am confused: what operations count as a punctuate()? Just
the low-level API? And are these thus Process Time?* (see the sketch at the
bottom of this mail)

- For aggregations, the timestamp of a resulting aggregate update record
  will be that of the latest arrived input record that triggered the
  update.

*This sounds like the Event Time of the latest arrived record, correct?*

On Fri, May 5, 2017 at 1:16 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> Hi,
>
> I am not sure if I understand correctly: If you use the default
> TimestampExtractor, the whole pipeline will be event-time based.
>
> However, as you want to compute the AVG, I would recommend a different
> pattern anyway:
>
> FEED -> groupByKey() -> window() -> aggregate() -> mapValues() = avgKTable
>
> In aggregate(), you compute both count and sum and emit <k,(cnt,sum)>
> records (i.e., a custom data type for the value), and in mapValues() you
> compute <k,avg>.
>
> Hope this helps.
>
> -Matthias
>
> On 5/4/17 7:36 PM, Garrett Barton wrote:
> > I think I have an understanding of how Kafka Streams is handling time
> > behind the scenes and would like someone to verify it for me. The actual
> > reason is I am running into behavior where I can only join two streams
> > for a little while, then it stops working.
> >
> > Assuming a topology like this:
> >
> > FEED -> groupByKey() -> window(1 min) -> count() -> toStream() = countKStream.
> > FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() = sumKStream.
> >
> > countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
> > reduce() = avgKTable
> >
> > Given that FEED is populated into Kafka with the event time for the
> > timestamp (and just to make sure, I have a TimestampExtractor extracting
> > the time again), I believe time processing happens like this
> > (ET = Event Time, PT = Process Time):
> >
> > FEED -> groupByKey() -> window(1 min) -> count() -> toStream() = countKStream.
> > ET -> ET -> ET -> PT
> >
> > FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() = sumKStream.
> > ET -> ET -> ET -> PT
> >
> > countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
> > reduce() = avgKTable
> > PT -> PT -> PT
> >
> > Thus my join has really attempted to join records based on Kafka's
> > processing time from the previous aggregations and not by event time
> > like I want. When the streams start, things seem to work well, and the
> > avg topic/stores populate. After a few minutes count gets way ahead of
> > sum and then avg completely stops populating anything. My hunch is that
> > the processing time gets outside that 1 minute join window and it no
> > longer joins; increasing the until to any number (tried 1 year) has no
> > effect either.
> >
> > Is this the correct way to calculate an average over a 1 minute event
> > time window with, say, a 14 day lag time (to load old data)?
> >
> > Thank you all!
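Regarding the punctuate() question above: as far as I can tell, punctuations only exist in the low-level Processor API, and only when the processor schedules one itself; the DSL operators in the topology above don't appear to involve punctuate() at all. A minimal sketch under that assumption, using the pre-1.0 signatures to match the documentation quoted above (class name, interval, and the forwarded key/value are made up):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

// punctuate() only fires because init() schedules it below (pre-1.0 Processor API;
// newer releases use context.schedule(interval, PunctuationType, Punctuator) instead).
public class PunctuateSketch implements Processor<String, Double> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        // Request a punctuate() call roughly every 60 seconds.
        context.schedule(60_000L);
    }

    @Override
    public void process(String key, Double value) {
        // Records forwarded here inherit the input record's timestamp,
        // i.e. event time when the timestamp extractor pulls event time.
        context.forward(key, value);
    }

    @Override
    public void punctuate(long timestamp) {
        // Records forwarded here get the task's current internal time
        // (context.timestamp()), per the documentation quoted above.
        context.forward("rollup", 0.0);
    }

    @Override
    public void close() { }
}

The intent is only to illustrate the mechanism the quoted docs describe; the interval, key, and value forwarded here are arbitrary.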