Hi Aljoscha,

Thank you very much for the clear answer, it solves my problem (since I now
know I should use a SerializableFunction when wanting to have a full view
of the data for one key).

For the second question, the accumulator that I wrote appends elements to
the ArrayList of a custom object in case it's not in the list yet:

@DefaultCoder(AvroCoder.class)
public static class Accum {
Log log = null;
}
@Override
public Accum createAccumulator() { return new Accum(); }
@Override
public Accum addInput(Accum accum, Log input) {
if(accum.log == null) {
accum.log = new Log(input);
}
String[] hits = input.getHits_appInfo_screenName().split("/");
for(int i = 0; i < full_list.length; i++) {
if(Arrays.asList(hits).contains(full_list[i]) &&
!accum.log.getSparseArray().contains(i)) accum.log.getSparseArray().add(i);
}
return accum;
}
@Override
public Accum mergeAccumulators(Iterable<Accum> accums) {
Accum merged = createAccumulator();
for(Accum accum : accums) {
if(merged.log == null){
merged.log = new Log(accum.log);
}
else {
for(Integer index : accum.log.getSparseArray()) {
if(!merged.log.getSparseArray().contains(index))
merged.log.getSparseArray().add(index);
}
}
}
return merged;
}
@Override
public Log extractOutput(Accum accum) {
return accum.log;
}

Hope this helps for answering the second question!

Thanks again :)

Best,

Matthias

On Tue, Nov 22, 2016 at 11:47 AM, Aljoscha Krettek <[email protected]>
wrote:

> Hi Matthias,
> when the shuffle happens is not defined by the Beam model so it depends on
> the runner. You are right, though, that a runner can optimise execution
> when you specify a CombineFn. In that case a runner can choose to combine
> elements before shuffling to reduce the amount of data that we have to
> shuffle across the network. With a SerializableFunction that's not possible
> because we don't have an intermediate accumulation type as we have with a
> CombineFn. Therefore, the runner has to ship all elements for a given key
> to one machine to apply the SerializableFunction.
>
> Regarding your second question, could you maybe send a code snipped? That
> would allow us to have a look and give a good answer.
>
> Cheers,
> Aljoscha
>
> On Tue, 22 Nov 2016 at 12:32 Matthias Baetens <matthias.baetens@datatonic.
> com> wrote:
>
>> Hi there,
>>
>> I had some questions about the internal working of these two concepts and
>> where I could find more info on this (so I might be able similar problems
>> in the future myself. Here we go:
>>
>> + When doing a GroupByKey, when does the shuffling actually take place?
>> Could it be the behaviour is not the same when using a CombineFn to
>> aggregate compared to when using a Serializablefunction? (I have a feeling
>> in the first case not all the keys get shuffled to one machine, while it
>> does for the second).
>>
>> + When using Accumulators in a CombineFn, what are the actual internals?
>> Is there any docs on this? The problem I run into is that, when I try
>> adding elements to an ArrayList and then merge ArrayList, the output is an
>> empty list. The problem could probably be solved by using a
>> Serializablefunction to Combine everything at once, but you might loose the
>> advantages of parallellisation in that case (~ above).
>>
>> Thanks a lot :)
>>
>> Best,
>>
>> Matthias
>> --
>>
>> *Matthias Baetens*
>>
>>
>> *datatonic | data power unleashed*
>> office +44 203 668 3680 <+44%2020%203668%203680>  |  mobile +44 74 918
>> 20646
>>
>> Level24 | 1 Canada Square | Canary Wharf | E14 5AB London
>>
>


-- 

*Matthias Baetens*


*datatonic | data power unleashed*
office +44 203 668 3680  |  mobile +44 74 918 20646

Level24 | 1 Canada Square | Canary Wharf | E14 5AB London

Reply via email to