Hi all,
I've updated the wiki page with a draft pattern for consecutively
growing time-windowed aggregations, which was discussed some time ago
on this mailing list.
I have yet to add the part that cleans up the stores using punctuations.
Stay tuned.
On a somewhat similar subject, I've been working to implement the
following requirements:
* transaction sums per customer session (simple, just extract
non-expired session-windowed aggregates from a SessionStore using
interactive queries)
* global transaction sums for all _currently active_ customer sessions
The second bit proved non-trivial, because session-windowed KTables (or
any windowed KTables for that matter) don't notify downstream when a
window expires. And I can't use punctuate until KIP-138 is implemented:
stream-time punctuation is no good in this case (records can stop
coming), so reliable system-time punctuation would be needed.
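To illustrate the punctuation problem, here is a minimal sketch in plain Java with no Kafka dependency. It assumes a simplified model in which stream time only ever advances to the largest record timestamp seen so far, and in which a session counts as expired once the timeout has passed since its end. The class and method names are hypothetical and are not part of the Kafka Streams API.

```java
// Simplified, hypothetical model; this is NOT the Kafka Streams API.
class ExpiryClock {
    // Stream time advances only when records arrive.
    private long streamTime = Long.MIN_VALUE;

    /** Observing a record moves stream time forward to its timestamp. */
    void onRecord(long recordTimestampMs) {
        streamTime = Math.max(streamTime, recordTimestampMs);
    }

    /** Expiry check driven by stream time: stalls when records stop coming. */
    boolean expiredByStreamTime(long sessionEndMs, long timeoutMs) {
        return streamTime >= sessionEndMs + timeoutMs;
    }

    /** Expiry check driven by wall-clock time: needs no new records. */
    static boolean expiredByWallClock(long sessionEndMs, long timeoutMs, long nowMs) {
        return nowMs >= sessionEndMs + timeoutMs;
    }
}
```

In this model, a session that ended at 1000 ms with a 5000 ms timeout is never seen as expired by the stream-time check unless a later record (timestamp 6000 ms or more) shows up, whereas the wall-clock check only needs the current time.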
Below is how I implemented this; I have yet to test it thoroughly.
I wonder if anyone knows of an easier way of achieving the same.
If so, I'm looking forward to suggestions. If not, I'll add this to the
patterns wiki page too, in case someone else finds it useful.
builder
    .stream(/*key serde*/, /*transaction serde*/, "transaciton-topic")
    .groupByKey(/*key serde*/, /*transaction serde*/)
    .aggregate(
        () -> /*empty aggregate*/,
        aggregator(),
        merger(),
        SessionWindows.with(SESSION_TIMEOUT_MS).until(SESSION_TIMEOUT_MS * 2),
        /* aggregate serde */,
        txPerCustomerSumStore() // this store can be queried for per customer session data
    )
    .toStream()
    // tombstones only come when a session is merged into a bigger session,
    // so ignore them
    .filter((key, value) -> value != null)
    // the below map/groupByKey/reduce operations are to only propagate
    // updates to the _latest_ session per customer to downstream
    .map((windowedCustomerId, agg) ->
        // this moves timestamp from the windowed key into the value
        // so that we can group by customerId only and reduce to the later value
        new KeyValue<>(
            windowedCustomerId.key(), // just customerId
            // WindowedAggsImpl is just like a tuple2 but with nicely named
            // accessors: timestamp() and aggs()
            new WindowedAggsImpl(
                windowedCustomerId.window().end(),
                agg
            )
        )
    )
    .groupByKey(/*key serde*/, /*windowed aggs serde*/) // key is just customerId
    .reduce(
        // take later session value and ignore any older - downstream only
        // cares about _current_ sessions
        (val, agg) -> val.timestamp() > agg.timestamp() ? val : agg,
        TimeWindows.of(SESSION_TIMEOUT_MS).advanceBy(SESSION_TIMOUT_DELAY_TOLERANCE_MS),
        "latest-session-windowed"
    )
    .groupBy((windowedCustomerId, timeAndAggs) ->
        // calculate totals with maximum granularity, which is per-partition
        new KeyValue<>(
            new Windowed<>(
                // KIP-159 would come in handy here, to access partition number instead
                windowedCustomerId.key().hashCode() % PARTITION_COUNT_FOR_TOTALS,
                // will use this in the interactive queries to pick the oldest
                // not-yet-expired window
                windowedCustomerId.window()
            ),
            timeAndAggs.aggs()
        ),
        new SessionKeySerde<>(Serdes.Integer()),
        /* aggregate serde */
    )
    .reduce(
        (val, agg) -> agg.add(val),
        (val, agg) -> agg.subtract(val),
        txTotalsStore() // this store can be queried to get totals per partition for all active sessions
    );
builder.globalTable(
    new SessionKeySerde<>(Serdes.Integer()),
    /* aggregate serde */,
    changelogTopicForStore(TRANSACTION_TOTALS),
    "totals");
// this global table puts per partition totals on every node, so that
// they can be easily summed for global totals, picking the oldest
// not-yet-expired window
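The final summation over the global table could look roughly like the sketch below. It is plain Java with no Kafka dependency. BucketTotal is a hypothetical stand-in for one (bucket, window) entry read from the global store, and "not yet expired" is assumed here to mean that the window end is still later than the current time; the real query code may define it differently.

```java
import java.util.List;

// Hypothetical stand-in for one entry of the global store:
// key = (bucket, window), value = total for that bucket and window.
final class BucketTotal {
    final int bucket;
    final long windowStartMs;
    final long windowEndMs;
    final long total;

    BucketTotal(int bucket, long windowStartMs, long windowEndMs, long total) {
        this.bucket = bucket;
        this.windowStartMs = windowStartMs;
        this.windowEndMs = windowEndMs;
        this.total = total;
    }
}

final class GlobalTotals {
    /**
     * Sums the totals of all buckets for the oldest window that has not yet
     * expired (assumption: windowEndMs > nowMs). Returns 0 if no window is live.
     */
    static long currentTotal(List<BucketTotal> entries, long nowMs) {
        // First pass: find the start of the oldest live window.
        long oldestStart = Long.MAX_VALUE;
        for (BucketTotal e : entries) {
            if (e.windowEndMs > nowMs && e.windowStartMs < oldestStart) {
                oldestStart = e.windowStartMs;
            }
        }
        // Second pass: add up every bucket's total for that window.
        long sum = 0;
        for (BucketTotal e : entries) {
            if (e.windowEndMs > nowMs && e.windowStartMs == oldestStart) {
                sum += e.total;
            }
        }
        return sum;
    }
}
```

Because the windows overlap (they advance by less than their size), only one window is summed; adding up several overlapping windows would count the same sessions more than once.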
TODO: put in StreamPartitioners (with the KTable.through variants added
in KAFKA-5045) to avoid re-partitioning where I know it's unnecessary.
The idea behind the % PARTITION_COUNT_FOR_TOTALS bit is that I want to
first do summation with max parallelism and minimize the work needed
downstream. So I calculate a per-partition sum first to limit the
updates that the totals topic will receive and the summing work done by
the interactive queries on the global store. Is this a good way of going
about it?
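For reference, the bucketing and the adder/subtractor pair can be modelled in plain Java as in the sketch below (no Kafka dependency; the class and its methods are hypothetical). One detail worth noting: in Java, hashCode() can be negative and % keeps the sign of the dividend, so a plain hashCode() % N can produce a negative bucket. The sketch uses Math.floorMod instead, which is a substitution on my part and not what the topology above does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-bucket totals maintained by add/subtract updates.
final class BucketedTotals {
    private final int bucketCount;
    private final Map<String, Long> latestPerCustomer = new HashMap<>();
    private final long[] totals;

    BucketedTotals(int bucketCount) {
        this.bucketCount = bucketCount;
        this.totals = new long[bucketCount];
    }

    /** floorMod keeps the bucket in [0, bucketCount) even for negative hash codes. */
    int bucketOf(String customerId) {
        return Math.floorMod(customerId.hashCode(), bucketCount);
    }

    /** Mirrors the subtractor/adder pair: retract the old value, then add the new one. */
    void update(String customerId, long newSum) {
        int bucket = bucketOf(customerId);
        Long old = latestPerCustomer.put(customerId, newSum);
        if (old != null) {
            totals[bucket] -= old;
        }
        totals[bucket] += newSum;
    }

    long bucketTotal(int bucket) {
        return totals[bucket];
    }

    /** What the interactive query would do: sum the few per-bucket totals. */
    long globalTotal() {
        long sum = 0;
        for (long t : totals) {
            sum += t;
        }
        return sum;
    }
}
```

With this shape, the global query only has to add up bucketCount numbers, however many customers are active.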
Thanks,
Michał
On 09/05/17 18:31, Matthias J. Sax wrote:
Hi,
I started a new Wiki page to collect some common usage patterns for
Kafka Streams.
Right now, it contains a quick example on "how to compute average". Hope
we can collect more example like this!
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns
-Matthias
--
Michal Borowiecki
Senior Software Engineer L4
T: +44 208 742 1600
+44 203 249 8448
E: michal.borowie...@openbet.com
W: www.openbet.com <http://www.openbet.com/>
OpenBet Ltd
Chiswick Park Building 9
566 Chiswick High Rd
London
W4 5XT
UK