Michal,
  This is slick!  I am still writing unit tests to verify it.  My code
looks something like:

KTable<Windowed<String>, CountSumMinMaxAvgObj> oneMinuteWindowed = srcStream
    // my value object isn't really called that; I just wanted to show a
    // sample set of calculations the value can do!
    .groupByKey(Serdes.String(), Serdes.Double())
    .aggregate(/* initializer */, /* aggregator */,
        TimeWindows.of(60*1000, 60*1000), "store1m");

    // I used an aggregate here so I could have a non-primitive value object
    // that does the calculations on each aggregation; the POJO has an
    // .add(Double) on it.
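
For reference, a rough sketch of the shape of that value object (the field
and method names here are mine for illustration; note that min/max can't be
maintained through the subtractor alone, so treat this as a sketch):

public class CountSumMinMaxAvgObj {
    private long count = 0;
    private double sum = 0.0;
    private double min = Double.POSITIVE_INFINITY;
    private double max = Double.NEGATIVE_INFINITY;

    // fold one raw value into the 1-minute aggregate
    public CountSumMinMaxAvgObj add(Double val) {
        count++;
        sum += val;
        min = Math.min(min, val);
        max = Math.max(max, val);
        return this;
    }

    // merge a finer-grained aggregate into this one (the adder)
    public CountSumMinMaxAvgObj add(CountSumMinMaxAvgObj other) {
        count += other.count;
        sum += other.sum;
        min = Math.min(min, other.min);
        max = Math.max(max, other.max);
        return this;
    }

    // undo a previously-added aggregate (the subtractor); count and sum
    // invert cleanly, but min/max are not invertible, so a real
    // implementation needs extra state (or has to drop min/max) here
    public CountSumMinMaxAvgObj sub(CountSumMinMaxAvgObj other) {
        count -= other.count;
        sum -= other.sum;
        return this;
    }

    public double avg() { return count == 0 ? 0.0 : sum / count; }
}

With that shape, the placeholders above would be something like
CountSumMinMaxAvgObj::new for the initializer and
(key, val, agg) -> agg.add(val) for the aggregator.
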
KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> fiveMinuteWindowed =
oneMinuteWindowed    // I made my own Tuple2; will move the window calc into it
    .groupBy( (windowedKey, value) -> new KeyValue<>(
            new Tuple2<String, Long>(windowedKey.key(),
                windowedKey.window().end() /1000/60/5 *1000*60*5),
            value),
        keySerde, valSerde)

        // the above rounds time down to a timestamp divisible by 5 minutes

    .reduce(/* your adder */, /* your subtractor */, "store5m");

        // where your subtractor can be as simple as (val, agg) -> agg - val
        // for primitive types, or as complex as you need

        // just make sure you get the order right (lesson hard learnt ;) ),
        // subtraction is not commutative!

        // again, my value object has an .add(Obj) and a .sub(Obj) to handle
        // this, so nice!
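
To spell out the rounding trick for anyone following along: Java integer
division truncates, so dividing and re-multiplying snaps a timestamp down to
the window boundary at or below it.  For example:

long end = 1494268730000L;                 // some window end time, in ms
long rounded = end /1000/60/5 *1000*60*5;  // integer division truncates
// rounded == 1494268500000L, the last 5-minute boundary <= end
// (equivalently: end / 300000 * 300000)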


KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> fifteenMinuteWindowed =
fiveMinuteWindowed

    .groupBy( (keyPair, value) -> new KeyValue<>(
            new Tuple2<String, Long>(keyPair._1,
                keyPair._2 /1000/60/15 *1000*60*15),
            value),
        keySerde, valSerde)

        // the above rounds time down to a timestamp divisible by 15 minutes

    .reduce(/* your adder */, /* your subtractor */, "store15m");


KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> sixtyMinuteWindowed =
fifteenMinuteWindowed

    .groupBy( (keyPair, value) -> new KeyValue<>(
            new Tuple2<String, Long>(keyPair._1,
                keyPair._2 /1000/60/60 *1000*60*60),
            value),
        keySerde, valSerde)

        // the above rounds time down to a timestamp divisible by 60 minutes

    .reduce(/* your adder */, /* your subtractor */, "store60m");
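
Since the same round-down appears at every level (and per my comment above,
I plan to move the window calc into Tuple2), it could live in one helper; a
minimal sketch, with my own naming:

static long roundDownTo(long windowSizeMs, long timestampMs) {
    // largest multiple of windowSizeMs that is <= timestampMs;
    // relies on integer division truncating
    return timestampMs / windowSizeMs * windowSizeMs;
}

// usage in the groupBy, e.g. for the 15-minute level:
// new Tuple2<>(keyPair._1, roundDownTo(15*60*1000, keyPair._2))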


Notes thus far:
  Doesn't look like I need the 5-minute result to be a windowed KTable; it
starts with the regular KTable<Tuple2<String, Long>> in this case.
  I'm thinking about using windowedKey.window().start() instead of end(), as
I believe that is more consistent with what the windows themselves put out;
they go into the stores keyed by their start time, I believe.
  The Serdes get nuts, as does the generic typing on some of these classes
(yeah, you, KeyValueMapper), which makes for long code!  I had to specify
them everywhere since the key/value types changed.
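
For what it's worth, here is a minimal sketch of the pair Serde I mean,
assuming a Kafka client version where Serializer/Deserializer are functional
interfaces (and my own Tuple2; all names here are illustrative):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

Serde<Tuple2<String, Long>> keySerde = Serdes.serdeFrom(
    // serializer: length-prefixed UTF-8 string, then the rounded timestamp
    (topic, pair) -> {
        if (pair == null) return null;
        byte[] str = pair._1.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + str.length + 8)
                .putInt(str.length).put(str).putLong(pair._2).array();
    },
    // deserializer: read the string back, then the long
    (topic, bytes) -> {
        if (bytes == null) return null;
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] str = new byte[buf.getInt()];
        buf.get(str);
        return new Tuple2<>(new String(str, StandardCharsets.UTF_8),
                buf.getLong());
    });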


I didn't get enough time to mess with it today; tomorrow I will wrap up the
unit tests and run it against my real data to see how it performs.  I
expect a huge reduction in resources (both Streams resources and Kafka
storage) by moving to this.
Thank you!



On Mon, May 8, 2017 at 5:26 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Michal,
>
> that's an interesting idea. In an ideal world, Kafka Streams should have
> an optimizer that is able to do this automatically under the hood. Too
> bad we are not there yet.
>
> @Garret: did you try this out?
>
> This seems to be a question that might affect many users, so it might
> be worth documenting somewhere as a recommended pattern.
>
>
> -Matthias
>
>
> On 5/8/17 1:43 AM, Michal Borowiecki wrote:
> > Apologies,
> >
> > In the code snippet of course only oneMinuteWindowed KTable will have a
> > Windowed key (KTable<Windowed<Key>, Value>), all others would be just
> > KTable<Tuple2<Key, Long>, Value>.
> >
> > MichaƂ
> >
> > On 07/05/17 16:09, Michal Borowiecki wrote:
> >>
> >> Hi Garrett,
> >>
> >> I've encountered a similar challenge in a project I'm working on (it's
> >> still work in progress, so please take my suggestions with a grain of
> >> salt).
> >>
> >> Yes, I believe KTable.groupBy lets you accomplish what you are aiming
> >> for with something like the following (same snippet attached as a txt
> >> file):
> >>
> >>
> >> KTable<Windowed<Key>, Value> oneMinuteWindowed = yourStream    //
> >> where Key and Value stand for your actual key and value types
> >>
> >>     .groupByKey()
> >>
> >>     .reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000),
> >>         "store1m");
> >>
> >>         //where your adder can be as simple as (val, agg) -> agg + val
> >>
> >>         //for primitive types or as complex as you need
> >>
> >>
> >> KTable<Windowed<Tuple2<Key, Long>>, Value> fiveMinuteWindowed =
> >> oneMinuteWindowed    // Tuple2 for this example as defined by the
> >> javaslang library
> >>
> >>     .groupBy( (windowedKey, value) -> new KeyValue<>(new
> >> Tuple2<>(windowedKey.key(), windowedKey.window().end() /1000/60/5
> >> *1000*60*5), value))
> >>
> >>         // the above rounds time down to a timestamp divisible by 5
> >> minutes
> >>
> >>     .reduce(/*your adder*/, /*your subtractor*/, "store5m");
> >>
> >>         // where your subtractor can be as simple as (val, agg) -> agg
> >> - val for primitive types, or as complex as you need,
> >>
> >>         // just make sure you get the order right (lesson hard learnt
> >> ;) ), subtraction is not commutative!
> >>
> >>
> >> KTable<Windowed<Tuple2<Key, Long>>, Value> fifteenMinuteWindowed =
> >> fiveMinuteWindowed
> >>
> >>     .groupBy( (keyPair, value) -> new KeyValue<>(new
> >> Tuple2<>(keyPair._1, keyPair._2 /1000/60/15 *1000*60*15), value))
> >>
> >>         // the above rounds time down to a timestamp divisible by 15
> >> minutes
> >>
> >>     .reduce(/*your adder*/, /*your subtractor*/, "store15m");
> >>
> >>
> >> KTable<Windowed<Tuple2<Key, Long>>, Value> sixtyMinuteWindowed =
> >> fifteenMinuteWindowed
> >>
> >>     .groupBy( (keyPair, value) -> new KeyValue<>(new
> >> Tuple2<>(keyPair._1, keyPair._2 /1000/60/60 *1000*60*60), value))
> >>
> >>         // the above rounds time down to a timestamp divisible by 60
> >> minutes
> >>
> >>     .reduce(/*your adder*/, /*your subtractor*/, "store60m");
> >>
> >>
> >> So, step by step:
> >>
> >>   * You use a windowed aggregation only once, from there on you use
> >>     the KTable abstraction only (which doesn't have windowed
> >>     aggregations).
> >>   * In each subsequent groupBy you map the key to a pair of
> >>     (your-real-key, timestamp) where the timestamp is rounded down
> >>     with the precision of the size of the new window.
> >>   * reduce() on a KGroupedTable takes an adder and a subtractor and it
> >>     will correctly update the new aggregate by first subtracting the
> >>     previous value of the upstream record before adding the new value
> >>     (this way, just as you said, the downstream is aware of the
> >>     statefulness of the upstream and correctly treats each record as
> >>     an update)
> >>   * If you want to reduce message volume further, you can break these
> >>     into separate KafkaStreams instances and configure downstream ones
> >>     with a higher commit.interval.ms (unfortunately you can't have
> >>     different values of this setting in different places of the same
> >>     topology I'm afraid)
> >>   * TODO: Look into retention policies, I haven't investigated that in
> >>     any detail.
> >>
> >> I haven't tested this exact code, so please excuse any typos.
> >>
> >> Also, if someone with more experience could chip in and check if I'm
> >> not talking nonsense here, or if there's an easier way to do this,
> >> that would be great.
> >>
> >>
> >> I don't know if the alternative approach is possible, where you
> >> convert each resulting KTable back into a stream and just do a
> >> windowed aggregation somehow. That would feel more natural, but I
> >> haven't figured out how to correctly window over a changelog in the
> >> KStream abstraction, feels impossible in the high-level DSL.
> >>
> >> Hope that helps,
> >> Michal
> >>
> >> On 02/05/17 18:03, Garrett Barton wrote:
> >>> Let's say I want to sum values over increasing window sizes of 1, 5,
> >>> 15, 60 minutes.  Right now I have them running in parallel, meaning if
> >>> I am producing 1k/sec records I am consuming 4k/sec to feed each
> >>> calculation.  In reality I am calculating far more than sum, and in
> >>> this pattern I'm looking at something like
> >>> (producing rate)*(calculations)*(windows) for a consumption rate.
> >>>
> >>>  So I had the idea, could I feed the 1 minute window into the 5
> >>> minute, and 5 into 15, and 15 into 60.  Theoretically I would consume
> >>> a fraction of the records, not have to scale as huge, and be back to
> >>> something like (producing rate)*(calculations)+(updates).
> >>>
> >>>   Thinking this is an awesome idea, I went to try and implement it and
> >>> got twisted around.  These are windowed grouping operations that
> >>> produce KTables, which means instead of a raw stream I have an update
> >>> stream.  To me this implies that downstream must be aware of this and
> >>> consume stateful information, knowing that each record is an update
> >>> and not an addition.  Does the high-level API handle that construct
> >>> and let me do that?  For a simple sum it would have to hold each of
> >>> the latest values for, say, the five 1-minute sums in a given window,
> >>> to perform the 5-minute sum.  Reading the docs, which are awesome, I
> >>> cannot determine whether KTable.groupBy() would work over a window,
> >>> and whether reduce or aggregate would thus do what I need.
> >>>
> >>> Any ideas?
> >>>
> >>
> >
>
>
