Apologies,

In the code snippet, of course, only the oneMinuteWindowed KTable will have a Windowed key (KTable<Windowed<Key>, Value>); all the others would be just KTable<Tuple2<Key, Long>, Value>.

Michał

On 07/05/17 16:09, Michal Borowiecki wrote:

Hi Garrett,

I've encountered a similar challenge in a project I'm working on (it's still work in progress, so please take my suggestions with a grain of salt).

Yes, I believe KTable.groupBy lets you accomplish what you are aiming for with something like the following (same snippet attached as txt file):


KTable<Windowed<Key>, Value> oneMinuteWindowed = yourStream // where Key and Value stand for your actual key and value types
    .groupByKey()
    .reduce(/*your adder*/, TimeWindows.of(60 * 1000L), "store1m");
        // tumbling one-minute windows (the advance defaults to the window size)
        // where your adder can be as simple as (val, agg) -> agg + val
        // for primitive types or as complex as you need

KTable<Tuple2<Key, Long>, Value> fiveMinuteWindowed = oneMinuteWindowed // Tuple2 for this example as defined by the javaslang library
    .groupBy((windowedKey, value) -> new KeyValue<>(new Tuple2<>(windowedKey.key(), windowedKey.window().end() / 1000 / 60 / 5 * 1000 * 60 * 5), value))
        // the above rounds time down to a timestamp divisible by 5 minutes
    .reduce(/*your adder*/, /*your subtractor*/, "store5m");
        // where your subtractor can be as simple as (val, agg) -> agg - val
        // for primitive types or as complex as you need,
        // just make sure you get the order right (lesson hard learnt ;) ), subtraction is not commutative!


KTable<Tuple2<Key, Long>, Value> fifteenMinuteWindowed = fiveMinuteWindowed
    .groupBy((keyPair, value) -> new KeyValue<>(new Tuple2<>(keyPair._1, keyPair._2 / 1000 / 60 / 15 * 1000 * 60 * 15), value))
        // the above rounds time down to a timestamp divisible by 15 minutes
    .reduce(/*your adder*/, /*your subtractor*/, "store15m");


KTable<Tuple2<Key, Long>, Value> sixtyMinuteWindowed = fifteenMinuteWindowed
    .groupBy((keyPair, value) -> new KeyValue<>(new Tuple2<>(keyPair._1, keyPair._2 / 1000 / 60 / 60 * 1000 * 60 * 60), value))
        // the above rounds time down to a timestamp divisible by 60 minutes
    .reduce(/*your adder*/, /*your subtractor*/, "store60m");
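
The rounding in the groupBy calls above relies on integer division dropping the remainder. As a minimal sketch in plain Java, it can be checked in isolation. WindowBuckets and roundDown are hypothetical names used only for this illustration (they are not part of Kafka Streams), and the sketch assumes non-negative epoch-millisecond timestamps:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class WindowBuckets {
    private WindowBuckets() {}

    /** Rounds a non-negative epoch-millisecond timestamp down to a multiple of windowMinutes. */
    static long roundDown(long timestampMs, long windowMinutes) {
        long windowMs = TimeUnit.MINUTES.toMillis(windowMinutes);
        // Integer division drops the remainder, so multiplying back
        // yields the start of the enclosing bucket.
        return timestampMs / windowMs * windowMs;
    }

    public static void main(String[] args) {
        long ts = TimeUnit.MINUTES.toMillis(52);  // 52 minutes past the epoch
        System.out.println(roundDown(ts, 5));     // 3000000 (50 minutes)
        System.out.println(roundDown(ts, 15));    // 2700000 (45 minutes)
        System.out.println(roundDown(ts, 60));    // 0
    }
}
```

For non-negative values this gives the same result as the chained form ts / 1000 / 60 / 5 * 1000 * 60 * 5 used in the snippet.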


So, step by step:

  * You use a windowed aggregation only once, from there on you use
    the KTable abstraction only (which doesn't have windowed
    aggregations).
  * In each subsequent groupBy you map the key to a pair of
    (your-real-key, timestamp) where the timestamp is rounded down
    with the precision of the size of the new window.
  * reduce() on a KGroupedTable takes an adder and a subtractor and it
    will correctly update the new aggregate by first subtracting the
    previous value of the upstream record before adding the new value
    (this way, just as you said, the downstream is aware of the
    statefulness of the upstream and correctly treats each record as
    an update).
  * If you want to reduce message volume further, you can break these
    into separate KafkaStreams instances and configure downstream ones
    with a higher commit.interval.ms (unfortunately, you can't have
    different values of this setting in different places of the same
    topology).
  * TODO: Look into retention policies, I haven't investigated that in
    any detail.
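
To illustrate the adder/subtractor point from the list above, here is a rough plain-Java simulation of how a five-minute sum stays correct when an upstream one-minute sum is revised. It uses no Kafka API at all. RollupSimulation and its methods are hypothetical names, and it buckets by window start rather than window end, which is an assumption made to keep the example simple:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation for illustration; names are hypothetical, no Kafka API is used.
final class RollupSimulation {
    static final long MINUTE_MS = 60_000L;
    static final long FIVE_MINUTES_MS = 5 * MINUTE_MS;

    // Latest sum per one-minute window start (stands in for the upstream KTable).
    private final Map<Long, Long> oneMinute = new HashMap<>();
    // Aggregate per five-minute bucket start (stands in for the downstream KTable).
    private final Map<Long, Long> fiveMinute = new HashMap<>();

    /** Applies an upstream update: the one-minute sum for this window is now newSum. */
    void update(long windowStartMs, long newSum) {
        long bucket = windowStartMs / FIVE_MINUTES_MS * FIVE_MINUTES_MS;
        Long oldSum = oneMinute.put(windowStartMs, newSum);
        long agg = fiveMinute.getOrDefault(bucket, 0L);
        if (oldSum != null) {
            agg -= oldSum; // subtractor: retract the previous upstream value
        }
        agg += newSum;     // adder: apply the new upstream value
        fiveMinute.put(bucket, agg);
    }

    long fiveMinuteSum(long bucketStartMs) {
        return fiveMinute.getOrDefault(bucketStartMs, 0L);
    }

    public static void main(String[] args) {
        RollupSimulation sim = new RollupSimulation();
        sim.update(0, 10);          // minute 0 sums to 10
        sim.update(MINUTE_MS, 7);   // minute 1 sums to 7
        sim.update(0, 15);          // minute 0 is revised to 15
        System.out.println(sim.fiveMinuteSum(0)); // 22, not 32
    }
}
```

Without the subtraction step, the revision of minute 0 would be counted on top of its earlier value and the five-minute sum would drift upwards with every upstream update.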

I haven't tested this exact code, so please excuse any typos.

Also, if someone with more experience could chip in and check that I'm not talking nonsense here, or whether there's an easier way to do this, that would be great.


I don't know whether the alternative approach is possible, where you convert each resulting KTable back into a stream and somehow do a windowed aggregation on it. That would feel more natural, but I haven't figured out how to window correctly over a changelog in the KStream abstraction; it feels impossible in the high-level DSL.

Hope that helps,
Michal

On 02/05/17 18:03, Garrett Barton wrote:
Let's say I want to sum values over increasing window sizes of 1, 5, 15, 60
minutes.  Right now I have them running in parallel, meaning if I am
producing 1k/sec records I am consuming 4k/sec to feed each calculation.
In reality I am calculating far more than sum, and in this pattern I'm
looking at something like (producing rate)*(calculations)*(windows) for a
consumption rate.

  So I had the idea, could I feed the 1 minute window into the 5 minute, and
5 into 15, and 15 into 60.  Theoretically I would consume a fraction of the
records, not have to scale as huge and be back to something like (producing
rate)*(calculations)+(updates).

   Thinking this is an awesome idea I went to try and implement it and got
twisted around.  These are windowed grouping operations that produce
KTables, which means instead of a raw stream I have an update stream.  To
me this implies that downstream must be aware of this and consume stateful
information, knowing that each record is an update and not an addition.
Does the high level api handle that construct and let me do that?  For
a simple sum it would have to hold each of the latest values for, say, the 5
1-minute sums in a given window, to perform the 5 minute sum.  Reading the
docs, which are awesome, I cannot determine if KTable.groupBy() would
work over a window, and whether reduce or aggregate would then do what I need?

Any ideas?


--
Signature
<http://www.openbet.com/>         Michal Borowiecki
Senior Software Engineer L4
        T:      +44 208 742 1600

        
        +44 203 249 8448

        
        
        E:      michal.borowie...@openbet.com
        W:      www.openbet.com <http://www.openbet.com/>

        
        OpenBet Ltd

        Chiswick Park Building 9

        566 Chiswick High Rd

        London

        W4 5XT

        UK

        
<https://www.openbet.com/email_promo>

This message is confidential and intended only for the addressee. If you have received this message in error, please immediately notify the postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it from your system as well as any copies. The content of e-mails as well as traffic data may be monitored by OpenBet for employment and security purposes. To protect the environment please do not print this e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A company registered in England and Wales. Registered no. 3134634. VAT no. GB927523612

