Yes, and yes!
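
To illustrate both points, here is a rough, untested sketch (assuming a reasonably recent Kafka Streams version; the topic name "events" is just a placeholder). After a windowed aggregation the key is a Windowed<K>, and its window() gives you the start/end boundaries, which effectively act as the window's id:

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

StreamsBuilder builder = new StreamsBuilder();

builder.<String, String>stream("events")
       .groupByKey()
       .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
       .count()
       .toStream()
       .foreach((Windowed<String> windowedKey, Long count) -> {
           long windowStart = windowedKey.window().start(); // epoch millis
           long windowEnd   = windowedKey.window().end();
           String key       = windowedKey.key();
           // windowStart/windowEnd identify the window this result belongs to
       });

For processing individual messages (rather than aggregates) you can drop down to the Processor API (process()/transform()) together with a state store; see the sketch further down in the thread.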

-hans



> On Mar 21, 2017, at 7:45 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> 
> That would require
> 
> - Knowing the current window's id (or some other identifier) to
> differentiate it from other windows
> 
> - Being able to process individual messages in a window
> 
> Are those 2 things possible w/ kafka streams? (java)
> 
> On Tue, Mar 21, 2017 at 7:43 PM, Hans Jespersen <h...@confluent.io> wrote:
> 
>> While it's not exactly the same as the window start/stop time, you can
>> store (in the state store) the earliest and latest timestamps of the
>> messages in each window and use those as a good approximation of the
>> window boundary times.
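>> 
>> For what it's worth, a rough, untested sketch of that idea with the
>> Processor API (the store names and types below are placeholders, the stores
>> must be created and attached to the processor in the topology, and depending
>> on the Kafka Streams version you may also need to implement punctuate()):
>> 
>> import org.apache.kafka.streams.processor.Processor;
>> import org.apache.kafka.streams.processor.ProcessorContext;
>> import org.apache.kafka.streams.state.KeyValueStore;
>> 
>> public class WindowBoundsProcessor implements Processor<String, String> {
>> 
>>     private ProcessorContext context;
>>     private KeyValueStore<String, Long> earliest;
>>     private KeyValueStore<String, Long> latest;
>> 
>>     @Override
>>     @SuppressWarnings("unchecked")
>>     public void init(ProcessorContext context) {
>>         this.context = context;
>>         this.earliest = (KeyValueStore<String, Long>) context.getStateStore("earliest-ts");
>>         this.latest   = (KeyValueStore<String, Long>) context.getStateStore("latest-ts");
>>     }
>> 
>>     @Override
>>     public void process(String bucketKey, String value) {
>>         long ts = context.timestamp(); // timestamp of the current record
>>         Long min = earliest.get(bucketKey);
>>         if (min == null || ts < min) earliest.put(bucketKey, ts);
>>         Long max = latest.get(bucketKey);
>>         if (max == null || ts > max) latest.put(bucketKey, ts);
>>         // earliest/latest per bucket key now approximate the window boundaries
>>     }
>> 
>>     @Override
>>     public void close() {}
>> }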
>> 
>> -hans
>> 
>>> On Mar 20, 2017, at 1:00 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>> 
>>> Yeah, windowing seems perfect, if only I could find out the current
>>> window's start time (so I can log the current bucket's start & end times)
>>> and process window messages individually rather than as aggregates.
>>> 
>>> It doesn't seem like I can get this metadata from ProcessorContext though,
>>> from looking over the javadocs.
>>> 
>>>> On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mich...@confluent.io> wrote:
>>>> 
>>>> Ali,
>>>> 
>>>> what you describe is (roughly!) how Kafka Streams implements the internal
>>>> state stores to support windowing.
>>>> 
>>>> Some users have been following a similar approach as you outlined, using
>>>> the Processor API.
>>>> 
>>>> 
>>>> 
>>>>> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>>> 
>>>>> It would be helpful to know the 'start' and 'end' of the current window,
>>>>> so if an out-of-order message arrives late and is being processed in
>>>>> foreach(), you'd know which window / bucket it belongs to, and could
>>>>> handle it accordingly.
>>>>> 
>>>>> I'm guessing that's not possible at the moment.
>>>>> 
>>>>> (My use case is: I receive a stream of messages. Messages need to be
>>>>> stored and sorted into 'buckets' that indicate 'sessions'. Each time
>>>>> there's a gap of 30 mins or more since the last message (under a key), a
>>>>> new 'session' (bucket) should be started, and future messages should
>>>>> belong to that 'session', until the next 30+ min gap).
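>>>>> 
>>>>> (Side note: the DSL's session windows model exactly this kind of
>>>>> 30-minute inactivity gap. A rough, untested sketch, assuming a reasonably
>>>>> recent Kafka Streams version and a placeholder topic name "events":)
>>>>> 
>>>>> import java.time.Duration;
>>>>> import org.apache.kafka.streams.StreamsBuilder;
>>>>> import org.apache.kafka.streams.kstream.SessionWindows;
>>>>> import org.apache.kafka.streams.kstream.Windowed;
>>>>> 
>>>>> StreamsBuilder builder = new StreamsBuilder();
>>>>> 
>>>>> builder.<String, String>stream("events")
>>>>>        .groupByKey()
>>>>>        // a new session starts after a 30-minute gap without messages for a key
>>>>>        .windowedBy(SessionWindows.with(Duration.ofMinutes(30)))
>>>>>        .count()
>>>>>        .toStream()
>>>>>        .foreach((Windowed<String> key, Long count) -> {
>>>>>            if (count == null) return; // tombstone emitted when sessions merge
>>>>>            System.out.printf("key=%s session=[%d,%d] count=%d%n",
>>>>>                key.key(), key.window().start(), key.window().end(), count);
>>>>>        });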
>>>>> 
>>>>> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mich...@confluent.io>
>>>>> wrote:
>>>>> 
>>>>>>> Can windows only be used for aggregations, or can they also be used for
>>>>>>> foreach(), and such?
>>>>>> 
>>>>>> As of today, you can use windows only in aggregations.
>>>>>> 
>>>>>>> And is it possible to get metadata on the message, such as whether or not
>>>>>>> it's late, its index/position within the other messages, etc?
>>>>>> 
>>>>>> If you use the Processor API of Kafka Streams, you can have access to an
>>>>>> incoming record's topic, partition, offset, etc. via the so-called
>>>>>> ProcessorContext (which is updated for every new incoming record):
>>>>>> 
>>>>>> http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/Processor.html
>>>>>> - You can get/store a reference to the ProcessorContext from
>>>>>>   `Processor#init()`.
>>>>>> 
>>>>>> http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/ProcessorContext.html
>>>>>> - The context can then be used within `Processor#process()` when you
>>>>>>   process a new record. As I said, the context is updated behind the
>>>>>>   scenes to match the record that is currently being processed.
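>>>>>> 
>>>>>> A minimal, untested sketch of that pattern (with the Processor interface
>>>>>> linked above; depending on the version you may also need to implement
>>>>>> punctuate()):
>>>>>> 
>>>>>> import org.apache.kafka.streams.processor.Processor;
>>>>>> import org.apache.kafka.streams.processor.ProcessorContext;
>>>>>> 
>>>>>> public class RecordMetadataProcessor implements Processor<String, String> {
>>>>>> 
>>>>>>     private ProcessorContext context;
>>>>>> 
>>>>>>     @Override
>>>>>>     public void init(ProcessorContext context) {
>>>>>>         // keep a reference; it is updated for every incoming record
>>>>>>         this.context = context;
>>>>>>     }
>>>>>> 
>>>>>>     @Override
>>>>>>     public void process(String key, String value) {
>>>>>>         String topic   = context.topic();
>>>>>>         int partition  = context.partition();
>>>>>>         long offset    = context.offset();
>>>>>>         long timestamp = context.timestamp();
>>>>>>         // per-record logic based on the metadata goes here
>>>>>>     }
>>>>>> 
>>>>>>     @Override
>>>>>>     public void close() {}
>>>>>> }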
>>>>>> 
>>>>>> 
>>>>>> Best,
>>>>>> Michael
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <ali.rac...@gmail.com>
>>>>> wrote:
>>>>>> 
>>>>>>> Can windows only be used for aggregations, or can they also be used for
>>>>>>> foreach(), and such?
>>>>>>> 
>>>>>>> And is it possible to get metadata on the message, such as whether or not
>>>>>>> it's late, its index/position within the other messages, etc?
>>>>>>> 
>>>>>>> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mich...@confluent.io>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> And since you asked for a pointer, Ali:
>>>>>>>> http://docs.confluent.io/current/streams/concepts.html#windowing
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <mich...@confluent.io> wrote:
>>>>>>>> 
>>>>>>>>> Late-arriving and out-of-order data is only treated specially for
>>>>>>>>> windowed aggregations.
>>>>>>>>> 
>>>>>>>>> For stateless operations such as `KStream#foreach()` or `KStream#map()`,
>>>>>>>>> records are processed in the order they arrive (per partition).
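>>>>>>>>> 
>>>>>>>>> (Which window a record falls into is determined by the record's
>>>>>>>>> timestamp, i.e. its event time, not by its arrival order. By default
>>>>>>>>> that is the timestamp stored in the Kafka record itself; if your event
>>>>>>>>> time lives in the message payload, you can plug in a custom
>>>>>>>>> TimestampExtractor. A rough, untested sketch, assuming a reasonably
>>>>>>>>> recent Kafka Streams version; TimestampedEvent is a hypothetical
>>>>>>>>> payload type:)
>>>>>>>>> 
>>>>>>>>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>>>>>>>>> import org.apache.kafka.streams.processor.TimestampExtractor;
>>>>>>>>> 
>>>>>>>>> // hypothetical payload type that carries its own event time
>>>>>>>>> interface TimestampedEvent {
>>>>>>>>>     long eventTimeMillis();
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> public class PayloadTimestampExtractor implements TimestampExtractor {
>>>>>>>>> 
>>>>>>>>>     @Override
>>>>>>>>>     public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
>>>>>>>>>         Object value = record.value();
>>>>>>>>>         if (value instanceof TimestampedEvent) {
>>>>>>>>>             return ((TimestampedEvent) value).eventTimeMillis();
>>>>>>>>>         }
>>>>>>>>>         return record.timestamp(); // fall back to the record's own timestamp
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> // registered via StreamsConfig, e.g.:
>>>>>>>>> // props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>>>>>>>>> //           PayloadTimestampExtractor.class);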
>>>>>>>>> 
>>>>>>>>> -Michael
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>>> later when message A arrives it will put that message back into
>>>>>>>>>>> the right temporal context and publish an amended result for the
>>>>>>>>>>> proper time/session window as if message A had been consumed in
>>>>>>>>>>> timestamp order before message B.
>>>>>>>>>> 
>>>>>>>>>> Does this apply to the aggregation Kafka Streams methods then, and not
>>>>>>>>>> to e.g. foreach?
>>>>>>>>>> 
>>>>>>>>>> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <h...@confluent.io> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Yes, stream processing and CEP are subtly different things.
>>>>>>>>>>> 
>>>>>>>>>>> Kafka Streams helps you write stateful apps and allows that state to
>>>>>>>>>>> be preserved on disk (a local state store), distributed for HA or for
>>>>>>>>>>> parallel partitioned processing (via Kafka topic partitions and
>>>>>>>>>>> consumer groups), and kept in memory (as a performance enhancement).
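>>>>>>>>>>> 
>>>>>>>>>>> (For reference, a rough, untested sketch of declaring such a
>>>>>>>>>>> persistent, changelogged store and attaching it to the topology; the
>>>>>>>>>>> store and processor names are placeholders:)
>>>>>>>>>>> 
>>>>>>>>>>> import org.apache.kafka.common.serialization.Serdes;
>>>>>>>>>>> import org.apache.kafka.streams.StreamsBuilder;
>>>>>>>>>>> import org.apache.kafka.streams.state.KeyValueStore;
>>>>>>>>>>> import org.apache.kafka.streams.state.StoreBuilder;
>>>>>>>>>>> import org.apache.kafka.streams.state.Stores;
>>>>>>>>>>> 
>>>>>>>>>>> StreamsBuilder builder = new StreamsBuilder();
>>>>>>>>>>> 
>>>>>>>>>>> // a RocksDB-backed store, backed up to a Kafka changelog topic by default
>>>>>>>>>>> StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
>>>>>>>>>>>     Stores.keyValueStoreBuilder(
>>>>>>>>>>>         Stores.persistentKeyValueStore("my-state-store"),
>>>>>>>>>>>         Serdes.String(),
>>>>>>>>>>>         Serdes.Long());
>>>>>>>>>>> 
>>>>>>>>>>> builder.addStateStore(storeBuilder);
>>>>>>>>>>> 
>>>>>>>>>>> // a processor can then reference the store by name, e.g.:
>>>>>>>>>>> // stream.process(MyProcessor::new, "my-state-store");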
>>>>>>>>>>> 
>>>>>>>>>>> However, a classical CEP engine with a pre-modeled state machine and
>>>>>>>>>>> pattern-matching rules is something different from stream processing.
>>>>>>>>>>> 
>>>>>>>>>>> It is of course possible to build a CEP system on top of Kafka Streams
>>>>>>>>>>> and get the best of both worlds.
>>>>>>>>>>> 
>>>>>>>>>>> -hans
>>>>>>>>>>> 
>>>>>>>>>>>> On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <sabarish....@gmail.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Hans
>>>>>>>>>>>> 
>>>>>>>>>>>> What you state would work for aggregations, but not for state
>>>>>>>>>>>> machines and CEP.
>>>>>>>>>>>> 
>>>>>>>>>>>> Regards
>>>>>>>>>>>> Sab
>>>>>>>>>>>> 
>>>>>>>>>>>> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <h...@confluent.io> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The only way to make sure A is consumed first would be to delay the
>>>>>>>>>>>>> consumption of message B for at least 15 minutes, which would fly in
>>>>>>>>>>>>> the face of the principles of a true streaming platform. So the short
>>>>>>>>>>>>> answer to your question is "no", because that would be batch
>>>>>>>>>>>>> processing, not stream processing.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> However, Kafka Streams does handle late-arriving data. So if you have
>>>>>>>>>>>>> some analytics that computes results on a time window or a session
>>>>>>>>>>>>> window, then Kafka Streams will compute on the stream in real time
>>>>>>>>>>>>> (processing message B), and later, when message A arrives, it will put
>>>>>>>>>>>>> that message back into the right temporal context and publish an
>>>>>>>>>>>>> amended result for the proper time/session window, as if message A had
>>>>>>>>>>>>> been consumed in timestamp order before message B. The end result of
>>>>>>>>>>>>> this flow is that you eventually get the same results you would get in
>>>>>>>>>>>>> a batch processing system, but with the added benefit of getting
>>>>>>>>>>>>> intermediary results at much lower latency.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> -hans
>>>>>>>>>>>>> 
>>>>>>>>>>>>> /**
>>>>>>>>>>>>> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>>>>>>>>>>>>> * h...@confluent.io (650)924-2670
>>>>>>>>>>>>> */
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Is it possible to have Kafka Streams order messages correctly by
>>>>>>>>>>>>>> their timestamps, even if they arrived out of order?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> E.g., say Message A with a timestamp of 5:00 PM and Message B with a
>>>>>>>>>>>>>> timestamp of 5:15 PM are sent.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Message B arrives sooner than Message A, due to network issues.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Is it possible to make sure that, across all consumers of Kafka
>>>>>>>>>>>>>> Streams (even if they are across different servers, but have the same
>>>>>>>>>>>>>> consumer group), Message A is consumed first, before Message B?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 
