Yeah, windowing seems perfect, if only I could find out the current
window's start time (so I can log the current bucket's start & end times)
and process window messages individually rather than as aggregates.
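
For reference, a minimal sketch of how a window's start and end could be derived from a record timestamp. It assumes fixed-size (tumbling) windows aligned to the epoch and timestamps in epoch milliseconds; `WindowBoundsSketch` and its methods are hypothetical helpers for illustration, not part of the Kafka Streams API.

```java
/** Hypothetical helper for illustration; not a Kafka Streams API. */
final class WindowBoundsSketch {

    /** Inclusive start of the epoch-aligned window that contains timestampMs. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        // floorMod keeps the result non-negative even for negative timestamps.
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Exclusive end of the epoch-aligned window that contains timestampMs. */
    static long windowEnd(long timestampMs, long windowSizeMs) {
        return windowStart(timestampMs, windowSizeMs) + windowSizeMs;
    }

    public static void main(String[] args) {
        long thirtyMinutesMs = 30 * 60 * 1000L;
        long timestampMs = 5_000_000L;
        System.out.println("start=" + windowStart(timestampMs, thirtyMinutesMs)
                + " end=" + windowEnd(timestampMs, thirtyMinutesMs));
    }
}
```

With a 30-minute window (1,800,000 ms), a timestamp of 5,000,000 ms falls in the bucket [3,600,000, 5,400,000).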

It doesn't seem like I can get this metadata from ProcessorContext, though,
from looking over the javadocs.
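
As a rough sketch of the session use case quoted below (a new bucket whenever 30 minutes or more pass since the last message under a key), the bucket boundaries can be tracked by hand. The code assumes all timestamps for one key are available as epoch milliseconds and sorts them first, so an out-of-order message lands in the bucket its timestamp belongs to. `SessionBucketSketch` and `Bucket` are hypothetical names, and nothing here is a Kafka Streams API.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper for illustration; not a Kafka Streams API. */
final class SessionBucketSketch {

    /** A gap of this length or more starts a new session (assumed: 30 minutes). */
    static final long GAP_MS = Duration.ofMinutes(30).toMillis();

    /** One session bucket with its first and last message timestamps. */
    static final class Bucket {
        final long start;
        long end;
        final List<Long> timestamps = new ArrayList<>();

        Bucket(long first) {
            this.start = first;
            this.end = first;
            this.timestamps.add(first);
        }
    }

    /** Splits the timestamps of ONE key into session buckets. */
    static List<Bucket> bucketize(List<Long> timestampsForOneKey) {
        // Sort a copy so late or out-of-order arrivals are placed by timestamp.
        List<Long> sorted = new ArrayList<>(timestampsForOneKey);
        Collections.sort(sorted);

        List<Bucket> buckets = new ArrayList<>();
        Bucket current = null;
        for (long ts : sorted) {
            if (current == null || ts - current.end >= GAP_MS) {
                // First message, or a gap of 30+ minutes: open a new session.
                current = new Bucket(ts);
                buckets.add(current);
            } else {
                // Within the gap: extend the current session.
                current.end = ts;
                current.timestamps.add(ts);
            }
        }
        return buckets;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Arrival order differs from timestamp order on purpose.
        List<Long> arrivals = Arrays.asList(15 * minute, 0L, 50 * minute, 45 * minute);
        for (Bucket b : bucketize(arrivals)) {
            System.out.println("session start=" + b.start + " end=" + b.end
                    + " messages=" + b.timestamps.size());
        }
    }
}
```

Recomputing from the full list keeps the sketch short; an incremental version would also need to merge two buckets when a late message closes the gap between them.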

On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mich...@confluent.io> wrote:

> Ali,
>
> what you describe is (roughly!) how Kafka Streams implements the internal
> state stores to support windowing.
>
> Some users have been following a similar approach as you outlined, using
> the Processor API.
>
>
>
> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
> > It would be helpful to know the 'start' and 'end' of the current window,
> > so if an out-of-order message arrives late, and is being processed in
> > foreach(), you'd know which window / bucket it belongs to, and can handle
> > it accordingly.
> >
> > I'm guessing that's not possible at the moment.
> >
> > (My use case is: I receive a stream of messages. Messages need to be stored
> > and sorted into 'buckets', to indicate 'sessions'. Each time there's a gap
> > of 30 mins or more since the last message (under a key), a new 'session'
> > (bucket) should be started, and future messages should belong to that
> > 'session', until the next 30+ min gap.)
> >
> > On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mich...@confluent.io>
> > wrote:
> >
> > > > Can windows only be used for aggregations, or can they also be used for
> > > > foreach(), and such?
> > >
> > > As of today, you can use windows only in aggregations.
> > >
> > > > And is it possible to get metadata on the message, such as whether or
> > > > not it's late, its index/position within the other messages, etc?
> > >
> > > If you use the Processor API of Kafka Streams, you can have access to an
> > > incoming record's topic, partition, offset, etc. via the so-called
> > > ProcessorContext (which is updated for every new incoming record):
> > >
> > > http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/Processor.html
> > > - You can get/store a reference to the ProcessorContext from
> > > `Processor#init()`.
> > >
> > > http://docs.confluent.io/current/streams/javadocs/org/apache/kafka/streams/processor/ProcessorContext.html
> > > - The context can then be used within `Processor#process()` when you
> > > process a new record.  As I said, the context is updated behind the scenes
> > > to match the record that is currently being processed.
> > >
> > >
> > > Best,
> > > Michael
> > >
> > >
> > >
> > >
> > > On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <ali.rac...@gmail.com>
> > wrote:
> > >
> > > > Can windows only be used for aggregations, or can they also be used for
> > > > foreach(), and such?
> > > >
> > > > And is it possible to get metadata on the message, such as whether or not
> > > > it's late, its index/position within the other messages, etc?
> > > >
> > > > On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mich...@confluent.io>
> > > > wrote:
> > > >
> > > > > And since you asked for a pointer, Ali:
> > > > > http://docs.confluent.io/current/streams/concepts.html#windowing
> > > > >
> > > > >
> > > > > On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <
> mich...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Late-arriving and out-of-order data is only treated specially for
> > > > > > windowed aggregations.
> > > > > >
> > > > > > For stateless operations such as `KStream#foreach()` or `KStream#map()`,
> > > > > > records are processed in the order they arrive (per partition).
> > > > > >
> > > > > > -Michael
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Sat, Mar 18, 2017 at 10:47 PM, Ali Akhtar <
> ali.rac...@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > >> > later when message A arrives it will put that message back into
> > > > > >> > the right temporal context and publish an amended result for the
> > > > > >> > proper time/session window as if message A were consumed in
> > > > > >> > timestamp order before message B.
> > > > > >>
> > > > > >> Does this apply to the aggregation Kafka Streams methods then, and not
> > > > > >> to e.g. foreach?
> > > > > >>
> > > > > >> On Sun, Mar 19, 2017 at 2:40 AM, Hans Jespersen <
> > h...@confluent.io>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > Yes, stream processing and CEP are subtly different things.
> > > > > >> >
> > > > > >> > Kafka Streams helps you write stateful apps and allows that state to be
> > > > > >> > preserved on disk (a local State store) as well as distributed for HA or
> > > > > >> > for parallel partitioned processing (via Kafka topic partitions and
> > > > > >> > consumer groups) as well as in memory (as a performance enhancement).
> > > > > >> >
> > > > > >> > However a classical CEP engine with a pre-modeled state machine and
> > > > > >> > pattern matching rules is something different from stream processing.
> > > > > >> >
> > > > > >> > It is of course possible to build a CEP system on top of Kafka Streams
> > > > > >> > and get the best of both worlds.
> > > > > >> >
> > > > > >> > -hans
> > > > > >> >
> > > > > >> > > On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <
> > > > > >> > sabarish....@gmail.com> wrote:
> > > > > >> > >
> > > > > >> > > Hans
> > > > > >> > >
> > > > > >> > > What you state would work for aggregations, but not for state machines
> > > > > >> > > and CEP.
> > > > > >> > >
> > > > > >> > > Regards
> > > > > >> > > Sab
> > > > > >> > >
> > > > > >> > >> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <
> > h...@confluent.io
> > > >
> > > > > >> wrote:
> > > > > >> > >>
> > > > > >> > >> The only way to make sure A is consumed first would be to delay the
> > > > > >> > >> consumption of message B for at least 15 minutes, which would fly in
> > > > > >> > >> the face of the principles of a true streaming platform. So the short
> > > > > >> > >> answer to your question is "no", because that would be batch
> > > > > >> > >> processing, not stream processing.
> > > > > >> > >>
> > > > > >> > >> However, Kafka Streams does handle late-arriving data. So if you had
> > > > > >> > >> some analytics that compute results on a time window or a session
> > > > > >> > >> window, then Kafka Streams will compute on the stream in real time
> > > > > >> > >> (processing message B), and then later, when message A arrives, it
> > > > > >> > >> will put that message back into the right temporal context and
> > > > > >> > >> publish an amended result for the proper time/session window, as if
> > > > > >> > >> message A were consumed in timestamp order before message B. The end
> > > > > >> > >> result of this flow is that you eventually get the same results you
> > > > > >> > >> would get in a batch processing system, but with the added benefit of
> > > > > >> > >> getting intermediate results at much lower latency.
> > > > > >> > >>
> > > > > >> > >> -hans
> > > > > >> > >>
> > > > > >> > >> /**
> > > > > >> > >> * Hans Jespersen, Principal Systems Engineer, Confluent
> Inc.
> > > > > >> > >> * h...@confluent.io (650)924-2670
> > > > > >> > >> */
> > > > > >> > >>
> > > > > >> > >>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <
> > > > > ali.rac...@gmail.com>
> > > > > >> > wrote:
> > > > > >> > >>>
> > > > > >> > >>> Is it possible to have Kafka Streams order messages correctly by
> > > > > >> > >>> their timestamps, even if they arrived out of order?
> > > > > >> > >>>
> > > > > >> > >>> E.g., say Message A with a timestamp of 5:00 PM and Message B with a
> > > > > >> > >>> timestamp of 5:15 PM are sent.
> > > > > >> > >>>
> > > > > >> > >>> Message B arrives sooner than Message A, due to network issues.
> > > > > >> > >>>
> > > > > >> > >>> Is it possible to make sure that, across all consumers of Kafka Streams
> > > > > >> > >>> (even if they are across different servers, but have the same consumer
> > > > > >> > >>> group), Message A is consumed first, before Message B?
> > > > > >> > >>>
> > > > > >> > >>> Thanks.
> > > > > >> > >>>
> > > > > >> > >>
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
