Are you saying that there is no purpose in doing a "groupBy" followed by a
persistentAggregate? The documentation states: "If you run aggregators on a
grouped stream, the aggregation will be run within each group instead of
against the whole batch."
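As a concrete illustration of that documented behavior, here is a minimal plain-Java sketch (not Trident code; the HashMap stands in for the grouped stream) of what "run within each group" means for a count aggregation:

```java
import java.util.*;

// Plain-Java illustration of the documented groupBy semantics: tuples are
// first partitioned by the groupBy key, then the aggregator (here, a count)
// runs independently inside each group rather than over the whole batch.
public class GroupedCountDemo {
    public static Map<String, Long> countByGroup(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : keys) {
            // Each group accumulates its own running value.
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countByGroup(Arrays.asList("a", "b", "a", "a"));
        System.out.println(counts.get("a") + " " + counts.get("b")); // 3 1
    }
}
```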
On Wed, Apr 23, 2014 at 2:17 AM, Danijel Schiavuzzi <dani...@schiavuzzi.com> wrote:

>> When I do something like
>>
>>     stream
>>         .groupBy(new Fields("a"))
>>         .persistentAggregate(new MyStateFactory(),
>>                 new Fields("a", "b", "c", "d"),
>>                 new MyAggregator(), new Fields("resultMap"))
>>
>> what happens (as described here:
>> https://github.com/nathanmarz/storm/wiki/Trident-API-Overview) is that
>> the stream is split into different groups based on field "a":
>
> This is not true. The stream will be grouped based on all the keys you
> specified in persistentAggregate, i.e. new Fields("a", "b", "c", "d").
> This will produce as many grouped streams as there are distinct
> groupings among those keys. Those groupings will then be
> combined/reduced with the existing values gathered from
> IBackingMap#multiGet(), and Trident will then call multiPut() to
> persist the updated aggregations back to the underlying data store.
>
> Take a look at the Storm sources under the package "storm.trident.*".
> A good starting point for understanding Trident is the Java class
> "storm.trident.state.map.TransactionalMap" (or OpaqueMap or
> NonTransactionalMap).
>
> Danijel Schiavuzzi
> www.schiavuzzi.com
>
>> [image: Grouping]
>> like so.
>>
>> Then partitionPersist will run a multiGet on the fields ("a", "b",
>> "c", "d"), since that is what we are using as our keys. So in each of
>> the "groups" described above, we would have not only the raw tuples
>> resulting from the grouping, but also a single tuple with the result
>> of the previous aggregation. These would all be run through the
>> aggregator, which should be able to handle aggregating with this
>> semi-complete aggregation (the "reduce" function in a
>> ReducerAggregator, or the "combine" function in a CombinerAggregator).
>>
>> How does it know not to treat the previous aggregation as a single
>> new tuple (and hence not to run the "init" function)?
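The "init vs. previous aggregation" question above can be illustrated with a standalone sketch of the CombinerAggregator contract. The interface shape mirrors Trident's CombinerAggregator (init/combine/zero), but the `update` helper is a simplification written for this thread, not Storm code: init() only ever sees raw input tuples, and the stored value enters through combine(), so a stored count of 60 is added, not counted as one tuple.

```java
import java.util.*;

// Standalone sketch of why a stored aggregate is never run through init().
// The interface mirrors storm.trident.operation.CombinerAggregator, but this
// is a plain-Java simplification, not Storm code.
public class CombinerDemo {
    interface CombinerAggregator<T> {
        T init(Object rawTuple); // one raw tuple -> partial aggregate
        T combine(T v1, T v2);   // merge two partial aggregates
        T zero();                // identity, used when multiGet returns null
    }

    static class Count implements CombinerAggregator<Long> {
        public Long init(Object rawTuple) { return 1L; }
        public Long combine(Long v1, Long v2) { return v1 + v2; }
        public Long zero() { return 0L; }
    }

    // What persistentAggregate conceptually does for one key: fold the
    // batch's raw tuples with init()+combine(), then combine() the result
    // with the previously stored value.
    static <T> T update(CombinerAggregator<T> agg, T stored, List<Object> batch) {
        T batchAgg = agg.zero();
        for (Object tuple : batch) {
            batchAgg = agg.combine(batchAgg, agg.init(tuple));
        }
        T prev = (stored == null) ? agg.zero() : stored;
        return agg.combine(prev, batchAgg); // 60 + 3 = 63, not 61
    }

    public static void main(String[] args) {
        Long result = update(new Count(), 60L, Arrays.asList("t1", "t2", "t3"));
        System.out.println(result); // 63
    }
}
```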
>> For example, if I were aggregating a count, having that previous
>> value (say 60) as a single extra tuple would only increment the count
>> by 1, instead of by 60. Would I then just need to implement my own
>> "init" function so that it checks the tuple value to tell whether it
>> is a raw new tuple or a previous aggregation?
>>
>> On Tue, Apr 22, 2014 at 9:59 AM, Cody A. Ray <cody.a....@gmail.com> wrote:
>>
>>> My understanding is that the process is:
>>>
>>> 1. multiGet from the IBackingMap is called and returns a value for
>>>    each key (or null if not present).
>>> 2. For each key, the old value from the get and the new values in
>>>    the batch are fed through the aggregator to produce one value per
>>>    key.
>>> 3. This value is then stored back into the state through the
>>>    multiPut in the IBackingMap.
>>>
>>> If you just want to use nathanmarz's trident-memcached integration,
>>> you don't have to write an IBackingMap yourself. The MemcachedState
>>> itself implements IBackingMap to do the get and put. To use it, just
>>> decide what you want to groupBy (these become your keys) and how you
>>> want the values aggregated (this is the reducer/combiner
>>> implementation). You don't have to write the memcache connection
>>> logic or the aggregation logic yourself unless you want to change
>>> how it's aggregated or stored.
>>>
>>> I've not used the trident-memcached state in particular, but in
>>> general it would look something like this:
>>>
>>>     topology.newStream("spout1", spout1)
>>>         .groupBy(new Fields("mykeyfield"))
>>>         .persistentAggregate(MemcachedState.opaque(servers),
>>>                 new Fields("myvaluefield"), new Sum(), new Fields("sum"))
>>>
>>> (Sorry for any code errors; writing on my phone.)
>>>
>>> Does that answer your question?
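Cody's three steps can be sketched end-to-end with an in-memory stand-in for the state. The interface shape follows IBackingMap's multiGet/multiPut, but the HashMap-backed store and the `persistSums` driver are assumptions made for this sketch, in place of the real MemcachedState:

```java
import java.util.*;

// Standalone sketch of the multiGet -> aggregate -> multiPut cycle.
// Method names follow the IBackingMap shape, but the store here is just an
// in-memory HashMap, not memcached.
public class BackingMapDemo {
    interface IBackingMap<T> {
        List<T> multiGet(List<String> keys);
        void multiPut(List<String> keys, List<T> vals);
    }

    static class InMemoryMap implements IBackingMap<Long> {
        final Map<String, Long> store = new HashMap<>();
        public List<Long> multiGet(List<String> keys) {
            List<Long> vals = new ArrayList<>();
            for (String k : keys) vals.add(store.get(k)); // null if absent
            return vals;
        }
        public void multiPut(List<String> keys, List<Long> vals) {
            for (int i = 0; i < keys.size(); i++) store.put(keys.get(i), vals.get(i));
        }
    }

    // Steps 1-3: multiGet old values, fold in the batch's sums, multiPut.
    static void persistSums(IBackingMap<Long> state, Map<String, Long> batchSums) {
        List<String> keys = new ArrayList<>(batchSums.keySet());
        List<Long> olds = state.multiGet(keys);          // step 1
        List<Long> news = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {          // step 2
            long old = (olds.get(i) == null) ? 0L : olds.get(i);
            news.add(old + batchSums.get(keys.get(i)));
        }
        state.multiPut(keys, news);                      // step 3
    }

    public static void main(String[] args) {
        InMemoryMap state = new InMemoryMap();
        persistSums(state, Map.of("k", 5L));  // first batch
        persistSums(state, Map.of("k", 7L));  // second batch folds into old
        System.out.println(state.store.get("k")); // 12
    }
}
```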
>>>
>>> -Cody
>>>
>>> On Apr 22, 2014 10:32 AM, "Raphael Hsieh" <raffihs...@gmail.com> wrote:
>>>
>>>> The reducer/combiner aggregators hold the logic to aggregate across
>>>> an entire batch, but they do not hold the logic to aggregate
>>>> between batches. For that to happen, something must read the
>>>> previous transaction id and value from the datastore, determine
>>>> whether the incoming data is in the right sequence, and then
>>>> increment the value within the datastore.
>>>>
>>>> I am asking about this second part: where the logic goes that reads
>>>> previous data from the datastore and adds it to the new incoming
>>>> aggregate data.
>>>>
>>>> On Mon, Apr 21, 2014 at 6:58 PM, Cody A. Ray <cody.a....@gmail.com> wrote:
>>>>
>>>>> It's the ReducerAggregator/CombinerAggregator's job to implement
>>>>> this logic. Look at Count and Sum, which are built into Trident.
>>>>> You can also implement your own aggregator.
>>>>>
>>>>> -Cody
>>>>>
>>>>> On Mon, Apr 21, 2014 at 2:57 PM, Raphael Hsieh <raffihs...@gmail.com> wrote:
>>>>>
>>>>>> If I am using an opaque spout and doing a persistentAggregate to
>>>>>> a MemcachedState, how is it aggregating/incrementing the values
>>>>>> across all batches?
>>>>>>
>>>>>> I want to implement an IBackingMap so that I can use an external
>>>>>> datastore. However, I'm unsure where the logic goes that will
>>>>>> read the previous data and aggregate it with the new data.
>>>>>>
>>>>>> From what I've been told, I need to implement the IBackingMap and
>>>>>> its multiPut/multiGet functions. So logically, I think it makes
>>>>>> sense that I would put this update logic in the multiPut
>>>>>> function. However, the OpaqueMap class already has multiGet logic
>>>>>> in order to check the txid of the batch. Instead of using an
>>>>>> OpaqueMap class, should I just make my own implementation?
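The txid bookkeeping discussed above is what the OpaqueMap layer adds on top of a plain IBackingMap, which is why it does not need to be re-implemented inside multiPut. A standalone simplification of that idea (the field names loosely follow Trident's opaque value of currTxid/curr/prev, but the sum arithmetic and the `update` helper are assumptions made for this sketch):

```java
// Standalone sketch of opaque-state replay handling: each key stores the
// txid of the last update, the current value, and the previous value. A
// replayed batch recomputes from prev rather than double-counting curr.
public class OpaqueDemo {
    static class OpaqueValue {
        long currTxid; long curr; Long prev; // prev is null before 2nd update
        OpaqueValue(long t, long c, Long p) { currTxid = t; curr = c; prev = p; }
    }

    static OpaqueValue update(OpaqueValue stored, long txid, long batchSum) {
        if (stored == null) {
            return new OpaqueValue(txid, batchSum, null); // first ever batch
        }
        if (stored.currTxid == txid) {
            // Replay of the same batch: curr may reflect a partial earlier
            // attempt, so recompute from prev instead of stored.curr.
            long base = (stored.prev == null) ? 0L : stored.prev;
            return new OpaqueValue(txid, base + batchSum, stored.prev);
        }
        // New batch: stored.curr is a trusted base and becomes the new prev.
        return new OpaqueValue(txid, stored.curr + batchSum, stored.curr);
    }

    public static void main(String[] args) {
        OpaqueValue v = update(null, 1, 10); // txid 1: curr = 10
        v = update(v, 2, 5);                 // txid 2: curr = 15, prev = 10
        v = update(v, 2, 5);                 // txid 2 replayed: still 15
        System.out.println(v.curr); // 15
    }
}
```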
>>>>>>
>>>>>> Thanks,
>>>>>> --
>>>>>> Raphael Hsieh
>>>>>
>>>>> --
>>>>> Cody A. Ray, LEED AP
>>>>> cody.a....@gmail.com
>>>>> 215.501.7891

--
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014