Kohki,

Thanks for the explanation, it's very helpful.

As we have talked in another email thread you started, originally I thought
the motivation to use "explicit triggers" (i.e. what it achieves with your
watermark) was due to application logic, i.e. whenever you have received a
record that triggers an action (i.e. alarm) in the down stream, you do not
want to update this key's value any more.

But from your description here, it seems the motivation is not only from
application logic, but also from operational concerns, about downstream
traffic, but by using explicit triggers you can reduce the update records
sent to downstream as well, which I agrees.

Just wanting to point out that in your real monitoring app, if you want
each alarm triggered record to be queriable later, you can consider
branching such changelog streams to keep these updates so that they can be
backed in a state store even if a later record with the same key reset it
to be normal.


Guozhang



On Mon, Feb 27, 2017 at 10:08 AM, Kohki Nishio <tarop...@gmail.com> wrote:

> Guozhang,
> It's a bit difficult to explain, but let me try ... the basic idea is that
> we can assume most of messages have the same clock (per partition at
> least), then if an offset has information about metadata about the target
> time of the offset, fail-over works.
>
> Offset = 12222
> Metadata Time = 2/12/2017 12:00:00
>
> After this offset (12222), I only process messages later than the metadata
> time. And from the application, it commits only when it's safe to move the
> time bucket forward for all keys, it picks the earliest time of the time
> bucket from each key. But that 'move forward' is not per key decision, it's
> per partition decision. So I need to define the maximum time range in which
> I need to keep data.
>
> But it's not always that simple, there are outliers(network hiccup) and
> delayed arrivals(wrong ntp), my plan is to mark those keys as 'best effort'
> group when it happens. For those keys, as long as JVM keeps running, I can
> handle those but those won't be a part of 'per partition decision'..
>
> Hope this gives some idea about what I'm trying to do .. I'm planning to
> use Processor API to do this.
>
> Thanks
> -Kohki
>
>
>
>
>
>
> On Sun, Feb 26, 2017 at 8:56 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Kohki,
> >
> > Given your data traffic and the state volume I cannot think of a better
> > solution but suggest using large number of partitioned local states.
> >
> > I'm wondering how would "per partition watermark" can help with your
> > traffic?
> >
> > Guozhang
> >
> > On Sun, Feb 26, 2017 at 10:45 AM, Kohki Nishio <tarop...@gmail.com>
> wrote:
> >
> > > Guozhang,
> > >
> > > Let me explain what I'm trying to do. The message volume is large (TB
> per
> > > Day) and that is coming to a topic. Now I want to do per minute
> > > aggregation(Windowed) and send the output to the downstream (a topic)
> > >
> > > (Topic1 - Large Volume) -> [Stream App] -> (Topic2 - Large Volume)
> > >
> > > I assume the internal topic would have the same amount of data as
> topic1
> > > and the same goes to the local store, I know we can tweak retention
> > period
> > > but network traffic would be same (or even more)
> > >
> > > The key point here is that most of incoming stream will end up a single
> > > data point per a minute (aggregation window), but the variance of the
> key
> > > is huge (high cardinality), then buffering wouldn't really help reduce
> > the
> > > data/traffic size.
> > >
> > > I'm going to do something like per partition watermarking with offset
> > > metadata. It wouldn't increase the traffic that much
> > > thanks
> > > -Kohki
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> Kohki Nishio
>



-- 
-- Guozhang

Reply via email to