[ https://issues.apache.org/jira/browse/BEAM-6740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777077#comment-16777077 ]
Kenneth Knowles commented on BEAM-6740:
---------------------------------------

This is part of the SparkRunner, yes? Doesn't the SparkRunner still translate based on the Java class? There wasn't really a change - some runners started to translate based on URN, but they only added URNs for primitives and special composites that they cared about. I think other runners maybe just don't care about Combine.globally as a special composite. It seems just fine to add a URN now.

Or am I misunderstanding the issue? Can you point it out in the SparkRunner?

> Combine.globally translation is never called
> --------------------------------------------
>
>                 Key: BEAM-6740
>                 URL: https://issues.apache.org/jira/browse/BEAM-6740
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Etienne Chauchot
>            Priority: Major
>
> SDK translates Combine.Globally as a composite transform composed of:
> * Map that assigns Void keys
> * Combine.PerKey
> On Spark: as Combine.PerKey uses a Spark GBK inside it, the runner adds its
> own translation of Combine.Globally to avoid the less performant GBK. This
> translation should be called in place of entering the composite transform
> translation. A pipeline like this:
> {code}
> PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
> input.apply(
>     Combine.globally(new IntegerCombineFn()));
> {code}
> {code}
> private static class IntegerCombineFn extends Combine.CombineFn<Integer, Integer, Integer> {
>   @Override
>   public Integer createAccumulator() {
>     return 0;
>   }
>   @Override
>   public Integer addInput(Integer accumulator, Integer input) {
>     return accumulator + input;
>   }
>   @Override
>   public Integer mergeAccumulators(Iterable<Integer> accumulators) {
>     Integer result = 0;
>     for (Integer value : accumulators) {
>       result += value;
>     }
>     return result;
>   }
>   @Override
>   public Integer extractOutput(Integer accumulator) {
>     return accumulator;
>   }
> }
> {code}
> is translated as the above composite.
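
For illustration only, the difference between the two translations described in the issue can be modelled in plain Java, with no Beam or Spark dependency. This is a sketch under stated assumptions, not the SDK or SparkRunner code: the class GlobalCombineSketch and the methods combineViaComposite and combineDirectly are hypothetical names, the static helper methods merely mirror the four CombineFn methods from the issue, and the round-robin partitioning is an assumed stand-in for however a runner distributes its data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GlobalCombineSketch {

  // Stand-ins for the four IntegerCombineFn methods quoted in the issue.
  static int createAccumulator() { return 0; }

  static int addInput(int accumulator, int input) { return accumulator + input; }

  static int mergeAccumulators(Iterable<Integer> accumulators) {
    int result = 0;
    for (int value : accumulators) {
      result += value;
    }
    return result;
  }

  static int extractOutput(int accumulator) { return accumulator; }

  // Composite path: assign every element the same (null, i.e. Void) key,
  // group by that key, then combine the grouped values per key.
  static int combineViaComposite(List<Integer> input) {
    Map<Void, List<Integer>> grouped = new HashMap<>();
    for (Integer value : input) {
      grouped.computeIfAbsent(null, k -> new ArrayList<>()).add(value);
    }
    int accumulator = createAccumulator();
    for (List<Integer> values : grouped.values()) {
      for (Integer value : values) {
        accumulator = addInput(accumulator, value);
      }
    }
    return extractOutput(accumulator);
  }

  // Direct path (assumed shape of a runner-specific translation): build one
  // accumulator per partition, then merge the partial accumulators. No
  // grouping step materialises all values under a single key.
  static int combineDirectly(List<Integer> input, int partitions) {
    List<Integer> partials = new ArrayList<>();
    for (int p = 0; p < partitions; p++) {
      int accumulator = createAccumulator();
      for (int i = p; i < input.size(); i += partitions) {
        accumulator = addInput(accumulator, input.get(i));
      }
      partials.add(accumulator);
    }
    return extractOutput(mergeAccumulators(partials));
  }

  public static void main(String[] args) {
    List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    System.out.println(combineViaComposite(input)); // prints 55
    System.out.println(combineDirectly(input, 3));  // prints 55
  }
}
```

Both paths give the same result; the point of the sketch is only that the direct path never needs the grouping step, which is the cost the issue says the runner-specific translation is meant to avoid.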