[ https://issues.apache.org/jira/browse/BEAM-6740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía reassigned BEAM-6740:
----------------------------------

    Assignee: Ismaël Mejía

> Combine.globally translation is never called
> --------------------------------------------
>
>                 Key: BEAM-6740
>                 URL: https://issues.apache.org/jira/browse/BEAM-6740
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Etienne Chauchot
>            Assignee: Ismaël Mejía
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Beam translates Combine.Globally as a composite transform composed of:
>  * Map that assigns Void keys
>  * Combine.PerKey
> On Spark: because Combine.PerKey uses a Spark GBK internally, the runner adds its
> own translation of Combine.Globally to avoid the less performant GBK. This
> translation should be called instead of descending into the composite transform
> translation. A pipeline like this:
> {code:java}
> PCollection<Integer> input =
>     pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
> input.apply(Combine.globally(new IntegerCombineFn()));
> {code}
> {code:java}
> private static class IntegerCombineFn
>     extends Combine.CombineFn<Integer, Integer, Integer> {
>   @Override
>   public Integer createAccumulator() {
>     return 0;
>   }
>
>   @Override
>   public Integer addInput(Integer accumulator, Integer input) {
>     return accumulator + input;
>   }
>
>   @Override
>   public Integer mergeAccumulators(Iterable<Integer> accumulators) {
>     Integer result = 0;
>     for (Integer value : accumulators) {
>       result += value;
>     }
>     return result;
>   }
>
>   @Override
>   public Integer extractOutput(Integer accumulator) {
>     return accumulator;
>   }
> }
> {code}
> is translated as the above composite, so the runner's own Combine.Globally
> translation is never called.
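
For readers unfamiliar with the decomposition described in the issue, the following is a rough sketch in plain Java, with no Beam or Spark dependency, of why the two forms are interchangeable in result: combining directly, versus keying every element with a null (Void) key and then combining per key. The class and method names (`CombineGloballySketch`, `combineGlobally`, `combineViaVoidKeys`) are hypothetical and only model the idea; they are not Beam or Spark runner APIs, and the in-memory map merely stands in for the grouping step.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical model of the two translation shapes; not Beam code.
public class CombineGloballySketch {

  // Direct form: fold all elements into one accumulator, no keys involved.
  public static int combineGlobally(
      List<Integer> input, int identity, BinaryOperator<Integer> fn) {
    int acc = identity;
    for (int value : input) {
      acc = fn.apply(acc, value);
    }
    return acc;
  }

  // Composite form: assign a null (Void) key to every element,
  // then run a per-key combine and read back the single group.
  public static int combineViaVoidKeys(
      List<Integer> input, int identity, BinaryOperator<Integer> fn) {
    // Step 1: the "Map that assigns Void keys".
    List<Map.Entry<Void, Integer>> keyed = new ArrayList<>();
    for (int value : input) {
      keyed.add(new SimpleEntry<Void, Integer>(null, value));
    }
    // Step 2: the per-key combine; HashMap permits a single null key.
    Map<Void, Integer> perKey = new HashMap<>();
    for (Map.Entry<Void, Integer> entry : keyed) {
      int acc = perKey.getOrDefault(entry.getKey(), identity);
      perKey.put(entry.getKey(), fn.apply(acc, entry.getValue()));
    }
    // Only one key exists, so its value is the global result.
    return perKey.getOrDefault(null, identity);
  }

  public static void main(String[] args) {
    List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    System.out.println(combineGlobally(input, 0, Integer::sum));
    System.out.println(combineViaVoidKeys(input, 0, Integer::sum));
  }
}
```

Both calls yield the same sum for the input used in the issue. The composite form pays for an extra keying and grouping step, which is the overhead the runner-specific translation is meant to avoid.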