Hi Matti,

Use a persistent aggregate (persistentAggregate() on the grouped stream).
It does precisely what you describe: it hands the result of an Aggregator
to a Trident State, so you can save it somewhere based on the groupBy
"bucket" it belongs to.
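
To make the idea concrete, here is a minimal plain-Java sketch of what a
groupBy followed by a persistent aggregate amounts to. It is only a model and
not the Trident API: GroupedSumSketch, Event, MapBackedState and persistentSum
are hypothetical names made up for illustration, and only the impressions sum
is shown. As far as I understand it, map-backed Trident states receive their
keys in a similar way, as one list of group-field values per entry, which is
how the state knows the bucket.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of groupBy + persistent aggregate. NOT the Trident API:
// every class and method name here is hypothetical.
class GroupedSumSketch {

    // One parsed event, mirroring the "ad_id", "zone" and "impressions" fields.
    static final class Event {
        final String adId;
        final String zone;
        final long impressions;

        Event(String adId, String zone, long impressions) {
            this.adId = adId;
            this.zone = zone;
            this.impressions = impressions;
        }
    }

    // Stand-in for a map-backed state: one stored sum per group key.
    static final class MapBackedState {
        private final Map<List<Object>, Long> store = new HashMap<>();

        // Returns the stored sum for each key, or null where nothing is stored yet.
        List<Long> multiGet(List<List<Object>> keys) {
            List<Long> values = new ArrayList<>();
            for (List<Object> key : keys) {
                values.add(store.get(key));
            }
            return values;
        }

        // The keys arrive together with the values, so the bucket is always known.
        void multiPut(List<List<Object>> keys, List<Long> values) {
            for (int i = 0; i < keys.size(); i++) {
                store.put(keys.get(i), values.get(i));
            }
        }
    }

    // Processes one batch: sum per (adId, zone) key, then merge into the state.
    static void persistentSum(MapBackedState state, List<Event> batch) {
        Map<List<Object>, Long> partial = new LinkedHashMap<>();
        for (Event e : batch) {
            List<Object> key = Arrays.<Object>asList(e.adId, e.zone);
            partial.merge(key, e.impressions, Long::sum);
        }
        List<List<Object>> keys = new ArrayList<>(partial.keySet());
        List<Long> previous = state.multiGet(keys);
        List<Long> updated = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            long before = previous.get(i) == null ? 0L : previous.get(i);
            updated.add(before + partial.get(keys.get(i)));
        }
        state.multiPut(keys, updated);
    }

    public static void main(String[] args) {
        MapBackedState state = new MapBackedState();
        persistentSum(state, Arrays.asList(
                new Event("ad1", "top", 3), new Event("ad1", "top", 2),
                new Event("ad2", "side", 7)));
        persistentSum(state, Arrays.asList(new Event("ad1", "top", 5)));

        List<List<Object>> keys = new ArrayList<>();
        keys.add(Arrays.<Object>asList("ad1", "top"));
        keys.add(Arrays.<Object>asList("ad2", "side"));
        System.out.println(state.multiGet(keys)); // prints [10, 7]
    }
}
```

The point of the sketch is that the state never has to guess the bucket: the
key built from ("ad_id", "zone") travels with each aggregated value, and the
previous value for that key is merged with the new partial sum on every batch.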

Here's a blog post where I explain my understanding of how it works:
http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/

Cheers,

Svend

On Wed, Feb 19, 2014 at 8:09 AM, Matti Dahlbom <mdahlbom...@gmail.com> wrote:

> Hello, a beginner question coming up. I'm trying to build analytics
> crunching with Storm Trident: from a continuous stream of events, I need
> to group and aggregate values and then write the aggregated results for
> each time slice into a database for quick access later on. I am starting
> with the following topology:
>
>         TridentState state = topology.newStream("logspout", spout)
>             .parallelismHint(8).each(new Fields("json"),
>                                      new ProcessJsonFunction(),
>                                      new Fields("ad_id", "zone",
>                                                 "impressions", "clicks"))
>             .groupBy(new Fields("ad_id", "zone"))
>             .chainedAgg()
>             .aggregate(new Fields("impressions"), new Sum(),
>                        new Fields("impressions_sum"))
>             .aggregate(new Fields("clicks"), new Sum(),
>                        new Fields("clicks_sum"))
>             .chainEnd()
>             .partitionPersist(new AnalyticsStateFactory(),
>                               new Fields("impressions_sum", "clicks_sum"),
>                               new AnalyticsStateUpdater());
>
> In the updateState() method of my AnalyticsStateUpdater class, I would
> like to store the aggregated values ("impressions_sum", "clicks_sum") per
> key "bucket", and here I ran into problems: how do I know, in the
> StateUpdater, which groupBy() "bucket" the aggregated data belongs to?
> In other words, I need the key formed from the values of the fields
> ("ad_id", "zone"). The aggregated values themselves arrive properly
> summed in the StateUpdater.
>
> I am aware this is probably trivial but from the Trident documentation (or
> the lack of it) I cannot seem to figure out how to do this.
>
> BR,
>
> - Matti
>
