Hi all,

Another pattern I think is worth adding is a sliding-windowed message reordering and de-duplicating processor.

The outline I have in mind is based on this answer (except the timestamp would come from the record context; in that question the timestamp was in the body of the message):

https://stackoverflow.com/a/44345374/7897191
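To make the idea concrete, here is a minimal standalone sketch of the buffering logic (plain Java, not the Streams Processor API; the class and method names are illustrative, not from any existing code). In the actual processor the TreeMap would be backed by a persistent state store, the id set would need expiry too, and the flush would be driven by punctuation rather than by each incoming record:

```java
import java.util.*;

// Illustrative sketch: buffer records in a TreeMap keyed by timestamp,
// drop duplicates by id, and emit records in timestamp order once stream
// time has advanced past the reordering window.
public class ReorderDedupBuffer {
    private final long windowMs;
    private final TreeMap<Long, LinkedHashMap<String, String>> buffer = new TreeMap<>();
    private final Set<String> seenIds = new HashSet<>(); // would also need expiry in practice

    public ReorderDedupBuffer(long windowMs) {
        this.windowMs = windowMs;
    }

    // Buffer one record and return all records now older than
    // (max buffered timestamp - window), sorted by timestamp.
    public List<String> process(long timestamp, String id, String value) {
        if (seenIds.add(id)) { // add() returning false means a duplicate id: drop it
            buffer.computeIfAbsent(timestamp, t -> new LinkedHashMap<>()).put(id, value);
        }
        List<String> ready = new ArrayList<>();
        if (buffer.isEmpty()) {
            return ready;
        }
        long watermark = buffer.lastKey() - windowMs;
        Iterator<Map.Entry<Long, LinkedHashMap<String, String>>> it =
            buffer.headMap(watermark, true).entrySet().iterator();
        while (it.hasNext()) {
            ready.addAll(it.next().getValue().values());
            it.remove(); // removes from the underlying TreeMap via the headMap view
        }
        return ready;
    }
}
```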

Please let me know if you have a better design for this.

Cheers,

Michal


On 27/05/17 21:16, Jay Kreps wrote:
This is great!

-Jay

On Sat, May 27, 2017 at 12:47 PM, Michal Borowiecki <michal.borowie...@openbet.com> wrote:

    Hi all,

    I've updated the wiki page with a draft pattern for consecutively
    growing time-windowed aggregations which was discussed some time
    ago on this mailing list.

    I'm yet to add the part that cleans up the stores using
    punctuations. Stay tuned.


    On a somewhat similar subject, I've been working to implement the
    following requirements:

    * transaction sums per customer session (simple, just extract
    non-expired session-windowed aggregates from a SessionStore using
    interactive queries)

    * global transaction sums for all _/currently active/_ customer
    sessions

    The second bit proved non-trivial, because session-windowed
    KTables (or any windowed KTables, for that matter) don't notify
    downstream when a window expires. And I can't use punctuate until
    KIP-138 is implemented, because stream-time punctuation is no good
    in this case (records can stop coming); reliable system-time
    punctuation would be needed.

    Below is how I implemented this; I've yet to test it thoroughly.

    I wonder if anyone knows of an easier way of achieving the same.

    If so, I'm looking forward to suggestions. If not, I'll add that
    to the patterns wiki page too, in case someone else finds it useful.


    builder
       .stream(/*key serde*/, /*transaction serde*/, "transaction-topic")

       .groupByKey(/*key serde*/, /*transaction serde*/)

       .aggregate(
         () -> /*empty aggregate*/,
         aggregator(),
         merger(),
         SessionWindows.with(SESSION_TIMEOUT_MS).until(SESSION_TIMEOUT_MS * 2),
         /* aggregate serde */,
         txPerCustomerSumStore() // this store can be queried for per-customer session data
       )

       .toStream()

       // tombstones only come when a session is merged into a bigger session, so ignore them
       .filter((key, value) -> value != null)

       // the map/groupByKey/reduce operations below only propagate updates
       // to the _latest_ session per customer downstream
       .map((windowedCustomerId, agg) ->
         // this moves the timestamp from the windowed key into the value, so that
         // we can group by customerId only and reduce to the later value
         new KeyValue<>(
           windowedCustomerId.key(), // just customerId
           new WindowedAggsImpl( // just like a Tuple2, but with nicely named
                                 // accessors: timestamp() and aggs()
             windowedCustomerId.window().end(),
             agg
           )
         )
       )
       .groupByKey(/*key serde*/, /*windowed aggs serde*/) // key is just customerId
       .reduce(
         // take the later session value and ignore any older ones - downstream
         // only cares about _current_ sessions
         (val, agg) -> val.timestamp() > agg.timestamp() ? val : agg,
         TimeWindows.of(SESSION_TIMEOUT_MS).advanceBy(SESSION_TIMEOUT_DELAY_TOLERANCE_MS),
         "latest-session-windowed"
       )

       // calculate totals with maximum granularity, which is per-partition
       .groupBy((windowedCustomerId, timeAndAggs) ->
         new KeyValue<>(
           new Windowed<>(
             // KIP-159 would come in handy here, to access the partition number instead
             windowedCustomerId.key().hashCode() % PARTITION_COUNT_FOR_TOTALS,
             // will be used in the interactive queries to pick the oldest not-yet-expired window
             windowedCustomerId.window()
           ),
           timeAndAggs.aggs()
         ),
         new SessionKeySerde<>(Serdes.Integer()),
         /* aggregate serde */
       )

       .reduce(
         (val, agg) -> agg.add(val),
         (val, agg) -> agg.subtract(val),
         txTotalsStore() // this store can be queried for per-partition totals of all active sessions
       );

    // this global table puts the per-partition totals on every node, so that they
    // can easily be summed for global totals, picking the oldest not-yet-expired window
    builder.globalTable(
       new SessionKeySerde<>(Serdes.Integer()),
       /* aggregate serde */,
       changelogTopicForStore(TRANSACTION_TOTALS),
       "totals");

    TODO: put in StreamPartitioners (with the KTable.through variants added
    in KAFKA-5045) to avoid re-partitioning where I know it's unnecessary.

    The idea behind the % PARTITION_COUNT_FOR_TOTALS bit is that I
    want to first do summation with max parallelism and minimize the
    work needed downstream. So I calculate a per-partition sum first
    to limit the updates that the totals topic will receive and the
    summing work done by the interactive queries on the global store.
    Is this a good way of going about it?
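The two-stage summation can be sketched outside the DSL as well. Below is a minimal plain-Java illustration (class and method names are mine, not from the topology above) of folding per-customer sums into a fixed number of buckets, so the final query only has to sum a handful of values. One subtlety: in Java, hashCode() % n can be negative, so the sketch uses Math.floorMod to keep bucket ids non-negative.

```java
import java.util.*;

// Illustrative sketch of the idea behind % PARTITION_COUNT_FOR_TOTALS:
// stage 1 folds per-customer sums into a small fixed number of buckets;
// stage 2 (what an interactive query would do over the global table)
// sums at most PARTITION_COUNT_FOR_TOTALS values instead of every customer.
public class BucketedTotals {
    static final int PARTITION_COUNT_FOR_TOTALS = 4;

    // Stage 1: bucket each customer's sum by hash of the customer id.
    public static Map<Integer, Long> bucketSums(Map<String, Long> perCustomer) {
        Map<Integer, Long> buckets = new HashMap<>();
        for (Map.Entry<String, Long> e : perCustomer.entrySet()) {
            // floorMod, because hashCode() may be negative
            int bucket = Math.floorMod(e.getKey().hashCode(), PARTITION_COUNT_FOR_TOTALS);
            buckets.merge(bucket, e.getValue(), Long::sum);
        }
        return buckets;
    }

    // Stage 2: the cheap final step - sum the few bucket values.
    public static long globalTotal(Map<Integer, Long> buckets) {
        return buckets.values().stream().mapToLong(Long::longValue).sum();
    }
}
```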

    Thanks,

    Michał


    On 09/05/17 18:31, Matthias J. Sax wrote:
    Hi,

    I started a new Wiki page to collect some common usage patterns for
    Kafka Streams.

    Right now, it contains a quick example on "how to compute average". Hope
    we can collect more examples like this!

    
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns
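[For reference, the average pattern mentioned above boils down to aggregating a (sum, count) pair, since averages themselves don't compose; the division happens only when reading the result. A minimal sketch with illustrative names, not the wiki page's actual code:]

```java
// Illustrative sketch of the "compute average" pattern: keep a running
// (sum, count) pair in the aggregate and divide only on read.
public class AverageAgg {
    private long sum;
    private long count;

    // fold one value into the running aggregate
    public AverageAgg add(long value) {
        sum += value;
        count++;
        return this;
    }

    // divide only when reading the result
    public double average() {
        return count == 0 ? 0.0 : (double) sum / count;
    }
}
```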


    -Matthias


    --
    Michal Borowiecki
    Senior Software Engineer L4
    T: +44 208 742 1600 / +44 203 249 8448
    E: michal.borowie...@openbet.com
    W: www.openbet.com

    OpenBet Ltd
    Chiswick Park Building 9
    566 Chiswick High Rd
    London
    W4 5XT
    UK

    This message is confidential and intended only for the addressee.
    If you have received this message in error, please immediately
    notify the postmas...@openbet.com
    and delete it from your system as well as any copies. The content
    of e-mails as well as traffic data may be monitored by OpenBet for
    employment and security purposes. To protect the environment
    please do not print this e-mail unless necessary. OpenBet Ltd.
    Registered Office: Chiswick Park Building 9, 566 Chiswick High
    Road, London, W4 5XT, United Kingdom. A company registered in
    England and Wales. Registered no. 3134634. VAT no. GB927523612


