Hi Matti,

Use a persistent aggregate (persistentAggregate() on the grouped stream). It does precisely what you are describing: it hands the result of an Aggregator to a Trident State, so you can save it somewhere based on the groupBy() "bucket" it belongs to.
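
To make the idea concrete, here is a minimal sketch in plain Java, with no Storm dependency, of what grouping followed by a persistent aggregate amounts to. Everything in it (the class name, the Event type, the in-memory STORE map) is made up for illustration and is not Trident API. In a real topology the grouped stream and a map-backed State play these roles, and as far as I understand it the State is handed the grouping key together with the aggregated value.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "groupBy + persistent aggregate". Not Storm code:
// every name below is hypothetical and exists only for this illustration.
public class PersistentAggregateSketch {

    // One parsed input event, mirroring the fields emitted in the question.
    public static final class Event {
        public final String adId;
        public final String zone;
        public final long impressions;
        public final long clicks;

        public Event(String adId, String zone, long impressions, long clicks) {
            this.adId = adId;
            this.zone = zone;
            this.impressions = impressions;
            this.clicks = clicks;
        }
    }

    // Stand-in for the database behind the state.
    // Key: [ad_id, zone]; value: [impressions_sum, clicks_sum].
    public static final Map<List<String>, long[]> STORE = new HashMap<>();

    // Handles one batch: sum per group first, then merge each group's
    // partial sums into the store under that group's key.
    public static void processBatch(List<Event> batch) {
        Map<List<String>, long[]> partial = new HashMap<>();
        for (Event e : batch) {
            List<String> key = Arrays.asList(e.adId, e.zone);
            long[] sums = partial.computeIfAbsent(key, k -> new long[2]);
            sums[0] += e.impressions;
            sums[1] += e.clicks;
        }
        // The update step sees the key and the aggregate together,
        // which is the piece that was missing in the StateUpdater.
        for (Map.Entry<List<String>, long[]> entry : partial.entrySet()) {
            long[] stored = STORE.computeIfAbsent(entry.getKey(), k -> new long[2]);
            stored[0] += entry.getValue()[0];
            stored[1] += entry.getValue()[1];
        }
    }

    public static void main(String[] args) {
        processBatch(Arrays.asList(
                new Event("ad1", "top", 10, 1),
                new Event("ad1", "top", 5, 0),
                new Event("ad2", "side", 7, 2)));
        processBatch(Arrays.asList(new Event("ad1", "top", 3, 1)));
        for (Map.Entry<List<String>, long[]> entry : STORE.entrySet()) {
            System.out.println(entry.getKey() + " -> " + Arrays.toString(entry.getValue()));
        }
    }
}
```

The point of the two-step shape is that each batch is reduced per key before the store is touched, so the store sees one read and one write per key per batch rather than one per event.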

Here's a blog post where I explain my understanding of how it works:
http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/

Cheers,

Svend

On Wed, Feb 19, 2014 at 8:09 AM, Matti Dahlbom <mdahlbom...@gmail.com> wrote:

> Hello, a beginner question coming up. I'm trying to build analytics
> crunching with Storm Trident; a continuous stream of events of which I need
> to group/aggregate things and then write the aggregated results over a
> time-slice into a database for quick access later on. I am starting with
> the following topology:
>
>     TridentState state = topology.newStream("logspout", spout)
>         .parallelismHint(8).each(new Fields("json"),
>             new ProcessJsonFunction(),
>             new Fields("ad_id", "zone",
>                 "impressions", "clicks"))
>         .groupBy(new Fields("ad_id", "zone"))
>         .chainedAgg()
>         .aggregate(new Fields("impressions"), new Sum(),
>             new Fields("impressions_sum"))
>         .aggregate(new Fields("clicks"), new Sum(),
>             new Fields("clicks_sum"))
>         .chainEnd()
>         .partitionPersist(new AnalyticsStateFactory(),
>             new Fields("impressions_sum", "clicks_sum"),
>             new AnalyticsStateUpdater());
>
> And then in my class AnalyticsStateUpdater:s method updateState() I would
> like to store the aggregated values ("impressions_sum", "clicks_sum") per
> key-"bucket" -- and here I ran into problems; how do I - in the
> StateUpdater - know to which groupBy() "bucket" the aggregated data belongs
> to? In other words I would need to get the key formed of the values of the
> fields ("ad_id", "zone"). The aggregated values themselves end up properly
> Sum():ed in the StateUpdater.
>
> I am aware this is probably trivial but from the Trident documentation (or
> the lack of it) I cannot seem to figure out how to do this.
>
> BR,
>
> - Matti
>