Are you saying that there is no point in doing a "groupBy" followed by a
persistentAggregate? The documentation states: "If you run aggregators on
a grouped stream, the aggregation will be run within each group instead of
against the whole batch."
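
For reference, here is a minimal plain-Java sketch of how I read that sentence: the aggregation runs once per distinct value of the grouping field rather than once over the whole batch. It uses no Storm/Trident classes; the class name, the method name, and the use of array element 0 as a stand-in for field "a" are all hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Storm/Trident classes are used.
class GroupedCountSketch {

    // Counts tuples per distinct value of the grouping field
    // (element 0 of each array stands in for field "a").
    static Map<Object, Long> countPerGroup(List<Object[]> batch) {
        Map<Object, Long> counts = new LinkedHashMap<>();
        for (Object[] tuple : batch) {
            counts.merge(tuple[0], 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Object[]> batch = Arrays.asList(
            new Object[] {"x", 1},
            new Object[] {"y", 2},
            new Object[] {"x", 3});
        System.out.println(countPerGroup(batch)); // prints {x=2, y=1}
    }
}
```

A count over the whole batch would give 3; the grouped version gives one count per group.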


On Wed, Apr 23, 2014 at 2:17 AM, Danijel Schiavuzzi
<dani...@schiavuzzi.com> wrote:

>
> When I do something like
>>
>> Stream
>>     .groupBy(new Fields("a"))
>>     .persistentAggregate(new MyStateFactory(),
>>         new Fields("a", "b", "c", "d"), new MyAggregator(),
>>         new Fields("resultMap"))
>>
>> What happens (as described 
>> here<https://github.com/nathanmarz/storm/wiki/Trident-API-Overview>)
>> is the stream is split into different groups based on field "a":
>>
>
> This is not true. The stream will be grouped based on all the keys you
> specified in persistentAggregate, i.e. new Fields("a", "b", "c", "d"). This
> will produce as many GroupedStreams as there are distinct groupings among
> those keys. Those groupings will then be combined/reduced with the existing
> values gathered from the IBackingMap#multiGet(), and Trident will then call
> multiPut() to persist the updated aggregations back to the underlying data
> store.
>
> Take a look at the Storm sources under the package "storm.trident.*". A
> good starting point for understanding Trident would be the Java class
> "storm.trident.state.map.TransactionalMap" (or OpaqueMap or
> NonTransactionalMap).
>
> Danijel Schiavuzzi
> www.schiavuzzi.com
>
>
>
>
>
>> [image: Grouping]
>> like so.
>> Then partitionPersist will run a multiGet on the fields ("a", "b", "c",
>> "d"), since that is what we are using as our keys. So in each of the
>> "groups" described above, we would have not only the raw tuples resulting
>> from the grouping, but also a single tuple with the result of the previous
>> aggregation.
>> These would all be run through the aggregator, which should be able to
>> handle aggregating with this semi-complete aggregation (The "Reduce"
>> function in a ReducerAggregator, or the "Combine" function in the
>> CombinerAggregator).
>>
>> How does it know not to treat the previous aggregation as a single new
>> tuple (and hence not run the "init" function on it)? For example, if I
>> were aggregating a count, having that previous value (say 60) as a single
>> extra tuple would only increment the count by 1, instead of 60.
>> Would I then just need to implement my own "init" function such that it
>> checks whether the tuple value is a raw new tuple or a previous
>> aggregation?
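
A minimal sketch of the count case described above, assuming the stored value is merged through "combine" and never passed to "init". The Combiner interface here is a local stand-in modeled on the init/combine/zero shape of Trident's CombinerAggregator (the real init takes a TridentTuple); the class name and the update helper are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

class CombinerCountSketch {

    // Local stand-in modeled on the shape of Trident's CombinerAggregator.
    interface Combiner<T> {
        T init(Object tuple);       // called once per raw tuple
        T combine(T left, T right); // merges two partial aggregates
        T zero();                   // identity value for an empty batch
    }

    static final Combiner<Long> COUNT = new Combiner<Long>() {
        public Long init(Object tuple) { return 1L; }
        public Long combine(Long left, Long right) { return left + right; }
        public Long zero() { return 0L; }
    };

    // Hypothetical helper: fold the new tuples into one partial value,
    // then combine that partial with the previously stored value.
    static <T> T update(Combiner<T> agg, T stored, List<?> newTuples) {
        T batchPartial = agg.zero();
        for (Object tuple : newTuples) {
            batchPartial = agg.combine(batchPartial, agg.init(tuple));
        }
        return stored == null ? batchPartial : agg.combine(stored, batchPartial);
    }

    public static void main(String[] args) {
        // Stored count of 60 plus three new tuples.
        System.out.println(update(COUNT, 60L, Arrays.asList("t1", "t2", "t3"))); // prints 63
    }
}
```

Under that assumption, the previous value of 60 contributes 60 rather than 1, because only raw tuples go through "init".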
>>
>>
>> On Tue, Apr 22, 2014 at 9:59 AM, Cody A. Ray <cody.a....@gmail.com> wrote:
>>
>>> My understanding is that the process is
>>> 1. multiGet from the IBackingMap  is called and returns a value for each
>>> key (or null if not present)
>>> 2. For each key, the old value from the get and new values in the batch
>>> are fed through the aggregator to produce one value per key
>>> 3. This value is then stored back into the state through the multiPut in
>>> the IBackingMap.
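
The three steps above can be sketched in plain Java roughly as follows. This is only an illustration under stated assumptions: InMemoryBackingMap is a local stand-in that borrows the multiGet/multiPut method names from IBackingMap, it is backed by a HashMap, the aggregation is a simple sum, and the transaction-id handling that OpaqueMap adds is left out entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BackingMapCycleSketch {

    // Local stand-in using the same method names as IBackingMap.
    static class InMemoryBackingMap {
        private final Map<List<Object>, Long> store = new HashMap<>();

        List<Long> multiGet(List<List<Object>> keys) {
            List<Long> values = new ArrayList<>();
            for (List<Object> key : keys) {
                values.add(store.get(key)); // null when the key is absent
            }
            return values;
        }

        void multiPut(List<List<Object>> keys, List<Long> values) {
            for (int i = 0; i < keys.size(); i++) {
                store.put(keys.get(i), values.get(i));
            }
        }
    }

    // One batch: (1) read old values, (2) add this batch's per-key sums,
    // (3) write the updated values back.
    static void applyBatch(InMemoryBackingMap map, List<List<Object>> keys, List<Long> batchSums) {
        List<Long> old = map.multiGet(keys);
        List<Long> updated = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            long previous = old.get(i) == null ? 0L : old.get(i);
            updated.add(previous + batchSums.get(i));
        }
        map.multiPut(keys, updated);
    }

    public static void main(String[] args) {
        InMemoryBackingMap map = new InMemoryBackingMap();
        List<List<Object>> keys = Arrays.asList(
            Arrays.<Object>asList("k1"), Arrays.<Object>asList("k2"));
        applyBatch(map, keys, Arrays.asList(5L, 7L));
        applyBatch(map, keys, Arrays.asList(1L, 2L));
        System.out.println(map.multiGet(keys)); // prints [6, 9]
    }
}
```

The point of the sketch is that the read-combine-write loop lives outside the backing map; the map itself only fetches and stores values.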
>>>
>>> If you just want to use nathanmarz's trident-memcached integration, you
>>> don't have to write an IBackingMap yourself. The MemcachedState itself
>>> implements IBackingMap to do the get and put. To use it, just decide what
>>> you want to groupBy (these become your keys) and how you want it aggregated
>>> (this is the reducer/combiner implementation). You don't have to write the
>>> memcache connection logic or the aggregation logic yourself unless you want
>>> to change how it's aggregated or stored.
>>> I've not used the trident-memcached state in particular, but in general
>>> this would look something like this:
>>>
>>> topology.newStream("spout1", spout1)
>>>   .groupBy(new Fields("mykeyfield"))
>>>   .persistentAggregate(MemcachedState.opaque(servers),
>>>       new Fields("myvaluefield"), new Sum(), new Fields("sum"));
>>>
>>> (Sorry for any code errors; writing on my phone)
>>>
>>> Does that answer your question?
>>>
>>> -Cody
>>> On Apr 22, 2014 10:32 AM, "Raphael Hsieh" <raffihs...@gmail.com> wrote:
>>>
>>>> The Reducer/Combiner aggregators hold the logic to aggregate across
>>>> an entire batch; however, they do not have the logic to aggregate
>>>> between batches.
>>>> In order for this to happen, it must read the previous TransactionId
>>>> and value from the datastore, determine whether this incoming data is in
>>>> the right sequence, then increment the value within the datastore.
>>>>
>>>> I am asking about this second part: where does the logic go that
>>>> reads the previous data from the datastore and adds it to the new
>>>> incoming aggregate data?
>>>>
>>>>
>>>> On Mon, Apr 21, 2014 at 6:58 PM, Cody A. Ray <cody.a....@gmail.com> wrote:
>>>>
>>>>> It's the ReducerAggregator/CombinerAggregator's job to implement this
>>>>> logic. Look at Count and Sum, which are built into Trident. You can also
>>>>> implement your own aggregator.
>>>>>
>>>>> -Cody
>>>>>
>>>>>
>>>>> On Mon, Apr 21, 2014 at 2:57 PM, Raphael Hsieh 
>>>>> <raffihs...@gmail.com> wrote:
>>>>>
>>>>>> If I am using an opaque spout and doing a persistent aggregate to a
>>>>>> MemcachedState, how is it aggregating/incrementing the values across all
>>>>>> batches?
>>>>>>
>>>>>> I want to implement an IBackingMap so that I can use an external
>>>>>> datastore. However, I'm unsure where the logic goes that will read the
>>>>>> previous data and aggregate it with the new data.
>>>>>>
>>>>>> From what I've been told, I need to implement the IBackingMap and the
>>>>>> multiPut/multiGet functions. So logically, I think it makes sense that I
>>>>>> would put this update logic in the multiPut function. However, the
>>>>>> OpaqueMap class already has multiGet logic in order to check the TxId of
>>>>>> the batch.
>>>>>> Instead of using the OpaqueMap class, should I just make my own
>>>>>> implementation?
>>>>>>
>>>>>> Thanks
>>>>>> --
>>>>>> Raphael Hsieh
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Cody A. Ray, LEED AP
>>>>> cody.a....@gmail.com
>>>>> 215.501.7891
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Raphael Hsieh
>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> Raphael Hsieh
>>
>>
>>
>>
>
>
>
> --
> Danijel Schiavuzzi
>
> E: dani...@schiavuzzi.com
> W: www.schiavuzzi.com
> T: +385989035562
> Skype: danijels7
>



-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014
