[ https://issues.apache.org/jira/browse/BEAM-1149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-1149:
----------------------------------
    Component/s: runner-core

> Side input access fails in direct runner (possibly others too) when input element in multiple windows
> -----------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-1149
>                 URL: https://issues.apache.org/jira/browse/BEAM-1149
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Eugene Kirpichov
>            Assignee: Kenneth Knowles
>            Priority: Blocker
>             Fix For: 0.4.0-incubating
>
>
> {code:java}
>   private static class FnWithSideInputs extends DoFn<String, String> {
>     private final PCollectionView<Integer> view;
>     private FnWithSideInputs(PCollectionView<Integer> view) {
>       this.view = view;
>     }
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>       c.output(c.element() + ":" + c.sideInput(view));
>     }
>   }
>   @Test
>   public void testSideInputsWithMultipleWindows() {
>     Pipeline p = TestPipeline.create();
>     MutableDateTime mutableNow = Instant.now().toMutableDateTime();
>     mutableNow.setMillisOfSecond(0);
>     Instant now = mutableNow.toInstant();
>     SlidingWindows windowFn =
>         SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
>     PCollectionView<Integer> view =
>         p.apply(Create.of(1)).apply(View.<Integer>asSingleton());
>     PCollection<String> res =
>         p.apply(Create.timestamped(TimestampedValue.of("a", now)))
>             .apply(Window.<String>into(windowFn))
>             .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));
>     PAssert.that(res).containsInAnyOrder("a:1");
>     p.run();
>   }
> {code}
> This fails with the following exception:
> {code}
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
> 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:343)
> 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:1)
> 	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
> 	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
> 	at ....
> Caused by: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
> 	at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:514)
> 	at org.apache.beam.sdk.transforms.ParDoTest$FnWithSideInputs.processElement(ParDoTest.java:738)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
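
For context on why the element in the test above ends up in multiple windows: with 5-second windows starting every 1 second, one timestamp is covered by several overlapping windows. The following is a minimal plain-Java sketch of that arithmetic only. It does not use the Beam API, and the class name `SlidingWindowCount`, the method `windowStarts`, and the sample timestamp are hypothetical names chosen for illustration. It assumes windows are half-open intervals `[start, start + size)` aligned to multiples of the period.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: lists the start times of all
// sliding windows [start, start + size) that contain a given timestamp.
class SlidingWindowCount {

    static List<Long> windowStarts(long timestampMillis, long sizeMillis, long periodMillis) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp (assumes zero offset).
        long lastStart = timestampMillis - Math.floorMod(timestampMillis, periodMillis);
        // Walk backwards one period at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMillis - sizeMillis; start -= periodMillis) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Sample timestamp on a whole second, mirroring setMillisOfSecond(0) in the test.
        long now = 1_480_000_000_000L;
        List<Long> starts = windowStarts(now, 5_000L, 1_000L);
        System.out.println(starts.size() + " windows contain the element");
    }
}
```

Under these assumptions the single element "a" is covered by 5 windows, which is the situation the `IllegalStateException` in the stack trace complains about when `sideInput` is called.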