Michal, This is slick! I am still writing unit tests to verify it. My code looks something like:
KTable<Windowed<String>, CountSumMinMaxAvgObj> oneMinuteWindowed = srcStream
    // my value object isn't really called that, just wanted to show a sample
    // set of calculations the value can do!
    .groupByKey(Serdes.String(), Serdes.Double())
    .aggregate(/* initializer */, /* aggregator */,
               TimeWindows.of(60*1000, 60*1000), "store1m");
    // I used an aggregate here so I could have a non-primitive value object
    // that does the calculations on each aggregation; the POJO has an
    // .add(Double) in it.

KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> fiveMinuteWindowed = oneMinuteWindowed
    // I made my own Tuple2, will move the window calc into it
    .groupBy( (windowedKey, value) -> new KeyValue<>(
                  new Tuple2<String, Long>(windowedKey.key(),
                      windowedKey.window().end() /1000/60/5 *1000*60*5),
                  value),
              keySerde, valSerde)
    // the above rounds time down to a timestamp divisible by 5 minutes
    .reduce(/* your adder */, /* your subtractor */, "store5m");
    // where your subtractor can be as simple as (val, agg) -> agg - val for
    // primitive types or as complex as you need;
    // just make sure you get the order right (lesson hard learnt ;) ),
    // subtraction is not commutative!
    // again my value object has an .add(Obj) and a .sub() to handle this, so nice!
KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> fifteenMinuteWindowed = fiveMinuteWindowed
    .groupBy( (keyPair, value) -> new KeyValue<>(
                  new Tuple2<String, Long>(keyPair._1,
                      keyPair._2 /1000/60/15 *1000*60*15),
                  value),
              keySerde, valSerde)
    // the above rounds time down to a timestamp divisible by 15 minutes
    .reduce(/* your adder */, /* your subtractor */, "store15m");

KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> sixtyMinuteWindowed = fifteenMinuteWindowed
    .groupBy( (keyPair, value) -> new KeyValue<>(
                  new Tuple2<String, Long>(keyPair._1,
                      keyPair._2 /1000/60/60 *1000*60*60),
                  value),
              keySerde, valSerde)
    // the above rounds time down to a timestamp divisible by 60 minutes
    .reduce(/* your adder */, /* your subtractor */, "store60m");

Notes thus far:
* It doesn't look like I need to start the 5-minute stage with a windowed KTable return object; in this case it starts with the regular KTable<Tuple2<String, Long>>.
* I'm thinking about using windowedKey.window().start() instead of end(), as I believe that is more consistent with what the windows themselves put out; they go into the stores keyed by their start time, I believe.
* The Serdes get unwieldy, as does the generic typing on some of these classes (yes you, KeyValueMapper); it makes for long code! I had to specify them everywhere since the key/value types changed.

I didn't get enough time to mess with it today; I will wrap up the unit tests tomorrow and run it against my real data to see how it performs. I expect a huge reduction in resources (both streams and Kafka storage) by moving to this. Thank you!

On Mon, May 8, 2017 at 5:26 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> Michal,
>
> that's an interesting idea. In an ideal world, Kafka Streams should have
> an optimizer that is able to do this automatically under the hood. Too
> bad we are not there yet.
>
> @Garrett: did you try this out?
>
> This seems to be a question that might affect many users, and it might
> be worth documenting it somewhere as a recommended pattern.
>
>
> -Matthias
>
>
> On 5/8/17 1:43 AM, Michal Borowiecki wrote:
> > Apologies,
> >
> > In the code snippet of course only the oneMinuteWindowed KTable will have a
> > Windowed key (KTable<Windowed<Key>, Value>), all others would be just
> > KTable<Tuple2<Key, Long>, Value>.
> >
> > Michał
> >
> > On 07/05/17 16:09, Michal Borowiecki wrote:
> >>
> >> Hi Garrett,
> >>
> >> I've encountered a similar challenge in a project I'm working on (it's
> >> still work in progress, so please take my suggestions with a grain of
> >> salt).
> >>
> >> Yes, I believe KTable.groupBy lets you accomplish what you are aiming
> >> for with something like the following (same snippet attached as txt file):
> >>
> >> KTable<Windowed<Key>, Value> oneMinuteWindowed = yourStream
> >>     // where Key and Value stand for your actual key and value types
> >>     .groupByKey()
> >>     .reduce(/* your adder */, TimeWindows.of(60*1000, 60*1000), "store1m");
> >>     // where your adder can be as simple as (val, agg) -> agg + val
> >>     // for primitive types or as complex as you need
> >>
> >> KTable<Windowed<Tuple2<Key, Long>>, Value> fiveMinuteWindowed = oneMinuteWindowed
> >>     // Tuple2 for this example as defined by the javaslang library
> >>     .groupBy( (windowedKey, value) -> new KeyValue<>(
> >>         new Tuple2<>(windowedKey.key(),
> >>             windowedKey.window().end() /1000/60/5 *1000*60*5), value) )
> >>     // the above rounds time down to a timestamp divisible by 5 minutes
> >>     .reduce(/* your adder */, /* your subtractor */, "store5m");
> >>     // where your subtractor can be as simple as (val, agg) -> agg - val
> >>     // for primitive types or as complex as you need,
> >>     // just make sure you get the order right (lesson hard learnt
> >>     // ;) ), subtraction is not commutative!
> >>
> >>
> >> KTable<Windowed<Tuple2<Key, Long>>, Value> fifteenMinuteWindowed = fiveMinuteWindowed
> >>     .groupBy( (keyPair, value) -> new KeyValue<>(
> >>         new Tuple2<>(keyPair._1, keyPair._2 /1000/60/15 *1000*60*15), value) )
> >>     // the above rounds time down to a timestamp divisible by 15 minutes
> >>     .reduce(/* your adder */, /* your subtractor */, "store15m");
> >>
> >> KTable<Windowed<Tuple2<Key, Long>>, Value> sixtyMinuteWindowed = fifteenMinuteWindowed
> >>     .groupBy( (keyPair, value) -> new KeyValue<>(
> >>         new Tuple2<>(keyPair._1, keyPair._2 /1000/60/60 *1000*60*60), value) )
> >>     // the above rounds time down to a timestamp divisible by 60 minutes
> >>     .reduce(/* your adder */, /* your subtractor */, "store60m");
> >>
> >> So, step by step:
> >>
> >>   * You use a windowed aggregation only once; from there on you use
> >>     the KTable abstraction only (which doesn't have windowed
> >>     aggregations).
> >>   * In each subsequent groupBy you map the key to a pair of
> >>     (your-real-key, timestamp), where the timestamp is rounded down
> >>     with the precision of the size of the new window.
> >>   * reduce() on a KGroupedTable takes an adder and a subtractor, and it
> >>     will correctly update the new aggregate by first subtracting the
> >>     previous value of the upstream record before adding the new value
> >>     (this way, just as you said, the downstream is aware of the
> >>     statefulness of the upstream and correctly treats each record as
> >>     an update).
> >>   * If you want to reduce message volume further, you can break these
> >>     into separate KafkaStreams instances and configure downstream ones
> >>     with a higher commit.interval.ms (unfortunately you can't have
> >>     different values of this setting in different places of the same
> >>     topology, I'm afraid).
> >>   * TODO: Look into retention policies; I haven't investigated that in
> >>     any detail.
> >>
> >> I haven't tested this exact code, so please excuse any typos.
> >>
> >> Also, if someone with more experience could chip in and check if I'm
> >> not talking nonsense here, or if there's an easier way to do this, that
> >> would be great.
> >>
> >> I don't know if the alternative approach is possible, where you
> >> convert each resulting KTable back into a stream and just do a
> >> windowed aggregation somehow. That would feel more natural, but I
> >> haven't figured out how to correctly window over a changelog in the
> >> KStream abstraction; it feels impossible in the high-level DSL.
> >>
> >> Hope that helps,
> >> Michal
> >>
> >> On 02/05/17 18:03, Garrett Barton wrote:
> >>> Let's say I want to sum values over increasing window sizes of 1, 5, 15, 60
> >>> minutes. Right now I have them running in parallel, meaning if I am
> >>> producing 1k/sec records I am consuming 4k/sec to feed each calculation.
> >>> In reality I am calculating far more than sum, and in this pattern I'm
> >>> looking at something like (producing rate)*(calculations)*(windows) for a
> >>> consumption rate.
> >>>
> >>> So I had the idea: could I feed the 1 minute window into the 5 minute, and
> >>> 5 into 15, and 15 into 60? Theoretically I would consume a fraction of the
> >>> records, not have to scale as huge, and be back to something like
> >>> (producing rate)*(calculations)+(updates).
> >>>
> >>> Thinking this is an awesome idea, I went to try and implement it and got
> >>> twisted around. These are windowed grouping operations that produce
> >>> KTables, which means instead of a raw stream I have an update stream. To
> >>> me this implies that downstream must be aware of this and consume stateful
> >>> information, knowing that each record is an update and not an addition.
> >>> Does the high level API handle that construct and let me do that? For
> >>> a simple sum it would have to hold each of the latest values for, say, the
> >>> five 1-minute sums in a given window, to perform the 5 minute sum.
> >>> Reading the
> >>> docs, which are awesome, I cannot determine if the KTable.groupBy()
> >>> would work over a window, and would reduce or aggregate thus do what I need?
> >>>
> >>> Any ideas?
> >>>
> >>
> >> --
> >> Michal Borowiecki
> >> Senior Software Engineer L4
> >> T: +44 208 742 1600
> >>    +44 203 249 8448
> >> E: michal.borowie...@openbet.com
> >> W: www.openbet.com <http://www.openbet.com/>
> >>
> >> OpenBet Ltd
> >> Chiswick Park Building 9
> >> 566 Chiswick High Rd
> >> London
> >> W4 5XT
> >> UK
> >>
> >> This message is confidential and intended only for the addressee. If
> >> you have received this message in error, please immediately notify the
> >> postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it
> >> from your system as well as any copies. The content of e-mails as well
> >> as traffic data may be monitored by OpenBet for employment and
> >> security purposes. To protect the environment please do not print this
> >> e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park
> >> Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A
> >> company registered in England and Wales. Registered no. 3134634. VAT
> >> no. GB927523612
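The timestamp-rounding trick used in both versions of the code above (e.g. windowedKey.window().end() /1000/60/5 *1000*60*5) relies on Java's long division truncating toward zero: dividing first and multiplying back floors the timestamp to a bucket boundary. A standalone check with no Kafka dependency (the class and helper names here are mine, not from the thread):

```java
public class BucketRounding {
    // Round an epoch-millisecond timestamp down to the nearest multiple of
    // `minutes` minutes; long division discards the remainder, so the
    // divide-then-multiply sequence implements a floor to the bucket start.
    static long roundDown(long epochMs, long minutes) {
        long bucketMs = minutes * 60 * 1000;
        return epochMs / bucketMs * bucketMs;
    }

    public static void main(String[] args) {
        long t = 1494288451000L;                    // an arbitrary epoch-ms timestamp
        System.out.println(roundDown(t, 5));        // 1494288300000, a 5-minute boundary
        System.out.println(roundDown(t, 5) % (5 * 60 * 1000)); // 0
        System.out.println(roundDown(600000L, 5));  // 600000: exact boundaries are unchanged
    }
}
```

Note that the order matters: the divisions must happen before the multiplications, because it is the intermediate truncation that does the rounding.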
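The other mechanism the thread leans on is KGroupedTable.reduce()'s adder/subtractor pair: when an upstream 1-minute window re-emits, the larger bucket stays correct because the old value is subtracted before the new one is added. A minimal plain-Java sketch of such a value object (names are illustrative, not the thread's actual CountSumMinMaxAvgObj; only count/sum/avg are shown because they are invertible, whereas min and max cannot be maintained by subtraction alone and would need different handling):

```java
public class CountSumAvg {
    long count;
    double sum;

    static CountSumAvg of(long count, double sum) {
        CountSumAvg v = new CountSumAvg();
        v.count = count;
        v.sum = sum;
        return v;
    }

    CountSumAvg add(CountSumAvg other) {        // plays the role of the reduce() adder
        count += other.count;
        sum += other.sum;
        return this;
    }

    CountSumAvg subtract(CountSumAvg other) {   // plays the role of the reduce() subtractor
        count -= other.count;
        sum -= other.sum;
        return this;
    }

    double avg() {
        return count == 0 ? 0.0 : sum / count;
    }

    public static void main(String[] args) {
        // A 1-minute window first emits (3 records, sum 30.0), then updates
        // to (4 records, sum 44.0). The 5-minute bucket stays correct by
        // removing the old value before folding in the new one.
        CountSumAvg fiveMin = new CountSumAvg();
        CountSumAvg oldOneMin = of(3, 30.0);
        CountSumAvg newOneMin = of(4, 44.0);

        fiveMin.add(oldOneMin);                      // first emission arrives
        fiveMin.subtract(oldOneMin).add(newOneMin);  // later update arrives
        System.out.println(fiveMin.count + " " + fiveMin.sum + " " + fiveMin.avg());
        // prints: 4 44.0 11.0
    }
}
```

As the thread warns, argument order matters in the subtractor: the old value is removed from the running aggregate, never the other way around.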