While it's not exactly the same as the window start/stop time, you can store
(in the state store) the earliest and latest timestamps of any messages in each
window and use them as a good approximation of the window boundary times.
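
A minimal sketch of that idea, in plain Java so it runs without Kafka on the classpath. Everything named here is an assumption made for illustration: `SessionBoundsTracker`, `Bounds`, and `observe` are hypothetical names, and the `HashMap` only stands in for a real Kafka Streams state store. The 30-minute gap comes from the use case described further down the thread. Note the simplification: only the most recent session per key is kept, so a late record older than the current session is folded into it rather than routed back to an earlier one.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: approximate window boundaries by tracking min/max record timestamps per key. */
class SessionBoundsTracker {

    /** Earliest and latest record timestamps seen so far in one session (bucket). */
    static final class Bounds {
        final long earliestMs;
        final long latestMs;

        Bounds(long earliestMs, long latestMs) {
            this.earliestMs = earliestMs;
            this.latestMs = latestMs;
        }

        /** Returns new bounds widened to cover the given timestamp. */
        Bounds include(long timestampMs) {
            return new Bounds(Math.min(earliestMs, timestampMs),
                              Math.max(latestMs, timestampMs));
        }

        @Override
        public String toString() {
            return "[" + earliestMs + ", " + latestMs + "]";
        }
    }

    private final long gapMs;

    // Hypothetical stand-in for a state store keyed by record key.
    private final Map<String, Bounds> store = new HashMap<>();

    SessionBoundsTracker(long gapMs) {
        this.gapMs = gapMs;
    }

    /** Records one timestamp and returns the bounds of the session it was placed in. */
    Bounds observe(String key, long timestampMs) {
        Bounds current = store.get(key);
        Bounds updated;
        if (current == null || timestampMs - current.latestMs >= gapMs) {
            // First record for this key, or a gap of gapMs or more: start a new session.
            updated = new Bounds(timestampMs, timestampMs);
        } else {
            // Same session (including out-of-order records): widen the bounds.
            updated = current.include(timestampMs);
        }
        store.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        SessionBoundsTracker tracker = new SessionBoundsTracker(30 * 60 * 1000L);
        System.out.println(tracker.observe("user-1", 1_000L));
        System.out.println(tracker.observe("user-1", 5_000L));
        System.out.println(tracker.observe("user-1", 3_000L));     // out of order
        System.out.println(tracker.observe("user-1", 1_805_000L)); // 30 min after 5_000
    }
}
```

In a real Processor the timestamp would come from the record being processed and the map would be replaced by a key-value store, but the bookkeeping would look much the same.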

-hans

> On Mar 20, 2017, at 1:00 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> 
> Yeah, windowing seems perfect, if only I could find out the current
> window's start time (so I can log the current bucket's start & end times)
> and process window messages individually rather than as aggregates.
> 
> It doesn't seem like I can get this metadata from ProcessorContext though,
> from looking over the javadocs.
> 
>> On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mich...@confluent.io> wrote:
>> 
>> Ali,
>> 
>> what you describe is (roughly!) how Kafka Streams implements the internal
>> state stores to support windowing.
>> 
>> Some users have been following a similar approach as you outlined, using
>> the Processor API.
>> 
>> 
>> 
>>> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>> 
>>> It would be helpful to know the 'start' and 'end' of the current metadata,
>>> so if an out of order message arrives late, and is being processed in
>>> foreach(), you'd know which window / bucket it belongs to, and can handle
>>> it accordingly.
>>> 
>>> I'm guessing that's not possible at the moment.
>>> 
>>> (My use case is, I receive a stream of messages. Messages need to be stored
>>> and sorted into 'buckets', to indicate 'sessions'. Each time there's a gap
>>> of 30 mins or more since the last message (under a key), a new 'session'
>>> (bucket) should be started, and future messages should belong to that
>>> 'session', until the next 30+ min gap).
>>> 
>>> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mich...@confluent.io> wrote:
>>> 
>>>>> Can windows only be used for aggregations, or can they also be used for
>>>>> foreach(), and such?
>>>> 
>>>> As of today, you can use windows only in aggregations.
>>>> 
>>>>> And is it possible to get metadata on the message, such as whether or not
>>>>> it's late, its index/position within the other messages, etc?
>>>> 
>>>> If you use the Processor API of Kafka Streams, you can have access to an
>>>> incoming record's topic, partition, offset, etc. via the so-called
>>>> ProcessorContext (which is updated for every new incoming record):
>>>> 
>>>> http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/Processor.html
>>>> - You can get/store a reference to the ProcessorContext from
>>>> `Processor#init()`.
>>>> 
>>>> http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/ProcessorContext.html
>>>> - The context can then be used within `Processor#process()` when you
>>>> process a new record.  As I said, the context is updated behind the scenes
>>>> to match the record that is currently being processed.
>>>> 
>>>> 
>>>> Best,
>>>> Michael
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>> 
>>>>> Can windows only be used for aggregations, or can they also be used for
>>>>> foreach(), and such?
>>>>> 
>>>>> And is it possible to get metadata on the message, such as whether or not
>>>>> it's late, its index/position within the other messages, etc?
>>>>> 
>>>>> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mich...@confluent.io> wrote:
>>>>> 
>>>>>> And since you asked for a pointer, Ali:
>>>>>> http://docs.confluent.io/current/streams/concepts.html#windowing
>>>>>> 
>>>>>> 
>>>>>> On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mich...@confluent.io> wrote:
>>>>>> 
>>>>>>> Late-arriving and out-of-order data is only treated specially for windowed
>>>>>>> aggregations.
>>>>>>> 
>>>>>>> For stateless operations such as `KStream#foreach()` or `KStream#map()`,
>>>>>>> records are processed in the order they arrive (per partition).
>>>>>>> 
>>>>>>> -Michael
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>>>>> 
>>>>>>>>> later when message A arrives it will put that message back into
>>>>>>>>> the right temporal context and publish an amended result for the proper
>>>>>>>>> time/session window as if message A had been consumed in timestamp order
>>>>>>>>> before message B.
>>>>>>>> 
>>>>>>>> Does this apply to the aggregation Kafka stream methods then, and not to
>>>>>>>> e.g. foreach?
>>>>>>>> 
>>>>>>>> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <h...@confluent.io> wrote:
>>>>>>>> 
>>>>>>>>> Yes, stream processing and CEP are subtly different things.
>>>>>>>>> 
>>>>>>>>> Kafka Streams helps you write stateful apps and allows that state to be
>>>>>>>>> preserved on disk (a local State store) as well as distributed for HA or
>>>>>>>>> for parallel partitioned processing (via Kafka topic partitions and
>>>>>>>>> consumer groups) as well as in memory (as a performance enhancement).
>>>>>>>>> 
>>>>>>>>> However a classical CEP engine with a pre-modeled state machine and
>>>>>>>>> pattern matching rules is something different from stream processing.
>>>>>>>>> 
>>>>>>>>> It is of course possible to build a CEP system on top of Kafka Streams and
>>>>>>>>> get the best of both worlds.
>>>>>>>>> 
>>>>>>>>> -hans
>>>>>>>>> 
>>>>>>>>>> On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <sabarish....@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hans
>>>>>>>>>> 
>>>>>>>>>> What you state would work for aggregations, but not for state machines
>>>>>>>>>> and CEP.
>>>>>>>>>> 
>>>>>>>>>> Regards
>>>>>>>>>> Sab
>>>>>>>>>> 
>>>>>>>>>>> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <h...@confluent.io> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> The only way to make sure A is consumed first would be to delay the
>>>>>>>>>>> consumption of message B for at least 15 minutes, which would fly in the
>>>>>>>>>>> face of the principles of a true streaming platform, so the short answer to
>>>>>>>>>>> your question is "no" because that would be batch processing, not stream
>>>>>>>>>>> processing.
>>>>>>>>>>> 
>>>>>>>>>>> However, Kafka Streams does handle late arriving data. So if you had some
>>>>>>>>>>> analytics that computes results on a time window or a session window then
>>>>>>>>>>> Kafka Streams will compute on the stream in real time (processing message
>>>>>>>>>>> B) and then later when message A arrives it will put that message back into
>>>>>>>>>>> the right temporal context and publish an amended result for the proper
>>>>>>>>>>> time/session window as if message A had been consumed in timestamp order
>>>>>>>>>>> before message B. The end result of this flow is that you eventually get
>>>>>>>>>>> the same results you would get in a batch processing system but with the
>>>>>>>>>>> added benefit of getting intermediary results at much lower latency.
>>>>>>>>>>> 
>>>>>>>>>>> -hans
>>>>>>>>>>> 
>>>>>>>>>>> /**
>>>>>>>>>>> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>>>>>>>>>>> * h...@confluent.io (650)924-2670
>>>>>>>>>>> */
>>>>>>>>>>> 
>>>>>>>>>>>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Is it possible to have Kafka Streams order messages correctly by their
>>>>>>>>>>>> timestamps, even if they arrived out of order?
>>>>>>>>>>>> 
>>>>>>>>>>>> E.g., say Message A with a timestamp of 5:00 PM and Message B with a
>>>>>>>>>>>> timestamp of 5:15 PM are sent.
>>>>>>>>>>>> 
>>>>>>>>>>>> Message B arrives sooner than Message A, due to network issues.
>>>>>>>>>>>> 
>>>>>>>>>>>> Is it possible to make sure that, across all consumers of Kafka Streams
>>>>>>>>>>>> (even if they are across different servers, but have the same consumer
>>>>>>>>>>>> group), Message A is consumed first, before Message B?
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks.
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
