[ https://issues.apache.org/jira/browse/BEAM-1149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15746497#comment-15746497 ]
ASF GitHub Bot commented on BEAM-1149:
--------------------------------------

GitHub user jkff opened a pull request:

    https://github.com/apache/incubator-beam/pull/1601

    [BEAM-1149] Explode windows when fn uses side inputs

    This basically reverts https://github.com/apache/incubator-beam/commit/38f0b11cc9028cf347e3c96b6e6116e5a5a9972d
    and adds some comments and a test.

    R: @kennknowles

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jkff/incubator-beam multi-window-side-inputs

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/1601.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1601

----
commit e2644e3a869c40bbb3614198dfed8a4b2ab35ab8
Author: Eugene Kirpichov <kirpic...@google.com>
Date:   2016-12-13T22:35:33Z

    [BEAM-1149] Explode windows when fn uses side inputs

----

> Side input access fails in direct runner (possibly others too) when input element in multiple windows
> -----------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-1149
>                 URL: https://issues.apache.org/jira/browse/BEAM-1149
>             Project: Beam
>          Issue Type: Bug
>            Reporter: Eugene Kirpichov
>            Assignee: Eugene Kirpichov
>            Priority: Blocker
>
> {code:java}
>   private static class FnWithSideInputs extends DoFn<String, String> {
>     private final PCollectionView<Integer> view;
>     private FnWithSideInputs(PCollectionView<Integer> view) {
>       this.view = view;
>     }
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>       c.output(c.element() + ":" + c.sideInput(view));
>     }
>   }
>   @Test
>   public void testSideInputsWithMultipleWindows() {
>     Pipeline p = TestPipeline.create();
>     MutableDateTime mutableNow = Instant.now().toMutableDateTime();
>     mutableNow.setMillisOfSecond(0);
>     Instant now = mutableNow.toInstant();
>     SlidingWindows windowFn =
>         SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
>     PCollectionView<Integer> view =
>         p.apply(Create.of(1)).apply(View.<Integer>asSingleton());
>     PCollection<String> res =
>         p.apply(Create.timestamped(TimestampedValue.of("a", now)))
>             .apply(Window.<String>into(windowFn))
>             .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));
>     PAssert.that(res).containsInAnyOrder("a:1");
>     p.run();
>   }
> {code}
> This fails with the following exception:
> {code}
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
>       at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:343)
>       at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:1)
>       at org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
>       at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
>       at ....
> Caused by: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
>       at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:514)
>       at org.apache.beam.sdk.transforms.ParDoTest$FnWithSideInputs.processElement(ParDoTest.java:738)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
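
For readers unfamiliar with what "explode windows" means in the pull request title, the following is a minimal, Beam-free sketch of the general idea: an element assigned to several windows is split into one copy per window before the user function runs, so a side-input lookup always sees exactly one window. All names here (ExplodeWindowsSketch, Windowed, explode, process) are hypothetical illustrations, windows are modelled as plain strings, and this is not the code from the pull request or from SimpleDoFnRunner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class ExplodeWindowsSketch {

    /** Hypothetical stand-in for a windowed element; NOT Beam's own class. */
    static final class Windowed {
        final String value;
        final List<String> windows; // windows modelled as plain labels

        Windowed(String value, List<String> windows) {
            this.value = value;
            this.windows = windows;
        }
    }

    /** Splits a multi-window element into one single-window copy per window. */
    static List<Windowed> explode(Windowed in) {
        List<Windowed> out = new ArrayList<>();
        for (String w : in.windows) {
            out.add(new Windowed(in.value, Collections.singletonList(w)));
        }
        return out;
    }

    /**
     * Runs the "fn" once per exploded copy. Each copy is in exactly one
     * window, so the side-input lookup for that window is unambiguous.
     */
    static List<String> process(Windowed in, Function<String, Integer> sideInputForWindow) {
        List<String> results = new ArrayList<>();
        for (Windowed single : explode(in)) {
            String window = single.windows.get(0);
            results.add(single.value + ":" + sideInputForWindow.apply(window));
        }
        return results;
    }

    public static void main(String[] args) {
        Windowed element = new Windowed("a", Arrays.asList("w1", "w2", "w3"));
        // Assumed side input: the same singleton value 1 for every window.
        System.out.println(process(element, w -> 1));
    }
}
```

In this sketch an element in three windows produces three outputs, one per window, rather than a single output.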