Yes, and yes! -hans
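A minimal sketch of the approach quoted below, in plain Java with no Kafka dependency: each record is handled individually, assigned a window id, and the earliest and latest timestamps seen per window are kept as approximate window boundaries. The class `WindowBoundsTracker`, its methods, the `HashMap` standing in for a state store, and the use of a fixed-size window's start time as its id are all illustrative assumptions, not Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not part of Kafka Streams.
class WindowBoundsTracker {
    /** Earliest and latest record timestamps observed in one window. */
    static final class Bounds {
        long earliest;
        long latest;

        Bounds(long ts) {
            earliest = ts;
            latest = ts;
        }

        void update(long ts) {
            earliest = Math.min(earliest, ts);
            latest = Math.max(latest, ts);
        }
    }

    private final long windowSizeMs;
    // Stand-in for a state store: window id -> observed bounds.
    private final Map<Long, Bounds> store = new HashMap<>();

    WindowBoundsTracker(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Window id = start of the fixed-size window containing the timestamp (assumes ts >= 0). */
    long windowIdFor(long timestampMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Handles one record individually and returns the id of the window it belongs to. */
    long process(long timestampMs) {
        long id = windowIdFor(timestampMs);
        Bounds b = store.get(id);
        if (b == null) {
            store.put(id, new Bounds(timestampMs));
        } else {
            b.update(timestampMs);
        }
        return id;
    }

    Bounds boundsOf(long windowId) {
        return store.get(windowId);
    }

    public static void main(String[] args) {
        WindowBoundsTracker tracker = new WindowBoundsTracker(60_000L);
        // Records arrive out of order; each one still maps to its own window.
        long[] arrivals = {125_000L, 61_000L, 130_000L, 121_000L};
        for (long ts : arrivals) {
            long id = tracker.process(ts);
            Bounds b = tracker.boundsOf(id);
            System.out.println("ts=" + ts + " window=" + id
                    + " earliest=" + b.earliest + " latest=" + b.latest);
        }
    }
}
```

In a real topology this logic would presumably live inside a Processor's `process()` method and write to a state store rather than an in-memory map.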
> On Mar 21, 2017, at 7:45 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> That would require
>
> - Knowing the current window's id (or some other identifier) to
>   differentiate it from other windows
>
> - Being able to process individual messages in a window
>
> Are those 2 things possible w/ kafka streams? (java)
>
> On Tue, Mar 21, 2017 at 7:43 PM, Hans Jespersen <h...@confluent.io> wrote:
>
>> While it's not exactly the same as the window start/stop time you can
>> store (in the state store) the earliest and latest timestamps of any
>> messages in each window and use that as a good approximation for the
>> window boundary times.
>>
>> -hans
>>
>>> On Mar 20, 2017, at 1:00 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>
>>> Yeah, windowing seems perfect, if only I could find out the current
>>> window's start time (so I can log the current bucket's start & end times)
>>> and process window messages individually rather than as aggregates.
>>>
>>> It doesn't seem like i can get this metadata from ProcessorContext
>>> though, from looking over the javadocs
>>>
>>>> On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mich...@confluent.io> wrote:
>>>>
>>>> Ali,
>>>>
>>>> what you describe is (roughly!) how Kafka Streams implements the
>>>> internal state stores to support windowing.
>>>>
>>>> Some users have been following a similar approach as you outlined, using
>>>> the Processor API.
>>>>
>>>>> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>>>
>>>>> It would be helpful to know the 'start' and 'end' of the current
>>>>> metadata, so if an out of order message arrives late, and is being
>>>>> processed in foreach(), you'd know which window / bucket it belongs to,
>>>>> and can handle it accordingly.
>>>>>
>>>>> I'm guessing that's not possible at the moment.
>>>>>
>>>>> (My use case is, i receive a stream of messages. Messages need to be
>>>>> stored and sorted into 'buckets', to indicate 'sessions'.
>>>>> Each time there's a gap of 30 mins or more since the last message
>>>>> (under a key), a new 'session' (bucket) should be started, and future
>>>>> messages should belong to that 'session', until the next 30+ min gap).
>>>>>
>>>>> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mich...@confluent.io> wrote:
>>>>>
>>>>>>> Can windows only be used for aggregations, or can they also be used
>>>>>>> for foreach(), and such?
>>>>>>
>>>>>> As of today, you can use windows only in aggregations.
>>>>>>
>>>>>>> And is it possible to get metadata on the message, such as whether
>>>>>>> or not it's late, its index/position within the other messages, etc?
>>>>>>
>>>>>> If you use the Processor API of Kafka Streams, you can have access to
>>>>>> an incoming record's topic, partition, offset, etc. via the so-called
>>>>>> ProcessorContext (which is updated for every new incoming record):
>>>>>>
>>>>>> http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/Processor.html
>>>>>> - You can get/store a reference to the ProcessorContext from
>>>>>>   `Processor#init()`.
>>>>>>
>>>>>> http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/ProcessorContext.html
>>>>>> - The context can then be used within `Processor#process()` when you
>>>>>>   process a new record. As I said, the context is updated behind the
>>>>>>   scenes to match the record that is currently being processed.
>>>>>>
>>>>>> Best,
>>>>>> Michael
>>>>>>
>>>>>> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>>>>
>>>>>>> Can windows only be used for aggregations, or can they also be used
>>>>>>> for foreach(), and such?
>>>>>>>
>>>>>>> And is it possible to get metadata on the message, such as whether
>>>>>>> or not it's late, its index/position within the other messages, etc?
>>>>>>>
>>>>>>> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mich...@confluent.io> wrote:
>>>>>>>
>>>>>>>> And since you asked for a pointer, Ali:
>>>>>>>> http://docs.confluent.io/current/streams/concepts.html#windowing
>>>>>>>>
>>>>>>>> On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mich...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> Late-arriving and out-of-order data is only treated specially for
>>>>>>>>> windowed aggregations.
>>>>>>>>>
>>>>>>>>> For stateless operations such as `KStream#foreach()` or
>>>>>>>>> `KStream#map()`, records are processed in the order they arrive
>>>>>>>>> (per partition).
>>>>>>>>>
>>>>>>>>> -Michael
>>>>>>>>>
>>>>>>>>> On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>>> later when message A arrives it will put that message back into
>>>>>>>>>>> the right temporal context and publish an amended result for the
>>>>>>>>>>> proper time/session window as if message A had been consumed in
>>>>>>>>>>> timestamp order before message B.
>>>>>>>>>>
>>>>>>>>>> Does this apply to the aggregation Kafka Streams methods then, and
>>>>>>>>>> not to e.g. foreach?
>>>>>>>>>>
>>>>>>>>>> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <h...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yes, stream processing and CEP are subtly different things.
>>>>>>>>>>>
>>>>>>>>>>> Kafka Streams helps you write stateful apps and allows that state
>>>>>>>>>>> to be preserved on disk (a local state store) as well as
>>>>>>>>>>> distributed for HA or for parallel partitioned processing (via
>>>>>>>>>>> Kafka topic partitions and consumer groups) as well as in memory
>>>>>>>>>>> (as a performance enhancement).
>>>>>>>>>>>
>>>>>>>>>>> However, a classical CEP engine with a pre-modeled state machine
>>>>>>>>>>> and pattern matching rules is something different from stream
>>>>>>>>>>> processing.
>>>>>>>>>>>
>>>>>>>>>>> It is of course possible to build a CEP system on top of Kafka
>>>>>>>>>>> Streams and get the best of both worlds.
>>>>>>>>>>>
>>>>>>>>>>> -hans
>>>>>>>>>>>
>>>>>>>>>>>> On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <sabarish....@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hans
>>>>>>>>>>>>
>>>>>>>>>>>> What you state would work for aggregations, but not for state
>>>>>>>>>>>> machines and CEP.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards
>>>>>>>>>>>> Sab
>>>>>>>>>>>>
>>>>>>>>>>>>> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <h...@confluent.io> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> The only way to make sure A is consumed first would be to delay
>>>>>>>>>>>>> the consumption of message B for at least 15 minutes, which
>>>>>>>>>>>>> would fly in the face of the principles of a true streaming
>>>>>>>>>>>>> platform. So the short answer to your question is "no", because
>>>>>>>>>>>>> that would be batch processing, not stream processing.
>>>>>>>>>>>>>
>>>>>>>>>>>>> However, Kafka Streams does handle late-arriving data.
>>>>>>>>>>>>> So if you had some analytics that computes results on a time
>>>>>>>>>>>>> window or a session window, then Kafka Streams will compute on
>>>>>>>>>>>>> the stream in real time (processing message B), and later, when
>>>>>>>>>>>>> message A arrives, it will put that message back into the right
>>>>>>>>>>>>> temporal context and publish an amended result for the proper
>>>>>>>>>>>>> time/session window, as if message A had been consumed in
>>>>>>>>>>>>> timestamp order before message B. The end result of this flow is
>>>>>>>>>>>>> that you eventually get the same results you would get in a
>>>>>>>>>>>>> batch processing system, but with the added benefit of getting
>>>>>>>>>>>>> intermediate results at much lower latency.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -hans
>>>>>>>>>>>>>
>>>>>>>>>>>>> /**
>>>>>>>>>>>>>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>>>>>>>>>>>>>  * h...@confluent.io (650)924-2670
>>>>>>>>>>>>>  */
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is it possible to have Kafka Streams order messages correctly
>>>>>>>>>>>>>> by their timestamps, even if they arrived out of order?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> E.g., say Message A with a timestamp of 5:00 PM and Message B
>>>>>>>>>>>>>> with a timestamp of 5:15 PM are sent.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Message B arrives sooner than Message A, due to network issues.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is it possible to make sure that, across all consumers of Kafka
>>>>>>>>>>>>>> Streams (even if they are across different servers, but have the
>>>>>>>>>>>>>> same consumer group), Message A is consumed first, before
>>>>>>>>>>>>>> Message B?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks.
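The session rule described in the thread (a gap of 30 minutes or more under a key starts a new bucket) can be sketched in plain Java. This is only an illustration under stated assumptions: `SessionBuckets` and `sessionize` are hypothetical names, timestamps are epoch milliseconds for a single key, and the buckets are recomputed from all stored timestamps, so a late message lands in the right session regardless of arrival order. It is not how Kafka Streams implements session windows internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper; not part of Kafka Streams.
class SessionBuckets {
    // 30 minutes, taken from the use case described in the thread.
    static final long GAP_MS = 30L * 60L * 1000L;

    /**
     * Groups one key's message timestamps into sessions.
     * Sorting a copy first makes the result independent of arrival order.
     */
    static List<List<Long>> sessionize(List<Long> timestampsMs) {
        List<Long> sorted = new ArrayList<>(timestampsMs);
        Collections.sort(sorted);

        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = null;
        long previous = 0L;
        for (long ts : sorted) {
            // A gap of 30 minutes or more since the previous message opens a new session.
            if (current == null || ts - previous >= GAP_MS) {
                current = new ArrayList<>();
                sessions.add(current);
            }
            current.add(ts);
            previous = ts;
        }
        return sessions;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Arrival order differs from timestamp order.
        List<Long> arrivals = Arrays.asList(45 * minute, 0L, 50 * minute, 10 * minute);
        System.out.println(sessionize(arrivals));
    }
}
```

Recomputing from stored timestamps keeps the sketch short; an incremental version would instead merge or split neighbouring sessions as each late message arrives.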