The previous link didn't work; here it is again:
https://github.com/nathanmarz/storm/wiki/Trident-API-Overview#operations-on-grouped-streams
On Tue, Apr 22, 2014 at 10:30 AM, Raphael Hsieh <raffihs...@gmail.com> wrote:

> Yes, partially.
> The part I was missing was getting old values and feeding them through the
> aggregator again, which still doesn't quite make sense to me.
>
> I am using an external datastore, so I am not able to use the vanilla
> MemcachedState, hence why I am implementing my own version of the
> IBackingMap.
>
> So let me try to explain what I am understanding. When I do something like
>
> Stream
>     .groupBy(new Fields("a"))
>     .persistentAggregate(new MyStateFactory(),
>         new Fields("a", "b", "c", "d"),
>         new MyAggregator(), new Fields("resultMap"))
>
> what happens (as described here
> <https://github.com/nathanmarz/storm/wiki/Trident-API-Overview>) is that
> the stream is split into different groups based on field "a":
> [image: Grouping]
> Then partitionPersist will run a multiGet on the fields ("a", "b", "c",
> "d"), since that is what we are using as our keys. So each of the "groups"
> described above would contain not only the raw tuples resulting from the
> grouping, but also a single tuple with the result of the previous
> aggregation.
> These would all be run through the aggregator, which should be able to
> handle aggregating with this semi-complete aggregation (the "reduce"
> function in a ReducerAggregator, or the "combine" function in a
> CombinerAggregator).
>
> How does it know not to treat the previous aggregation as a single new
> tuple (hence not running the "init" function)? For example, if I was
> aggregating a count, having that previous value (say 60) show up as a
> single extra tuple would only increment the count by 1, instead of by 60.
> Would I then just need to implement my own "init" function such that it
> checks whether a tuple is a raw new tuple vs. a previous aggregation?
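A note on the "init" question above: Trident never runs init() on the
value returned from multiGet. With a CombinerAggregator the old value is
folded in through combine() (with a ReducerAggregator it becomes the seed
passed to reduce()), so a stored count of 60 contributes 60, not 1. For
reference, the built-in Count that Cody points to further down looks
essentially like this (sketched from memory against the 0.9-era API):

import storm.trident.operation.CombinerAggregator;
import storm.trident.tuple.TridentTuple;

public class Count implements CombinerAggregator<Long> {
    // Only ever called on raw input tuples, never on a stored aggregate.
    public Long init(TridentTuple tuple) {
        return 1L;
    }

    // Merges partial counts; also how the multiGet value is folded in.
    public Long combine(Long val1, Long val2) {
        return val1 + val2;
    }

    // Identity value, used when a key has no previous aggregate.
    public Long zero() {
        return 0L;
    }
}

So there is no need for a custom init() that inspects the tuple; the
previous aggregation never appears to the aggregator as a tuple at all.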
> On Tue, Apr 22, 2014 at 9:59 AM, Cody A. Ray <cody.a....@gmail.com> wrote:
>
>> My understanding is that the process is:
>> 1. multiGet from the IBackingMap is called and returns a value for each
>>    key (or null if not present).
>> 2. For each key, the old value from the get and the new values in the
>>    batch are fed through the aggregator to produce one value per key.
>> 3. This value is then stored back into the state through the multiPut in
>>    the IBackingMap.
>>
>> If you just want to use nathanmarz's trident-memcached integration, you
>> don't have to write an IBackingMap yourself. The MemcachedState itself
>> implements IBackingMap to do the get and put. To use it, just decide what
>> you want to groupBy (these become your keys) and how you want it
>> aggregated (this is the reducer/combiner implementation). You don't have
>> to write the memcache connection logic or the aggregation logic yourself
>> unless you want to change how it's aggregated or stored.
>> I've not used the trident-memcached state in particular, but in general
>> it would look something like this:
>>
>> topology.newStream("spout1", spout1)
>>     .groupBy(new Fields("mykeyfield"))
>>     .persistentAggregate(MemcachedState.opaque(servers),
>>         new Fields("myvaluefield"), new Sum(), new Fields("sum"));
>>
>> (Sorry for any code errors; writing on my phone.)
>>
>> Does that answer your question?
>>
>> -Cody
>>
>> On Apr 22, 2014 10:32 AM, "Raphael Hsieh" <raffihs...@gmail.com> wrote:
>>
>>> The Reducer/Combiner Aggregators hold the logic to aggregate across an
>>> entire batch; however, they do not have the logic to aggregate between
>>> batches.
>>> In order for that to happen, something must read the previous
>>> TransactionId and value from the datastore, determine whether the
>>> incoming data is in the right sequence, and then increment the value
>>> within the datastore.
>>>
>>> I am asking about this second part: where the logic goes that reads the
>>> previous data from the datastore and adds it to the new incoming
>>> aggregate data.
>>>
>>>
>>> On Mon, Apr 21, 2014 at 6:58 PM, Cody A. Ray <cody.a....@gmail.com> wrote:
>>>
>>>> It's the ReducerAggregator/CombinerAggregator's job to implement this
>>>> logic. Look at Count and Sum, which are built into Trident. You can
>>>> also implement your own aggregator.
>>>>
>>>> -Cody
>>>>
>>>>
>>>> On Mon, Apr 21, 2014 at 2:57 PM, Raphael Hsieh <raffihs...@gmail.com> wrote:
>>>>
>>>>> If I am using an opaque spout and doing a persistent aggregate to a
>>>>> MemcachedState, how is it aggregating/incrementing the values across
>>>>> all batches?
>>>>>
>>>>> I want to implement an IBackingMap so that I can use an external
>>>>> datastore. However, I'm unsure where the logic goes that will read
>>>>> the previous data and aggregate it with the new data.
>>>>>
>>>>> From what I've been told, I need to implement the IBackingMap and the
>>>>> multiPut/multiGet functions. So logically, I think it makes sense
>>>>> that I would put this update logic in the multiPut function. However,
>>>>> the OpaqueMap class already has multiGet logic in order to check the
>>>>> TxId of the batch.
>>>>> Instead of using the OpaqueMap class, should I just make my own
>>>>> implementation?
>>>>>
>>>>> Thanks
>>>>> --
>>>>> Raphael Hsieh
>>>>
>>>> --
>>>> Cody A. Ray, LEED AP
>>>> cody.a....@gmail.com
>>>> 215.501.7891

--
Raphael Hsieh
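On the final question in the thread: there is no need to reimplement
OpaqueMap. Wrap the custom IBackingMap in OpaqueMap.build(); OpaqueMap
performs the TxId check and merges the previous value with the new batch
aggregate, so multiGet/multiPut only talk to the datastore. A rough
sketch of the shape this takes, with an in-memory map standing in for
the external datastore client (the My* names here are hypothetical):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import backtype.storm.task.IMetricsContext;
import storm.trident.state.OpaqueValue;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import storm.trident.state.map.IBackingMap;
import storm.trident.state.map.OpaqueMap;

public class MyBackingMap implements IBackingMap<OpaqueValue> {

    // Stand-in for the external datastore client; swap in your own.
    private static final Map<List<Object>, OpaqueValue> STORE =
            new ConcurrentHashMap<List<Object>, OpaqueValue>();

    public List<OpaqueValue> multiGet(List<List<Object>> keys) {
        // Return the stored (txid, prev, curr) triple for each key,
        // or null for keys that have never been written.
        List<OpaqueValue> ret = new ArrayList<OpaqueValue>(keys.size());
        for (List<Object> key : keys) {
            ret.add(STORE.get(key));
        }
        return ret;
    }

    public void multiPut(List<List<Object>> keys, List<OpaqueValue> vals) {
        // Plain writes only: by the time multiPut runs, OpaqueMap has
        // already checked the TxId and combined the old value with the
        // new batch aggregate. No aggregation logic belongs here.
        for (int i = 0; i < keys.size(); i++) {
            STORE.put(keys.get(i), vals.get(i));
        }
    }

    public static class Factory implements StateFactory {
        public State makeState(Map conf, IMetricsContext metrics,
                               int partitionIndex, int numPartitions) {
            // OpaqueMap supplies the between-batch logic discussed above;
            // the backing map stays a plain key/value store.
            return OpaqueMap.build(new MyBackingMap());
        }
    }
}

The factory then slots into the topology where Cody's example used
MemcachedState.opaque(servers), e.g.
persistentAggregate(new MyBackingMap.Factory(), new Fields("myvaluefield"),
new Sum(), new Fields("sum")).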