Hi Lukasz, Kenneth,

Thanks a lot for your follow-up.
@Lukasz: I was indeed referring to that SerializableFunction. You are right
that the values don't have to be transferred to one worker; that is what I
was trying to achieve with the ArrayLists (which turn out to be empty even
though the if-condition is true, and it works when running locally!). I have
now implemented it with a GroupByKey and a DoFn, and that works (I think
that's similar to your proposed solution, just with more code?) :) But
since I think the elements could be processed in parallel on different
workers, I would still like to try doing it with an accumulator.
@Kenneth: thanks for your comment. Are you suggesting that using
Combine.perKey(SerializableFunction<Iterable<V>, V>) will not actually
shuffle everything to one node prior to combining? Then this approach might
not lead to the right results after all... How does the
SerializableFunction then combine the different values within one node:
using the same function? (Then it basically replaces both addInput and
mergeAccumulators?)

Thanks a lot!

Best,

Matthias

On Tue, Nov 22, 2016 at 6:07 PM, Kenneth Knowles <[email protected]> wrote:

> One minor correction I wanted to note: when you use
>
>     Combine.perKey(SerializableFunction<Iterable<V>, V>)
>
> the SerializableFunction plays roughly the same role as mergeAccumulators
> (and the Combine is implemented just so), so it requires that the input
> type V is the accumulator type, as with aggregations like Sum. In fact,
> this does allow a runner to combine prior to the shuffle.
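
To make the remark above concrete, here is a minimal plain-Java sketch of how one Iterable<V> -> V function can stand in for both "add an input" and "merge partial results" once the accumulator type is V itself. It is a simplified model and not Beam's actual implementation; the class name LiftedCombine and its method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Simplified model, NOT Beam's implementation: a single Iterable<V> -> V
// function is reused for adding inputs and for merging partial results.
// This only works because the accumulator type is V itself.
class LiftedCombine<V> {
  private final Function<Iterable<V>, V> fn;

  LiftedCombine(Function<Iterable<V>, V> fn) {
    this.fn = fn;
  }

  // Plays the role of addInput: fold one more value into a partial result.
  V addInput(V partial, V input) {
    return fn.apply(Arrays.asList(partial, input));
  }

  // Plays the role of mergeAccumulators: combine several partial results.
  V mergePartials(List<V> partials) {
    return fn.apply(partials);
  }

  // Example combining function: integer sum (associative and commutative).
  static Integer sum(Iterable<Integer> values) {
    int total = 0;
    for (int v : values) {
      total += v;
    }
    return total;
  }

  public static void main(String[] args) {
    LiftedCombine<Integer> combine = new LiftedCombine<>(LiftedCombine::sum);
    Integer left = combine.addInput(1, 2);   // one batch: 1, 2
    Integer right = combine.addInput(3, 4);  // another batch: 3, 4
    // Merging the two partial sums gives the same total as summing 1..4.
    System.out.println(combine.mergePartials(Arrays.asList(left, right)));
  }
}
```

The same function is applied at every level, which is why it has to accept its own output as input.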
>
> On Nov 22, 2016 06:57, "Lukasz Cwik" <[email protected]> wrote:
>
>> Is this ever actually true? "Arrays.asList(hits).contains(full_list[i])"
>> It's unclear whether full_list is empty or not; if it's empty, then your
>> Accum.log is empty.
>>
>> When you were asking about the SerializableFunction, were you
>> specifically referring to one which you pass into Combine such as
>> Combine.perKey
>> (http://beam.incubator.apache.org/documentation/sdks/javadoc/0.3.0-incubating/org/apache/beam/sdk/transforms/Combine.html#perKey-org.apache.beam.sdk.transforms.SerializableFunction-)?
>>
>> If so, then it is not true that all the values for a single key need
>> to be shipped to one worker. For example, if you had four values (A0, A1,
>> A2, A3), one worker could combine A0 and A1 producing A01 and another
>> worker could combine A2 and A3 producing A23 with a third worker combining
>> A01 and A23 producing the final result A0123.
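
The A0..A3 example above can be sketched in plain Java without any Beam dependency. The assumption here is that the combining function is a set union (the screen names are invented sample data); the point is only that combining pairwise on separate "workers" and then combining the partial results gives the same answer as combining all four values in one place.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class TreeCombineDemo {
  // The combining function: union of all the sets it is given.
  static Set<String> union(List<Set<String>> sets) {
    Set<String> out = new TreeSet<>();
    for (Set<String> s : sets) {
      out.addAll(s);
    }
    return out;
  }

  // Small helper to build a sorted set from literals.
  static Set<String> of(String... items) {
    return new TreeSet<>(Arrays.asList(items));
  }

  public static void main(String[] args) {
    Set<String> a0 = of("home");
    Set<String> a1 = of("cart");
    Set<String> a2 = of("home", "search");
    Set<String> a3 = of("checkout");

    Set<String> a01 = union(Arrays.asList(a0, a1));      // "worker 1"
    Set<String> a23 = union(Arrays.asList(a2, a3));      // "worker 2"
    Set<String> a0123 = union(Arrays.asList(a01, a23));  // "worker 3"

    Set<String> allAtOnce = union(Arrays.asList(a0, a1, a2, a3));
    System.out.println(a0123.equals(allAtOnce)); // prints true
  }
}
```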
>>
>> Since you're specifically trying to combine inputs of the same type as the
>> output, I would stick with the simpler SerializableFunction implementation as
>> you get all the performance benefits with a much simpler implementation
>> requirement.
>>
>>
>> On Tue, Nov 22, 2016 at 9:37 AM, Matthias Baetens <
>> [email protected]> wrote:
>>
>>> Hi Lukasz,
>>> Hi all,
>>>
>>> Thanks for your message.
>>> The code in my previous e-mail specifies the functions I have defined
>>> for my accumulator, which is basically adding to an ArrayList and then
>>> merging into one ArrayList. I think I have implemented all the functions
>>> necessary, as specified in the docs. However, when writing the results, the
>>> ArrayLists seem to be empty. When logging, only the logs from the
>>> addInput() function get written.
>>> Do you have any idea why this approach would not work for ArrayList and
>>> how to solve it (or if there are any alternatives)?
>>>
>>> Cheers!
>>>
>>> Matthias
>>>
>>> On Tue, Nov 22, 2016 at 2:14 PM, Lukasz Cwik <[email protected]> wrote:
>>>
>>>> The javadoc goes through a lengthy explanation:
>>>> http://beam.incubator.apache.org/documentation/sdks/javadoc/0.3.0-incubating/org/apache/beam/sdk/transforms/Combine.CombineFn.html
>>>>
>>>> A CombineFn<InputT, AccumT, OutputT> specifies how to combine a
>>>> collection of input values of type InputT into a single output value of
>>>> type OutputT. It does this via one or more intermediate mutable accumulator
>>>> values of type AccumT.
>>>> The overall process to combine a collection of input InputT values into
>>>> a single output OutputT value is as follows:
>>>>
>>>> The input InputT values are partitioned into one or more batches.
>>>> 1) For each batch, the createAccumulator() operation is invoked to
>>>> create a fresh mutable accumulator value of type AccumT, initialized to
>>>> represent the combination of zero values.
>>>> 2) For each input InputT value in a batch, the addInput(AccumT, InputT)
>>>> operation is invoked to add the value to that batch's accumulator AccumT
>>>> value. The accumulator may just record the new value (e.g., if AccumT ==
>>>> List<InputT>), or may do work to represent the combination more compactly.
>>>> 3) The mergeAccumulators(java.lang.Iterable<AccumT>) operation is
>>>> invoked to combine a collection of accumulator AccumT values into a single
>>>> combined output accumulator AccumT value, once the merging accumulators
>>>> have had all the input values in their batches added to them. This
>>>> operation is invoked repeatedly, until there is only one accumulator value
>>>> left.
>>>> 4) The extractOutput(AccumT) operation is invoked on the final
>>>> accumulator AccumT value to get the output OutputT value.
>>>> For example:
>>>>
>>>>
>>>>  public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
>>>>    public static class Accum {
>>>>      int sum = 0;
>>>>      int count = 0;
>>>>    }
>>>>    public Accum createAccumulator() {
>>>>      return new Accum();
>>>>    }
>>>>    public Accum addInput(Accum accum, Integer input) {
>>>>      accum.sum += input;
>>>>      accum.count++;
>>>>      return accum;
>>>>    }
>>>>    public Accum mergeAccumulators(Iterable<Accum> accums) {
>>>>      Accum merged = createAccumulator();
>>>>      for (Accum accum : accums) {
>>>>        merged.sum += accum.sum;
>>>>        merged.count += accum.count;
>>>>      }
>>>>      return merged;
>>>>    }
>>>>    public Double extractOutput(Accum accum) {
>>>>      return ((double) accum.sum) / accum.count;
>>>>    }
>>>>  }
>>>>  PCollection<Integer> pc = ...;
>>>>  PCollection<Double> average = pc.apply(Combine.globally(new AverageFn()));
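
To see the four steps above happen in order, here is a small stand-alone simulation that drives the same average logic by hand. It does not use Beam at all: the run method below is a hypothetical stand-in for what a runner might do, and it merges all per-batch accumulators in a single pass, whereas a real runner may merge repeatedly and in any grouping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AverageSimulation {
  static class Accum {
    int sum = 0;
    int count = 0;
  }

  static Accum createAccumulator() {
    return new Accum();
  }

  static Accum addInput(Accum accum, Integer input) {
    accum.sum += input;
    accum.count++;
    return accum;
  }

  static Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      merged.sum += accum.sum;
      merged.count += accum.count;
    }
    return merged;
  }

  static Double extractOutput(Accum accum) {
    return ((double) accum.sum) / accum.count;
  }

  // Hypothetical "runner": one accumulator per batch, then merge, then extract.
  static Double run(List<List<Integer>> batches) {
    List<Accum> perBatch = new ArrayList<>();
    for (List<Integer> batch : batches) {
      Accum accum = createAccumulator();        // step 1
      for (Integer input : batch) {
        accum = addInput(accum, input);         // step 2
      }
      perBatch.add(accum);
    }
    Accum merged = mergeAccumulators(perBatch); // step 3 (single pass here)
    return extractOutput(merged);               // step 4
  }

  public static void main(String[] args) {
    List<List<Integer>> batches = Arrays.asList(
        Arrays.asList(1, 2), Arrays.asList(3), Arrays.asList(4, 5, 6));
    System.out.println(run(batches)); // prints 3.5
  }
}
```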
>>>>
>>>> Combining functions used by Combine.Globally, Combine.PerKey,
>>>> Combine.GroupedValues, and PTransforms derived from them should be
>>>> associative and commutative. Associativity is required because input values
>>>> are first broken up into subgroups before being combined, and their
>>>> intermediate results further combined, in an arbitrary tree structure.
>>>> Commutativity is required because any order of the input values is ignored
>>>> when breaking up input values into groups.
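
A quick way to convince yourself of the associativity requirement above is to run the same values through different batchings. The sketch below is plain Java with a made-up helper, combineInBatches, that folds each batch and then folds the batch results. Sum gives the same answer either way; subtraction, which is not associative, does not. (Input order is preserved here, so this only illustrates associativity, not commutativity.)

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class PartitioningDemo {
  // Fold each batch with op, then fold the per-batch results with op.
  static int combineInBatches(List<List<Integer>> batches, BinaryOperator<Integer> op) {
    Integer result = null;
    for (List<Integer> batch : batches) {
      Integer partial = null;
      for (Integer value : batch) {
        partial = (partial == null) ? value : op.apply(partial, value);
      }
      if (partial == null) {
        continue; // empty batch contributes nothing
      }
      result = (result == null) ? partial : op.apply(result, partial);
    }
    if (result == null) {
      throw new IllegalArgumentException("no input values");
    }
    return result;
  }

  public static void main(String[] args) {
    List<List<Integer>> oneBatch = Arrays.asList(Arrays.asList(1, 2, 3, 4));
    List<List<Integer>> twoBatches =
        Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4));
    BinaryOperator<Integer> minus = (a, b) -> a - b;

    System.out.println(combineInBatches(oneBatch, Integer::sum));   // prints 10
    System.out.println(combineInBatches(twoBatches, Integer::sum)); // prints 10
    System.out.println(combineInBatches(oneBatch, minus));          // prints -8
    System.out.println(combineInBatches(twoBatches, minus));        // prints 0
  }
}
```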
>>>>
>>>> On Tue, Nov 22, 2016 at 7:11 AM, Matthias Baetens <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi Aljoscha,
>>>>>
>>>>> Thank you very much for the clear answer, it solves my problem (since
>>>>> I now know I should use a SerializableFunction when wanting to have a full
>>>>> view of the data for one key).
>>>>>
>>>>> For the second question, the accumulator that I wrote appends elements
>>>>> to the ArrayList of a custom object in case it's not in the list yet:
>>>>>
>>>>> @DefaultCoder(AvroCoder.class)
>>>>> public static class Accum {
>>>>>   Log log = null;
>>>>> }
>>>>> @Override
>>>>> public Accum createAccumulator() { return new Accum(); }
>>>>> @Override
>>>>> public Accum addInput(Accum accum, Log input) {
>>>>>   if(accum.log == null) {
>>>>>     accum.log = new Log(input);
>>>>>   }
>>>>>   String[] hits = input.getHits_appInfo_screenName().split("/");
>>>>>   for(int i = 0; i < full_list.length; i++) {
>>>>>     if(Arrays.asList(hits).contains(full_list[i]) &&
>>>>>         !accum.log.getSparseArray().contains(i))
>>>>>       accum.log.getSparseArray().add(i);
>>>>>   }
>>>>>   return accum;
>>>>> }
>>>>> @Override
>>>>> public Accum mergeAccumulators(Iterable<Accum> accums) {
>>>>>   Accum merged = createAccumulator();
>>>>>   for(Accum accum : accums) {
>>>>>     if(merged.log == null){
>>>>>       merged.log = new Log(accum.log);
>>>>>     }
>>>>>     else {
>>>>>       for(Integer index : accum.log.getSparseArray()) {
>>>>>         if(!merged.log.getSparseArray().contains(index))
>>>>>           merged.log.getSparseArray().add(index);
>>>>>       }
>>>>>     }
>>>>>   }
>>>>>   return merged;
>>>>> }
>>>>> @Override
>>>>> public Log extractOutput(Accum accum) {
>>>>>   return accum.log;
>>>>> }
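
One thing the merge above relies on is that every accumulator it sees already has a non-null log; an accumulator that never received an input would still have log == null. Whether that is related to the empty output is not established in this thread. As a point of comparison only, here is a plain-Java sketch of an accumulator that is never null and keeps the indices in a set, so the contains() checks go away. IndexAccum and the sample screen names are hypothetical stand-ins for the Accum/Log classes, which are not shown in full here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

class IndexAccumDemo {
  // Hypothetical stand-in for Accum/Log: only the matched indices are kept.
  static class IndexAccum {
    final SortedSet<Integer> indices = new TreeSet<>(); // starts empty, never null
  }

  static IndexAccum createAccumulator() {
    return new IndexAccum();
  }

  // Record the index of every fullList entry found in the "/"-separated name.
  static IndexAccum addInput(IndexAccum accum, String screenName, String[] fullList) {
    List<String> hits = Arrays.asList(screenName.split("/"));
    for (int i = 0; i < fullList.length; i++) {
      if (hits.contains(fullList[i])) {
        accum.indices.add(i); // a set ignores duplicates by itself
      }
    }
    return accum;
  }

  // Union of all index sets; empty accumulators simply contribute nothing.
  static IndexAccum mergeAccumulators(Iterable<IndexAccum> accums) {
    IndexAccum merged = createAccumulator();
    for (IndexAccum accum : accums) {
      merged.indices.addAll(accum.indices);
    }
    return merged;
  }

  public static void main(String[] args) {
    String[] fullList = {"home", "cart", "checkout"};
    IndexAccum a = addInput(createAccumulator(), "shop/cart", fullList);
    IndexAccum b = addInput(createAccumulator(), "home/checkout", fullList);
    IndexAccum empty = createAccumulator(); // a batch that saw no input
    IndexAccum merged = mergeAccumulators(Arrays.asList(a, empty, b));
    System.out.println(merged.indices); // prints [0, 1, 2]
  }
}
```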
>>>>>
>>>>> Hope this helps for answering the second question!
>>>>>
>>>>> Thanks again :)
>>>>>
>>>>> Best,
>>>>>
>>>>> Matthias
>>>>>
>>>>> On Tue, Nov 22, 2016 at 11:47 AM, Aljoscha Krettek <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi Matthias,
>>>>>> when the shuffle happens is not defined by the Beam model so it
>>>>>> depends on the runner. You are right, though, that a runner can optimise
>>>>>> execution when you specify a CombineFn. In that case a runner can choose
>>>>>> to combine elements before shuffling to reduce the amount of data that we
>>>>>> have to shuffle across the network. With a SerializableFunction that's not
>>>>>> possible because we don't have an intermediate accumulation type as we
>>>>>> have with a CombineFn. Therefore, the runner has to ship all elements for a
>>>>>> given key to one machine to apply the SerializableFunction.
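
The "combine before shuffling" idea above can be illustrated with a toy count-per-key in plain Java. This is only a model with invented data and helper names (countLocally, mergeCounts), not how any particular runner does it: each "worker" first counts its own keys, so only one record per key per worker has to cross the network instead of one record per element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PreShuffleDemo {
  // Partial combine on one worker: count occurrences of each key locally.
  static Map<String, Integer> countLocally(List<String> keys) {
    Map<String, Integer> counts = new HashMap<>();
    for (String key : keys) {
      counts.merge(key, 1, Integer::sum);
    }
    return counts;
  }

  // Final combine after the shuffle: add up the partial counts per key.
  static Map<String, Integer> mergeCounts(List<Map<String, Integer>> partials) {
    Map<String, Integer> total = new TreeMap<>();
    for (Map<String, Integer> partial : partials) {
      partial.forEach((key, count) -> total.merge(key, count, Integer::sum));
    }
    return total;
  }

  public static void main(String[] args) {
    List<String> worker1 = Arrays.asList("a", "a", "a", "b");
    List<String> worker2 = Arrays.asList("a", "b", "b");

    int rawRecords = worker1.size() + worker2.size(); // shuffled without pre-combining

    List<Map<String, Integer>> partials = new ArrayList<>();
    partials.add(countLocally(worker1));
    partials.add(countLocally(worker2));
    int combinedRecords = partials.get(0).size() + partials.get(1).size();

    System.out.println(rawRecords + " vs " + combinedRecords); // prints 7 vs 4
    System.out.println(mergeCounts(partials));                 // prints {a=4, b=3}
  }
}
```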
>>>>>>
>>>>>> Regarding your second question, could you maybe send a code snippet?
>>>>>> That would allow us to have a look and give a good answer.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>
>>>>>> On Tue, 22 Nov 2016 at 12:32 Matthias Baetens <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi there,
>>>>>>>
>>>>>>> I had some questions about the internal workings of these two
>>>>>>> concepts and where I could find more info on this (so I might be able
>>>>>>> to solve similar problems in the future myself). Here we go:
>>>>>>>
>>>>>>> + When doing a GroupByKey, when does the shuffling actually take
>>>>>>> place? Could it be that the behaviour is not the same when using a
>>>>>>> CombineFn to aggregate compared to when using a SerializableFunction?
>>>>>>> (I have a feeling that in the first case not all the keys get shuffled
>>>>>>> to one machine, while they do in the second.)
>>>>>>>
>>>>>>> + When using accumulators in a CombineFn, what are the actual
>>>>>>> internals? Are there any docs on this? The problem I run into is that,
>>>>>>> when I try adding elements to an ArrayList and then merge the ArrayLists,
>>>>>>> the output is an empty list. The problem could probably be solved by
>>>>>>> using a SerializableFunction to combine everything at once, but you might
>>>>>>> lose the advantages of parallelisation in that case (~ above).
>>>>>>>
>>>>>>> Thanks a lot :)
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Matthias
>>>>>>> --
>>>>>>>
>>>>>>> *Matthias Baetens*
>>>>>>>
>>>>>>>
>>>>>>> *datatonic | data power unleashed*
>>>>>>> office +44 203 668 3680  |  mobile +44 74 918 20646
>>>>>>>
>>>>>>> Level24 | 1 Canada Square | Canary Wharf | E14 5AB London
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>


