> When I do something like
>
> Stream
>     .groupBy(new Fields("a"))
>     .persistentAggregate(new MyStateFactory(), new Fields("a", "b", "c", "d"),
>         new MyAggregator(), new Fields("resultMap"))
>
> What happens (as described here
> <https://github.com/nathanmarz/storm/wiki/Trident-API-Overview>)
> is that the stream is split into different groups based on field "a":
>

This is not true. The stream will be grouped based on all the keys you
specified in persistentAggregate, i.e. new Fields("a", "b", "c", "d"). This
will produce as many GroupedStreams as there are distinct groupings among
those keys. Those groupings will then be combined/reduced with the existing
values gathered from the IBackingMap#multiGet(), and Trident will then call
multiPut() to persist the updated aggregations back to the underlying data
store.
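
To make the read/combine/write cycle concrete, here is a minimal sketch in
plain Java. It does not use the Storm classes: InMemoryBackingMap and
CountSketch are hypothetical names made up for illustration, and the store
is just a HashMap. It assumes a count-style aggregation where every raw
tuple contributes 1. The point it tries to show is that the stored value is
merged with the batch's partial result through the combine step and is
never passed through init as if it were a new tuple.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a backing store; NOT Storm's IBackingMap interface.
class InMemoryBackingMap {
    private final Map<String, Long> store = new HashMap<>();

    // Returns one value per key, null when the key has never been written.
    List<Long> multiGet(List<String> keys) {
        List<Long> values = new ArrayList<>();
        for (String key : keys) {
            values.add(store.get(key));
        }
        return values;
    }

    // Writes the updated value for each key back to the store.
    void multiPut(List<String> keys, List<Long> values) {
        for (int i = 0; i < keys.size(); i++) {
            store.put(keys.get(i), values.get(i));
        }
    }
}

// Hypothetical count aggregation, split into a per-tuple step and a merge step.
class CountSketch {
    // Applied to raw tuples only: each one counts as 1.
    static long init(String tuple) {
        return 1L;
    }

    // Merges two partial counts.
    static long combine(long a, long b) {
        return a + b;
    }

    // One batch: read old values, fold in the new tuples, write the results back.
    static void applyBatch(InMemoryBackingMap state, Map<String, List<String>> batchByKey) {
        List<String> keys = new ArrayList<>(batchByKey.keySet());
        List<Long> previous = state.multiGet(keys);
        List<Long> updated = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            long batchTotal = 0L;
            for (String tuple : batchByKey.get(keys.get(i))) {
                batchTotal = combine(batchTotal, init(tuple));
            }
            Long old = previous.get(i);
            // The stored value is combined directly; init is never applied to it.
            updated.add(old == null ? batchTotal : combine(old, batchTotal));
        }
        state.multiPut(keys, updated);
    }
}
```

With a stored count of 60 and a batch of three new tuples for the same key,
this sketch writes back 63 rather than 61.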

Take a look at the Storm sources under the package "storm.trident.*". A
good starting point for understanding Trident would be the Java class
"storm.trident.state.map.TransactionalMap" (or OpaqueMap or
NonTransactionalMap).
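
As a rough guide to what to look for in OpaqueMap, here is a simplified
sketch of the opaque update rule as I understand it from the Trident state
documentation. It is not copied from the Storm sources: OpaqueEntry and
OpaqueUpdateSketch are hypothetical names, values are plain longs, and the
aggregation is assumed to be a simple sum. The idea is that each key stores
the txid of the batch that last wrote it together with the current and
previous values, so a replayed batch is applied on top of the previous value
instead of being added twice.

```java
// Hypothetical per-key record for an opaque state; not Storm's own class.
final class OpaqueEntry {
    final long txid; // txid of the batch that last wrote this entry
    final long curr; // value after that batch was applied
    final Long prev; // value before that batch, null if there was none

    OpaqueEntry(long txid, long curr, Long prev) {
        this.txid = txid;
        this.curr = curr;
        this.prev = prev;
    }
}

class OpaqueUpdateSketch {
    // Applies one batch's partial sum (batchDelta) to the stored entry.
    static OpaqueEntry update(OpaqueEntry stored, long batchTxid, long batchDelta) {
        Long base;
        if (stored == null) {
            base = null;        // first write for this key
        } else if (stored.txid == batchTxid) {
            base = stored.prev; // same txid seen again: redo from the previous value
        } else {
            base = stored.curr; // new txid: build on the current value
        }
        long next = (base == null ? 0L : base) + batchDelta;
        return new OpaqueEntry(batchTxid, next, base);
    }
}
```

In this sketch the backing store only has to get and put OpaqueEntry records;
the txid comparison lives in the wrapper, not in multiGet or multiPut.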

Danijel Schiavuzzi
www.schiavuzzi.com





> [image: Grouping]
> like so.
> Then, PartitionPersist will run a multiGet on the fields ("a", "b", "c",
> "d"), since that is what we are using as our keys. So in each of the
> "groups" described above, we would have not only the raw tuples resulting
> from the grouping, but also a single tuple with the result of the previous
> aggregation.
> These would all be run through the aggregator, which should be able to
> handle aggregating with this semi-complete aggregation (The "Reduce"
> function in a ReducerAggregator, or the "Combine" function in the
> CombinerAggregator).
>
> How does it know not to treat the previous aggregation as a single new
> tuple (and hence not run the "init" function on it)? For example, if I were
> aggregating a count, having that previous value (say 60) as a single extra
> tuple would only increment the count by 1, instead of 60.
> Would I then just need to implement my own "init" function so that it
> checks whether the tuple value is a raw new tuple or a previous
> aggregation?
>
>
> On Tue, Apr 22, 2014 at 9:59 AM, Cody A. Ray <cody.a....@gmail.com> wrote:
>
>> My understanding is that the process is
>> 1. multiGet from the IBackingMap  is called and returns a value for each
>> key (or null if not present)
>> 2. For each key, the old value from the get and new values in the batch
>> are fed through the aggregator to produce one value per key
>> 3. This value is then stored back into the state through the multiPut in
>> the IBackingMap.
>>
>> If you just want to use nathanmarz's trident-memcached integration, you
>> don't have to write an IBackingMap yourself. The MemcachedState itself
>> implements IBackingMap to do the get and put. To use it, just decide what
>> you want to groupBy (these become your keys) and how you want it aggregated
>> (this is the reducer/combiner implementation). You don't have to write the
>> memcache connection logic or the aggregation logic yourself unless you want
>> to change how it's aggregated or stored.
>> I've not used the trident-memcached state in particular, but in general
>> this would look something like this:
>>
>> topology.newStream("spout1", spout1)
>>   .groupBy(new Fields("mykeyfield"))
>>   .persistentAggregate(MemcachedState.opaque(servers),
>>       new Fields("myvaluefield"), new Sum(), new Fields("sum"));
>>
>> (Sorry for any code errors; writing on my phone)
>>
>> Does that answer your question?
>>
>> -Cody
>> On Apr 22, 2014 10:32 AM, "Raphael Hsieh" <raffihs...@gmail.com> wrote:
>>
>>> The Reducer/Combiner Aggregators hold the logic to aggregate across
>>> an entire batch; however, they do not have the logic to aggregate between
>>> batches.
>>> In order for this to happen, it must read the previous TransactionId and
>>> value from the datastore, determine whether this incoming data is in the
>>> right sequence, then increment the value within the datastore.
>>>
>>> I am asking about this second part: where the logic goes that reads the
>>> previous data from the datastore and adds it to the new incoming
>>> aggregate data.
>>>
>>>
>>> On Mon, Apr 21, 2014 at 6:58 PM, Cody A. Ray <cody.a....@gmail.com> wrote:
>>>
>>>> It's the ReducerAggregator/CombinerAggregator's job to implement this
>>>> logic. Look at Count and Sum, which are built into Trident. You can also
>>>> implement your own aggregator.
>>>>
>>>> -Cody
>>>>
>>>>
>>>> On Mon, Apr 21, 2014 at 2:57 PM, Raphael Hsieh <raffihs...@gmail.com> wrote:
>>>>
>>>>> If I am using an opaque spout and doing a persistent aggregate to a
>>>>> MemcachedState, how is it aggregating/incrementing the values across all
>>>>> batches ?
>>>>>
>>>>> I'm wanting to implement an IBackingMap so that I can use an external
>>>>> datastore. However, I'm unsure where the logic goes that will read the
>>>>> previous data, and aggregate it with the new data.
>>>>>
>>>>> From what I've been told, I need to implement the IBackingMap and the
>>>>> multiPut/multiGet functions. So logically, I think it makes sense that I
>>>>> would put this update logic in the multiPut function. However, the
>>>>> OpaqueMap class already has multiGet logic in order to check the TxId of
>>>>> the batch.
>>>>> Instead of using an OpaqueMap class, should I just make my own
>>>>> implementation ?
>>>>>
>>>>> Thanks
>>>>> --
>>>>> Raphael Hsieh
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Cody A. Ray, LEED AP
>>>> cody.a....@gmail.com
>>>> 215.501.7891
>>>>
>>>
>>>
>>>
>>> --
>>> Raphael Hsieh
>>>
>>>
>>>
>>>
>>
>
>
> --
> Raphael Hsieh
>
>
>
>



-- 
Danijel Schiavuzzi

E: dani...@schiavuzzi.com
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7
