This is great!

-Jay

On Sat, May 27, 2017 at 12:47 PM, Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> Hi all,
>
> I've updated the wiki page with a draft pattern for consecutively growing
> time-windowed aggregations which was discussed some time ago on this
> mailing list.
>
> I'm yet to add the part that cleans up the stores using punctuations. Stay
> tuned.
>
>
> On a somewhat similar subject, I've been working to implement the
> following requirements:
>
> * transaction sums per customer session (simple, just extract non-expired
> session-windowed aggregates from a SessionStore using interactive queries)
>
> * global transaction sums for all *currently active* customer sessions
>
> The second bit proved non-trivial, because session-windowed KTables (or
> any windowed KTables for that matter) don't notify downstream when a window
> expires. And I can't use punctuate until KIP-138 is implemented, because
> stream-time punctuation is no good in this case (records can stop coming);
> reliable system-time punctuation would be needed.
>
> Below is how I implemented this; I have yet to test it thoroughly.
>
> I wonder if anyone knows of an easier way of achieving the same.
>
> If so, I'm looking forward to suggestions. If not, I'll add that to the
> patterns wiki page too, in case someone else finds it useful.
>
>
> builder
>   .stream(/*key serde*/, /*transaction serde*/, "transaciton-topic")
>
>   .groupByKey(/*key serde*/, /*transaction serde*/)
>
>   .aggregate(
>     () -> /*empty aggregate*/,
>     aggregator(),
>     merger(),
>     SessionWindows.with(SESSION_TIMEOUT_MS).until(SESSION_TIMEOUT_MS*2),
>     /* aggregate serde */,
>     txPerCustomerSumStore() // this store can be queried for per-customer session data
>   )
>
>   .toStream()
>
>   // tombstones only come when a session is merged into a bigger session, so ignore them
>   .filter((key, value) -> value != null)
>
>   // the map/groupByKey/reduce operations below are to only propagate
>   // updates to the *latest* session per customer downstream
>
>   .map((windowedCustomerId, agg) ->
>     // this moves the timestamp from the windowed key into the value
>     // so that we can group by customerId only and reduce to the later value
>     new KeyValue<>(
>       windowedCustomerId.key(), // just customerId
>       // this is just like a tuple2 but with nicely named accessors: timestamp() and aggs()
>       new WindowedAggsImpl(
>         windowedCustomerId.window().end(),
>         agg
>       )
>     )
>   )
>   .groupByKey(/*key serde*/, /*windowed aggs serde*/) // key is just customerId
>   .reduce(
>     // take the later session value and ignore any older;
>     // downstream only cares about *current* sessions
>     (val, agg) -> val.timestamp() > agg.timestamp() ? val : agg,
>     TimeWindows.of(SESSION_TIMEOUT_MS).advanceBy(SESSION_TIMOUT_DELAY_TOLERANCE_MS),
>     "latest-session-windowed"
>   )
>
>   .groupBy((windowedCustomerId, timeAndAggs) ->
>     // calculate totals with maximum granularity, which is per-partition
>     new KeyValue<>(
>       new Windowed<>(
>         // KIP-159 would come in handy here, to access the partition number instead
>         windowedCustomerId.key().hashCode() % PARTITION_COUNT_FOR_TOTALS,
>         // will use this in the interactive queries to pick the oldest not-yet-expired window
>         windowedCustomerId.window()
>       ),
>       timeAndAggs.aggs()
>     ),
>     new SessionKeySerde<>(Serdes.Integer()),
>     /* aggregate serde */
>   )
>
>   .reduce(
>     (val, agg) -> agg.add(val),
>     (val, agg) -> agg.subtract(val),
>     txTotalsStore() // this store can be queried to get totals per partition for all active sessions
>   );
>
> // this global table puts per-partition totals on every node, so that they can be
> // easily summed for global totals, picking the oldest not-yet-expired window
> builder.globalTable(
>   new SessionKeySerde<>(Serdes.Integer()),
>   /* aggregate serde */,
>   changelogTopicForStore(TRANSACTION_TOTALS),
>   "totals");
>
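The query side of the snippet above ("picking the oldest not-yet-expired window") is only described in comments, so here is a rough plain-Java sketch of how that summation might look. Everything in it is an assumption made for illustration: `GlobalTotals`, `WindowTotal` and `sumActive` are hypothetical names, the totals are plain `long`s rather than the real aggregate type, the entries are assumed to have already been read out of the "totals" global store into a list, and a window is treated as expired once its end is at or before the current time.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical query-side helper; not part of Kafka Streams or the original post. */
final class GlobalTotals {
    private GlobalTotals() {}

    /** Assumed flattened view of one (bucket, window) -> total entry from the global store. */
    static final class WindowTotal {
        final int bucket;
        final long windowStart;
        final long windowEnd;
        final long total;

        WindowTotal(int bucket, long windowStart, long windowEnd, long total) {
            this.bucket = bucket;
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.total = total;
        }
    }

    /**
     * For each bucket, picks the oldest window that has not yet expired
     * (assumption: expired means windowEnd <= nowMs) and sums those totals.
     */
    static long sumActive(List<WindowTotal> entries, long nowMs) {
        Map<Integer, WindowTotal> oldestLive = new HashMap<>();
        for (WindowTotal e : entries) {
            if (e.windowEnd <= nowMs) {
                continue; // window already expired, skip it
            }
            // keep the entry with the smallest window start per bucket
            oldestLive.merge(e.bucket, e, (a, b) -> a.windowStart <= b.windowStart ? a : b);
        }
        long sum = 0;
        for (WindowTotal e : oldestLive.values()) {
            sum += e.total;
        }
        return sum;
    }
}
```

With hopping windows, several windows per bucket are live at any moment; taking the oldest live one is what the comments in the original snippet suggest, but whether that matches the intended semantics exactly is something only testing the real topology would confirm.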
> TODO: put in StreamPartitioners (with KTable.through variants added in
> KAFKA-5045) to avoid re-partitioning where I know it's unnecessary.
>
> The idea behind the % PARTITION_COUNT_FOR_TOTALS bit is that I want to
> first do summation with max parallelism and minimize the work needed
> downstream. So I calculate a per-partition sum first to limit the updates
> that the totals topic will receive and the summing work done by the
> interactive queries on the global store. Is this a good way of going about
> it?
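One detail of the `% PARTITION_COUNT_FOR_TOTALS` bucketing may be worth checking: in Java, `%` keeps the sign of the dividend, so `hashCode() % n` is negative for keys with a negative hash code. Whether that matters depends on how the bucket number is used downstream. If a value in the range 0..n-1 is wanted, a minimal sketch could use `Math.floorMod`; the `TotalsBuckets` class and `bucketFor` method below are hypothetical names, not anything from Kafka Streams or the original code.

```java
/** Hypothetical helper for computing a totals bucket; an illustration only. */
final class TotalsBuckets {
    private TotalsBuckets() {}

    /**
     * Maps a key to a bucket in [0, bucketCount), including for keys
     * whose hashCode() is negative, where plain % would return a negative value.
     */
    static int bucketFor(Object key, int bucketCount) {
        return Math.floorMod(key.hashCode(), bucketCount);
    }
}
```

For example, an `Integer` key of -7 has hash code -7, so `-7 % 4` gives -3 while `TotalsBuckets.bucketFor(-7, 4)` gives 1.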
>
> Thanks,
>
> Michał
>
> On 09/05/17 18:31, Matthias J. Sax wrote:
>
> Hi,
>
> I started a new Wiki page to collect some common usage patterns for
> Kafka Streams.
>
> Right now, it contains a quick example on "how to compute average". Hope
> we can collect more examples like this!
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns
>
>
> -Matthias
