[ 
https://issues.apache.org/jira/browse/BEAM-6740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía reassigned BEAM-6740:
----------------------------------

    Assignee: Ismaël Mejía

> Combine.globally translation is never called
> --------------------------------------------
>
>                 Key: BEAM-6740
>                 URL: https://issues.apache.org/jira/browse/BEAM-6740
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Etienne Chauchot
>            Assignee: Ismaël Mejía
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Beam translates Combine.Globally as a composite transform composed of:
>  * Map that assigns Void keys
>  * Combine.PerKey
> On Spark: because Combine.PerKey uses a Spark GBK internally, the runner adds 
> its own translation of Combine.Globally to avoid the less performant GBK. This 
> translation should be called instead of descending into the composite 
> transform's translation. A pipeline like this: 
> {code:java}
> PCollection<Integer> input =
>     pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
> input.apply(Combine.globally(new IntegerCombineFn()));
> {code}
> {code:java}
>   private static class IntegerCombineFn
>       extends Combine.CombineFn<Integer, Integer, Integer> {
>     @Override
>     public Integer createAccumulator() {
>       return 0;
>     }
>     @Override
>     public Integer addInput(Integer accumulator, Integer input) {
>       return accumulator + input;
>     }
>     @Override
>     public Integer mergeAccumulators(Iterable<Integer> accumulators) {
>       Integer result = 0;
>       for (Integer value : accumulators) {
>         result += value;
>       }
>       return result;
>     }
>     @Override
>     public Integer extractOutput(Integer accumulator) {
>       return accumulator;
>     }
>   }
> {code}
> is translated as the composite described above, so the runner's own 
> Combine.Globally translation is never called.
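
To illustrate why a dedicated Combine.Globally translation can avoid a GBK, here is a minimal sketch in plain Java with no Beam or Spark dependency. It is not the Spark runner's actual code. The names GlobalCombineSketch, SimpleCombineFn, IntegerSumFn and combineGlobally are hypothetical, and SimpleCombineFn only mirrors the four CombineFn methods used in the issue. The assumption is that each partition can be folded into one accumulator locally, so only the partial accumulators need to be merged and no elements are grouped under a single Void key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class GlobalCombineSketch {

  /** Hypothetical stand-in mirroring the four CombineFn methods from the issue. */
  interface SimpleCombineFn<InT, AccT, OutT> {
    AccT createAccumulator();
    AccT addInput(AccT accumulator, InT input);
    AccT mergeAccumulators(Iterable<AccT> accumulators);
    OutT extractOutput(AccT accumulator);
  }

  /** Same summing logic as IntegerCombineFn in the issue. */
  static class IntegerSumFn implements SimpleCombineFn<Integer, Integer, Integer> {
    @Override
    public Integer createAccumulator() {
      return 0;
    }

    @Override
    public Integer addInput(Integer accumulator, Integer input) {
      return accumulator + input;
    }

    @Override
    public Integer mergeAccumulators(Iterable<Integer> accumulators) {
      Integer result = 0;
      for (Integer value : accumulators) {
        result += value;
      }
      return result;
    }

    @Override
    public Integer extractOutput(Integer accumulator) {
      return accumulator;
    }
  }

  /**
   * Folds each partition into one accumulator, then merges the partial
   * accumulators. Only one accumulator per partition is combined at the end;
   * the individual elements are never grouped by key.
   */
  static <InT, AccT, OutT> OutT combineGlobally(
      List<List<InT>> partitions, SimpleCombineFn<InT, AccT, OutT> fn) {
    List<AccT> partials = new ArrayList<>();
    for (List<InT> partition : partitions) {
      AccT accumulator = fn.createAccumulator();
      for (InT value : partition) {
        accumulator = fn.addInput(accumulator, value);
      }
      partials.add(accumulator);
    }
    return fn.extractOutput(fn.mergeAccumulators(partials));
  }

  public static void main(String[] args) {
    // The ten inputs from the issue, split into three illustrative partitions.
    List<List<Integer>> partitions =
        Arrays.asList(
            Arrays.asList(1, 2, 3, 4), Arrays.asList(5, 6, 7), Arrays.asList(8, 9, 10));
    System.out.println(combineGlobally(partitions, new IntegerSumFn())); // prints 55
  }
}
```

The partitioning here is arbitrary; the result is the same for any split because the sum is associative and commutative, which is the property a CombineFn is expected to have.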



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)