About the join: Joins work perfectly fine if you apply them to "plain
records" you read from a topic. When joining records, the record's
timestamp is used to compute the join result.
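
For reference, a minimal sketch of such a plain-record join. The topic
names, key/value types, and serdes below are illustrative only, and the
exact overloads vary across Kafka Streams versions (this assumes the
Duration-based JoinWindows API):

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.KStream;

    public class PlainRecordJoinSketch {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();

            // Two streams of plain records read from topics; their timestamps
            // come from the configured TimestampExtractor (event time by default).
            KStream<String, Long> left =
                builder.stream("left-topic", Consumed.with(Serdes.String(), Serdes.Long()));
            KStream<String, Long> right =
                builder.stream("right-topic", Consumed.with(Serdes.String(), Serdes.Long()));

            // Records with the same key join whenever their timestamps are within
            // one minute of each other -- the record timestamps drive the join.
            // (Join state stores fall back to the configured default serdes here.)
            KStream<String, Long> joined = left.join(
                right,
                (l, r) -> l + r,
                JoinWindows.of(Duration.ofMinutes(1)));
        }
    }

Because both inputs are plain topic records, each record carries its own
timestamp and no "stream time" fallback is involved.
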
The "problem" in your case is that you apply the join to a windowed aggregation result. And thus, there is no "record timestamp" and Streams falls back to "stream time" to assign a timestamp to the window result record that can be used for the join. Glad it works with my suggested solution. :) -Matthias On 5/5/17 6:41 PM, Garrett Barton wrote: > Matthias, > That does make a lot of sense, so Streams never will create time its > always using a byproduct of a record time passed into it. Thus in theory > unless I force a change somewhere in a flow, the flow will stay as I start > it. > > The confusing part is around joins, since 'stream time' is kinda loosely > derived from where kafka streams thinks it is globally from consuming the > upstream topic, and this is where the timing can get out of sync. And it > did break my original flow after a few minutes every single time. That > part kind of makes me think that in a join the window and until likely > should be the same value, given that the streams could be off quite a bit. > But that is another topic. > > I redid my stream as you suggested and it worked wonderfully, shrunk the > flows considerably, and I can finally calculate averages consistently > longer than a few minutes. Thank you! > > On Fri, May 5, 2017 at 1:06 PM, Matthias J. Sax <matth...@confluent.io> > wrote: > >> That part of time tracking is a little tricky. >> >> Streams internally maintains "stream time" -- this model the progress of >> your application over all input partitions and topics, and is based on >> the timestamps return by the timestamp extractor. Thus, if timestamp >> extractor returns even time, "stream time" will we event-time based, >> too. (Streams, never calls System.currentTimeMillis() so assign >> timestamps.) >> >> This internally tracked "stream time" is used in punctuate() (yes, low >> level API only) and for window operations to define the output record's >> timestamp. As "stream time" depends on record processing order, it might >> vary a little bit (the computation of it itself is deterministic, but it >> depends what records get fetched from the brokers, and the fetching step >> is not deterministic, making "global" processing order >> non-deterministic, too -- what is a general Kafka property: order is >> only guaranteed within a single partitions, but not across partitions). >> This little varying in "stream time" computation might break you join >> step in your original code... You would need to base the join on >> window-start time and not on event-time to get it right (and thus, you >> would not even need a windowed join). But the join is to "clumsy" anyway. >> >> Does this answer all your questions? >> >> (We don't document those details on purpose, because it's an internal >> design and we want the flexibility to change this if required -- thus, >> you should also not rely on "stream time" advance assumptions in your >> code.) >> >> >> -Matthias >> >> >> On 5/5/17 8:09 AM, Garrett Barton wrote: >>> That does actually, I never thought about a custom value object to hold >> the >>> Count/Sum variables. Thank you! >>> >>> For the time semantics here is where I got hung up, copied from kafka >>> streams documentation: >>> >>> Finally, whenever a Kafka Streams application writes records to Kafka, >> then >>> it will also assign timestamps to these new records. 
The way the >> timestamps >>> are assigned depends on the context: >>> >>> - When new output records are generated via processing some input >>> record, for example, context.forward() triggered in the process() >>> function call, output record timestamps are inherited from input >> record >>> timestamps directly. >>> - *Given I set things to Event Time, this would output Event Time >>> correct?* >>> - When new output records are generated via periodic functions such >>> as punctuate(), the output record timestamp is defined as the current >>> internal time (obtained through context.timestamp()) of the stream >> task. >>> - *This is where I am confused, what operations count as a >>> punctuate()? Just the low level api? And are these thus Process >> time?* >>> - For aggregations, the timestamp of a resulting aggregate update >>> record will be that of the latest arrived input record that triggered >> the >>> update. >>> - *This sounds like last used Event Time, correct?* >>> >>> >>> On Fri, May 5, 2017 at 1:16 AM, Matthias J. Sax <matth...@confluent.io> >>> wrote: >>> >>>> Hi, >>>> >>>> I am not sure if I understand correctly: If you use default >>>> TimestampExtractor, the whole pipeline will be event-time based. >>>> >>>> However, as you want to compute the AVG, I would recommend a different >>>> pattern anyway: >>>> >>>> FEED -> groupByKey() -> window() -> aggregate() -> mapValues() = >> avgKTable >>>> >>>> In aggregate, you compute both count and sum and emit <k,(cnt,sum)> >>>> records (ie, a custom data data for value) and in mapValue() you compute >>>> <k,avg>. >>>> >>>> Hope this helps. >>>> >>>> -Matthias >>>> >>>> On 5/4/17 7:36 PM, Garrett Barton wrote: >>>>> I think I have an understanding of how Kafka Streams is handling time >>>>> behind the scenes and would like someone to verify it for me. The >> actual >>>>> reason is I am running into behavior where I only can join two streams >>>> for >>>>> a little, then it stops working. >>>>> >>>>> Assuming a topology like this: >>>>> >>>>> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() = >>>>> countKStream. >>>>> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() = >>>>> sumKStream. >>>>> >>>>> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) -> >>>>> reduce() = avgKTable >>>>> >>>>> Given that FEED is populated into kafka with the event time for the >>>>> timestamp (and just to make sure I have a TimeExtractor extracting the >>>> time >>>>> again), I believe time processing happens like this (ET = Event Time, >> PT >>>> = >>>>> Process Time): >>>>> >>>>> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() = >>>>> countKStream. >>>>> ET -> ET -> ET -> PT >>>>> >>>>> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() = >>>>> sumKStream. >>>>> ET -> ET -> ET -> PT >>>>> >>>>> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) -> >>>>> reduce() = avgKTable >>>>> PT -> PT -> PT >>>>> >>>>> Thus my join has really attempted to join records based on kafka's >>>>> processing time from the previous aggregations and not by event time >>>> like I >>>>> want. When the streams start things seem to work well, avg >> topic/stores >>>>> populate. After a few minutes count gets way ahead of sum and then avg >>>>> completely stops populating anything. My hunch is that the processing >>>> time >>>>> gets outside that 1 minute join window and it no longer joins, >> increasing >>>>> the until to any number (tried 1 year) has no effect either. 
>>>>>
>>>>> Is this the correct way to calculate an average over a 1 minute
>>>>> event time window with, say, a 14 day lag time (to load old data)?
>>>>>
>>>>> Thank you all!
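
For readers of the archive, a minimal sketch of what the suggested
groupByKey() -> window() -> aggregate() -> mapValues() pattern could
look like. The topic name, value types, the CountAndSum class, and the
countAndSumSerde parameter are illustrative placeholders, and exact API
shapes vary across Kafka Streams versions:

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    public class WindowedAverageSketch {

        // Custom value type holding the running count and sum, as suggested above.
        public static class CountAndSum {
            public long count;
            public double sum;
        }

        // countAndSumSerde is a placeholder for a custom Serde (e.g. JSON-based)
        // that the caller would provide for the CountAndSum type.
        public static KTable<Windowed<String>, Double> buildAverage(
                StreamsBuilder builder, Serde<CountAndSum> countAndSumSerde) {

            KStream<String, Double> feed =
                builder.stream("FEED", Consumed.with(Serdes.String(), Serdes.Double()));

            return feed
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
                // 1-minute windows based on record timestamps, i.e. event time
                // when the timestamp extractor returns event time.
                .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
                // A single aggregation maintains count and sum together...
                .aggregate(
                    CountAndSum::new,
                    (key, value, agg) -> {
                        agg.count += 1;
                        agg.sum += value;
                        return agg;
                    },
                    Materialized.with(Serdes.String(), countAndSumSerde))
                // ...and mapValues() derives the average per key and window.
                .mapValues(agg -> agg.sum / agg.count);
        }
    }

Since count and sum live in one aggregation, there is only a single
windowed result per key, so the brittle count/sum join (and its
dependence on "stream time") disappears.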