[ https://issues.apache.org/jira/browse/BEAM-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-5098: ---------------------------------- Component/s: (was: beam-model) sdk-java-core > Combine.Globally::asSingletonView clears side inputs > ---------------------------------------------------- > > Key: BEAM-5098 > URL: https://issues.apache.org/jira/browse/BEAM-5098 > Project: Beam > Issue Type: Bug > Components: sdk-java-core > Affects Versions: 2.5.0 > Reporter: Mike Pedersen > Assignee: Kenneth Knowles > Priority: Critical > Labels: beginner, starter > > It seems like calling .asSingletonView on Combine.Globally clears all side > inputs. Take this code for example: > > {code:java} > public class Main { > public static void main(String[] args) { > PipelineOptions options = PipelineOptionsFactory.create(); > Pipeline p = Pipeline.create(options); > PCollection<Integer> a = p.apply(Create.of(1, 2, 3)); > PCollectionView<Integer> b = > p.apply(Create.of(10)).apply(View.asSingleton()); > a > .apply(Combine.globally(new > CombineWithContext.CombineFnWithContext<Integer, Integer, Integer>() { > @Override > public Integer > createAccumulator(CombineWithContext.Context c) { > return c.sideInput(b); > } > @Override > public Integer addInput(Integer accumulator, Integer > input, CombineWithContext.Context c) { > return accumulator + input; > } > @Override > public Integer mergeAccumulators(Iterable<Integer> > accumulators, CombineWithContext.Context c) { > int sum = 0; > for (int i : accumulators) { > sum += i; > } > return sum; > } > @Override > public Integer extractOutput(Integer accumulator, > CombineWithContext.Context c) { > return accumulator; > } > @Override > public Integer defaultValue() { > return 0; > } > }).withSideInputs(b).asSingletonView()); > p.run().waitUntilFinish(); > } > }{code} > This fails with the following exception: > {code:java} > Exception in thread "main" > org.apache.beam.sdk.Pipeline$PipelineExecutionException: > java.lang.IllegalArgumentException: calling sideInput() with unknown view > at > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349) > at > org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319) > at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210) > at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297) > at Main.main(Main.java:287) > Caused by: java.lang.IllegalArgumentException: calling sideInput() with > unknown view > at > org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:212) > at > org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:69) > at > org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:489) > at > org.apache.beam.sdk.transforms.Combine$GroupedValues$1$1.sideInput(Combine.java:2137) > at Main$1.createAccumulator(Main.java:258) > at Main$1.createAccumulator(Main.java:255) > at > org.apache.beam.sdk.transforms.CombineWithContext$CombineFnWithContext.apply(CombineWithContext.java:120) > at > org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2129){code} > But if you change > {code:java} > .withSideInputs(b).asSingletonView()){code} > to > {code:java} > .withSideInputs(b)).apply(View.asSingleton()){code} > then it works just fine. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)