This is great! -Jay
On Sat, May 27, 2017 at 12:47 PM, Michal Borowiecki <michal.borowie...@openbet.com> wrote:
> Hi all,
>
> I've updated the wiki page with a draft pattern for consecutively growing
> time-windowed aggregations, which was discussed some time ago on this
> mailing list.
>
> I'm yet to add the part that cleans up the stores using punctuations. Stay
> tuned.
>
> On a somewhat similar subject, I've been working to implement the
> following requirements:
>
> * transaction sums per customer session (simple: just extract non-expired
>   session-windowed aggregates from a SessionStore using interactive queries)
>
> * global transaction sums for all *currently active* customer sessions
>
> The second bit proved non-trivial, because session-windowed KTables (or
> any windowed KTables, for that matter) don't notify downstream when a
> window expires. And I can't use punctuate until KIP-138 is implemented,
> because stream-time punctuation is no good in this case (records can stop
> coming); reliable system-time punctuation would be needed.
>
> Below is how I implemented this. I'm yet to test it thoroughly.
>
> I wonder if anyone knows of an easier way of achieving the same.
>
> If so, I'm looking forward to suggestions. If not, I'll add that to the
> patterns wiki page too, in case someone else finds it useful.
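The "extract non-expired session-windowed aggregates from a SessionStore" step above relies on an expiry check done in the query layer itself, since expired windows are never signalled downstream. A minimal plain-Java sketch of that logic, using illustrative names (`SessionExpiry`, `isActive`, an assumed 30-minute `SESSION_TIMEOUT_MS`) rather than the actual Kafka Streams API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the expiry check an interactive query would apply
// when reading per-customer session aggregates out of a session store.
// All names and the timeout value are illustrative, not Kafka Streams API.
public class SessionExpiry {
    static final long SESSION_TIMEOUT_MS = 30 * 60 * 1000L; // assumed 30-minute inactivity gap

    // A session whose window end plus the inactivity gap is still in the
    // future is considered active; everything else has expired.
    static boolean isActive(long sessionEndMs, long nowMs) {
        return sessionEndMs + SESSION_TIMEOUT_MS > nowMs;
    }

    // Sum transaction aggregates over active sessions only, mimicking what
    // the query layer would do after a range scan of the session store.
    static long sumActive(Map<Long, Long> sessionEndToSum, long nowMs) {
        long total = 0;
        for (Map.Entry<Long, Long> e : sessionEndToSum.entrySet()) {
            if (isActive(e.getKey(), nowMs)) total += e.getValue();
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Long, Long> sessions = new LinkedHashMap<>();
        sessions.put(1_000_000L, 50L);   // old session, expired by "now"
        sessions.put(10_000_000L, 75L);  // recent session, still active
        long now = 10_500_000L;
        System.out.println(sumActive(sessions, now)); // prints 75
    }
}
```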
>
> builder
>     .stream(/*key serde*/, /*transaction serde*/, "transaction-topic")
>     .groupByKey(/*key serde*/, /*transaction serde*/)
>     .aggregate(
>         () -> /*empty aggregate*/,
>         aggregator(),
>         merger(),
>         SessionWindows.with(SESSION_TIMEOUT_MS).until(SESSION_TIMEOUT_MS * 2),
>         /* aggregate serde */,
>         txPerCustomerSumStore() // this store can be queried for per-customer session data
>     )
>     .toStream()
>     .filter((key, value) -> value != null) // tombstones only come when a session
>         // is merged into a bigger session, so ignore them
>     // the below map/groupByKey/reduce operations are to only propagate updates
>     // to the *latest* session per customer downstream
>     .map((windowedCustomerId, agg) ->
>         // this moves the timestamp from the windowed key into the value,
>         // so that we can group by customerId only and reduce to the later value
>         new KeyValue<>(
>             windowedCustomerId.key(), // just customerId
>             new WindowedAggsImpl( // this is just like a Tuple2, but with nicely
>                 // named accessors: timestamp() and aggs()
>                 windowedCustomerId.window().end(),
>                 agg
>             )
>         )
>     )
>     .groupByKey( /*key serde*/, /*windowed aggs serde*/ ) // key is just customerId
>     .reduce( // take the later session value and ignore any older - downstream
>         // only cares about *current* sessions
>         (val, agg) -> val.timestamp() > agg.timestamp() ? val : agg,
>         TimeWindows.of(SESSION_TIMEOUT_MS).advanceBy(SESSION_TIMEOUT_DELAY_TOLERANCE_MS),
>         "latest-session-windowed"
>     )
>     .groupBy((windowedCustomerId, timeAndAggs) -> // calculate totals with maximum
>         // granularity, which is per-partition
>         new KeyValue<>(
>             new Windowed<>(
>                 windowedCustomerId.key().hashCode() % PARTITION_COUNT_FOR_TOTALS,
>                     // KIP-159 would come in handy here, to access the partition
>                     // number instead
>                 windowedCustomerId.window() // will use this in the interactive
>                     // queries to pick the oldest not-yet-expired window
>             ),
>             timeAndAggs.aggs()
>         ),
>         new SessionKeySerde<>(Serdes.Integer()),
>         /* aggregate serde */
>     )
>     .reduce(
>         (val, agg) -> agg.add(val),
>         (val, agg) -> agg.subtract(val),
>         txTotalsStore() // this store can be queried to get totals per partition
>             // for all active sessions
>     );
>
> builder.globalTable(
>     new SessionKeySerde<>(Serdes.Integer()),
>     /* aggregate serde */,
>     changelogTopicForStore(TRANSACTION_TOTALS),
>     "totals"); // this global table puts per-partition totals on every node, so
>         // that they can be easily summed for global totals, picking the oldest
>         // not-yet-expired window
>
> TODO: put in StreamPartitioners (with the KTable.through variants added in
> KAFKA-5045) to avoid re-partitioning where I know it's unnecessary.
>
> The idea behind the % PARTITION_COUNT_FOR_TOTALS bit is that I want to
> first do the summation with max parallelism and minimize the work needed
> downstream. So I calculate a per-partition sum first, to limit the updates
> that the totals topic will receive and the summing work done by the
> interactive queries on the global store. Is this a good way of going about
> it?
>
> Thanks,
>
> Michał
>
> On 09/05/17 18:31, Matthias J. Sax wrote:
> > Hi,
> >
> > I started a new Wiki page to collect some common usage patterns for
> > Kafka Streams.
> >
> > Right now, it contains a quick example on "how to compute average". Hope
> > we can collect more examples like this!
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns
> >
> > -Matthias
>
> --
> Michal Borowiecki
> Senior Software Engineer L4
> T: +44 208 742 1600 / +44 203 249 8448
> E: michal.borowie...@openbet.com
> W: www.openbet.com
> OpenBet Ltd, Chiswick Park Building 9, 566 Chiswick High Rd, London, W4 5XT, UK
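The per-partition pre-aggregation idea Michał describes (`% PARTITION_COUNT_FOR_TOTALS`) can be sketched in plain Java, independent of the Streams DSL: collapse many per-customer sums into a small, fixed set of buckets so the totals topic sees few keys, then let the query layer add up just those buckets. Names here (`PartitionedTotals`, `bucketOf`, the bucket count) are illustrative; note that `Math.floorMod` is used because a raw `hashCode() % n` can yield negative buckets:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of pre-aggregating per-customer sums into a fixed
// number of buckets before computing a global total. Not the Streams DSL;
// names and the bucket count are illustrative only.
public class PartitionedTotals {
    static final int PARTITION_COUNT_FOR_TOTALS = 4; // assumed bucket count

    // Math.floorMod keeps the bucket non-negative even when hashCode()
    // is negative, which plain % would not guarantee.
    static int bucketOf(String customerId) {
        return Math.floorMod(customerId.hashCode(), PARTITION_COUNT_FOR_TOTALS);
    }

    // Collapse per-customer sums into per-bucket sums (the "first summation
    // with max parallelism" step).
    static Map<Integer, Long> perBucketTotals(Map<String, Long> perCustomerSums) {
        Map<Integer, Long> buckets = new HashMap<>();
        perCustomerSums.forEach((id, sum) ->
            buckets.merge(bucketOf(id), sum, Long::sum));
        return buckets;
    }

    // The interactive-query layer only has to add a handful of bucket values
    // to obtain the global total.
    static long globalTotal(Map<Integer, Long> buckets) {
        return buckets.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        Map<String, Long> sums = Map.of("cust-1", 10L, "cust-2", 20L, "cust-3", 30L);
        System.out.println(globalTotal(perBucketTotals(sums))); // prints 60
    }
}
```

The design trade-off this illustrates: the bucket count caps the number of keys on the totals topic, so the downstream summing cost stays constant no matter how many customer sessions are active.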