While it's not exactly the same as the window start/stop time, you can store (in the state store) the earliest and latest timestamps of the messages in each window and use those as a good approximation of the window boundary times.
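A minimal sketch of that idea, in plain Java so it runs without any Kafka dependency. Everything here is an assumption for illustration: the class name `SessionBounds`, the `Bounds` type, and the `record` method are hypothetical, and the `HashMap` only stands in for a Kafka Streams state store. It applies the 30-minute gap rule from Ali's use case and keeps the earliest/latest timestamp per bucket.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: per-key session buckets with approximate start/end times. */
class SessionBounds {

    /** Earliest and latest record timestamps (epoch millis) seen in one bucket. */
    static final class Bounds {
        long earliest;
        long latest;

        Bounds(long ts) {
            this.earliest = ts;
            this.latest = ts;
        }
    }

    private final long gapMs;

    // Hypothetical stand-in for a state store: key -> buckets in creation order.
    private final Map<String, List<Bounds>> store = new HashMap<>();

    SessionBounds(long gapMs) {
        this.gapMs = gapMs;
    }

    /** Records one message and returns the bucket it was assigned to. */
    Bounds record(String key, long ts) {
        List<Bounds> buckets = store.computeIfAbsent(key, k -> new ArrayList<>());
        for (Bounds b : buckets) {
            // A record joins a bucket if it is less than one gap away from the
            // timestamps already observed there. This also catches late,
            // out-of-order records. Merging two buckets that a late record
            // bridges is deliberately left out of this sketch.
            if (ts > b.earliest - gapMs && ts < b.latest + gapMs) {
                b.earliest = Math.min(b.earliest, ts);
                b.latest = Math.max(b.latest, ts);
                return b;
            }
        }
        // A gap of gapMs or more from every known bucket: start a new one.
        Bounds fresh = new Bounds(ts);
        buckets.add(fresh);
        return fresh;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        SessionBounds tracker = new SessionBounds(30 * minute);
        tracker.record("user-1", 0);
        tracker.record("user-1", 10 * minute);
        Bounds late = tracker.record("user-1", 5 * minute);  // out of order, same bucket
        Bounds next = tracker.record("user-1", 45 * minute); // 35-minute gap, new bucket
        System.out.println(late.earliest + ".." + late.latest);
        System.out.println(next.earliest + ".." + next.latest);
    }
}
```

In a real topology the map would presumably be replaced by a key-value state store accessed from a `Processor`, and the record timestamp would come from the `ProcessorContext`; the bookkeeping itself should look much the same.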
-hans

> On Mar 20, 2017, at 1:00 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> Yeah, windowing seems perfect, if only I could find out the current window's start time (so I can log the current bucket's start & end times) and process window messages individually rather than as aggregates.
>
> It doesn't seem like I can get this metadata from ProcessorContext though, from looking over the javadocs.
>
>> On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mich...@confluent.io> wrote:
>>
>> Ali,
>>
>> what you describe is (roughly!) how Kafka Streams implements the internal state stores to support windowing.
>>
>> Some users have been following a similar approach as you outlined, using the Processor API.
>>
>>> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>
>>> It would be helpful to know the 'start' and 'end' of the current metadata, so if an out-of-order message arrives late, and is being processed in foreach(), you'd know which window / bucket it belongs to, and can handle it accordingly.
>>>
>>> I'm guessing that's not possible at the moment.
>>>
>>> (My use case is, I receive a stream of messages. Messages need to be stored and sorted into 'buckets', to indicate 'sessions'. Each time there's a gap of 30 mins or more since the last message (under a key), a new 'session' (bucket) should be started, and future messages should belong to that 'session', until the next 30+ min gap).
>>>
>>> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mich...@confluent.io> wrote:
>>>
>>>>> Can windows only be used for aggregations, or can they also be used for foreach(), and such?
>>>>
>>>> As of today, you can use windows only in aggregations.
>>>>
>>>>> And is it possible to get metadata on the message, such as whether or not it's late, its index/position within the other messages, etc?
>>>>
>>>> If you use the Processor API of Kafka Streams, you can access an incoming record's topic, partition, offset, etc. via the so-called ProcessorContext (which is updated for every new incoming record):
>>>>
>>>> http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/Processor.html
>>>> - You can get/store a reference to the ProcessorContext from `Processor#init()`.
>>>>
>>>> http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/ProcessorContext.html
>>>> - The context can then be used within `Processor#process()` when you process a new record. As I said, the context is updated behind the scenes to match the record that is currently being processed.
>>>>
>>>> Best,
>>>> Michael
>>>>
>>>> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>>
>>>>> Can windows only be used for aggregations, or can they also be used for foreach(), and such?
>>>>>
>>>>> And is it possible to get metadata on the message, such as whether or not it's late, its index/position within the other messages, etc?
>>>>>
>>>>> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mich...@confluent.io> wrote:
>>>>>
>>>>>> And since you asked for a pointer, Ali:
>>>>>> http://docs.confluent.io/current/streams/concepts.html#windowing
>>>>>>
>>>>>> On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mich...@confluent.io> wrote:
>>>>>>
>>>>>>> Late-arriving and out-of-order data is only treated specially for windowed aggregations.
>>>>>>>
>>>>>>> For stateless operations such as `KStream#foreach()` or `KStream#map()`, records are processed in the order they arrive (per partition).
>>>>>>>
>>>>>>> -Michael
>>>>>>>
>>>>>>> On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>>>>>
>>>>>>>>> later when message A arrives it will put that message back into the right temporal context and publish an amended result for the proper time/session window as if message A had been consumed in timestamp order before message B.
>>>>>>>>
>>>>>>>> Does this apply to the aggregation Kafka Streams methods then, and not to e.g. foreach?
>>>>>>>>
>>>>>>>> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <h...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> Yes, stream processing and CEP are subtly different things.
>>>>>>>>>
>>>>>>>>> Kafka Streams helps you write stateful apps and allows that state to be preserved on disk (a local state store) as well as distributed for HA or for parallel partitioned processing (via Kafka topic partitions and consumer groups) as well as in memory (as a performance enhancement).
>>>>>>>>>
>>>>>>>>> However, a classical CEP engine with a pre-modeled state machine and pattern matching rules is something different from stream processing.
>>>>>>>>>
>>>>>>>>> It is of course possible to build a CEP system on top of Kafka Streams and get the best of both worlds.
>>>>>>>>>
>>>>>>>>> -hans
>>>>>>>>>
>>>>>>>>>> On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <sabarish....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hans
>>>>>>>>>>
>>>>>>>>>> What you state would work for aggregations, but not for state machines and CEP.
>>>>>>>>>>
>>>>>>>>>> Regards
>>>>>>>>>> Sab
>>>>>>>>>>
>>>>>>>>>>> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <h...@confluent.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>> The only way to make sure A is consumed first would be to delay the consumption of message B for at least 15 minutes, which would fly in the face of the principles of a true streaming platform. So the short answer to your question is "no", because that would be batch processing, not stream processing.
>>>>>>>>>>>
>>>>>>>>>>> However, Kafka Streams does handle late-arriving data. So if you had some analytics that computes results on a time window or a session window, then Kafka Streams will compute on the stream in real time (processing message B), and then later when message A arrives it will put that message back into the right temporal context and publish an amended result for the proper time/session window as if message A had been consumed in timestamp order before message B. The end result of this flow is that you eventually get the same results you would get in a batch processing system, but with the added benefit of getting intermediary results at much lower latency.
>>>>>>>>>>>
>>>>>>>>>>> -hans
>>>>>>>>>>>
>>>>>>>>>>> /**
>>>>>>>>>>> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>>>>>>>>>>> * h...@confluent.io (650)924-2670
>>>>>>>>>>> */
>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Is it possible to have Kafka Streams order messages correctly by their timestamps, even if they arrived out of order?
>>>>>>>>>>>>
>>>>>>>>>>>> E.g., say Message A with a timestamp of 5:00 PM and Message B with a timestamp of 5:15 PM are sent.
>>>>>>>>>>>>
>>>>>>>>>>>> Message B arrives sooner than Message A, due to network issues.
>>>>>>>>>>>>
>>>>>>>>>>>> Is it possible to make sure that, across all consumers of Kafka Streams (even if they are across different servers, but have the same consumer group), Message A is consumed first, before Message B?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks.