[ https://issues.apache.org/jira/browse/BEAM-5673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-5673:
----------------------------------
    Summary: View.asMap on non-KV PCollection fails at runtime, not construction/submission time  (was: Direct java runner crashes when using both timers and side input)

> View.asMap on non-KV PCollection fails at runtime, not construction/submission time
> -----------------------------------------------------------------------------------
>
>                 Key: BEAM-5673
>                 URL: https://issues.apache.org/jira/browse/BEAM-5673
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.6.0, 2.7.0
>            Reporter: Przemyslaw Pastuszka
>            Assignee: Kenneth Knowles
>            Priority: Major
>
> I'm trying to write a ParDo that uses both a Timer and a side input, but it
> crashes when I run it with {{beam-runners-direct-java}}, throwing an
> {{IllegalArgumentException}} at
> [https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java#L167],
> because the ParDo actually has two inputs (the main PCollection and the side
> input) while only one is expected. It looks like a bug in the implementation.
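The failure mode the reporter describes (a "get the only element" lookup applied to a collection that holds two inputs) can be shown in isolation. The sketch below is plain Java and does not use Beam or Guava: `onlyElement` is a hypothetical stand-in for the `Iterables.getOnlyElement` call that appears in the stack trace of this report, and the two strings are made-up placeholders for the main input and the side-input view. It only illustrates the reporter's hypothesis, not how the direct runner is actually implemented.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class OnlyElementSketch {

    // Hypothetical stand-in for Guava's Iterables.getOnlyElement:
    // returns the single element, and rejects empty or multi-element input.
    public static <T> T onlyElement(Iterable<T> items) {
        Iterator<T> it = items.iterator();
        if (!it.hasNext()) {
            throw new NoSuchElementException("expected one element but was empty");
        }
        T first = it.next();
        if (it.hasNext()) {
            // More than one element: same exception type as in the report.
            throw new IllegalArgumentException("expected one element but was: " + items);
        }
        return first;
    }

    public static void main(String[] args) {
        // A step with only a main input: the lookup succeeds.
        List<String> mainOnly = Arrays.asList("main input");
        System.out.println("single input: " + onlyElement(mainOnly));

        // A step with a main input and a side input: the lookup fails.
        List<String> mainAndSide = Arrays.asList("main input", "side input view");
        try {
            onlyElement(mainAndSide);
        } catch (IllegalArgumentException e) {
            System.out.println("two inputs rejected: " + e.getMessage());
        }
    }
}
```

Code that assumes exactly one input works until a side input is added, at which point it fails only once that code path is reached at run time, which matches the point made in the updated summary.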
>
> Here's the code that reproduces the issue:
> {code:java}
> public class TestCrashesForTimerAndSideInput {
>   @Rule
>   public final transient TestPipeline p = TestPipeline.create();
>   private static class DoFnWithTimer extends DoFn<KV<String, String>, String> {
>     @TimerId("t")
>     private final TimerSpec tSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>     private final PCollectionView<Map<String, String>> sideInput;
>     private DoFnWithTimer(PCollectionView<Map<String, String>> sideInput) {
>       this.sideInput = sideInput;
>     }
>     @ProcessElement
>     public void processElement(ProcessContext c, @TimerId("t") Timer t) {
>       KV<String, String> element = c.element();
>       c.output(element.getKey() + c.sideInput(sideInput).get(element));
>       t.offset(Duration.standardSeconds(1)).setRelative();
>     }
>     @OnTimer("t")
>     public void onTimerFire(OnTimerContext x) {
>       x.output("Timer fired");
>     }
>   }
>   @Test
>   public void testCrashesForTimerAndSideInput() {
>     ImmutableMap<String, String> sideData = ImmutableMap.<String, String>builder().
>       put("x", "X").
>       put("y", "Y").
>       build();
>     PCollectionView<Map<String, String>> sideInput =
>       p.apply(Create.of(sideData)).apply(View.asMap());
>     TestStream<String> testStream = TestStream.create(StringUtf8Coder.of()).
>       addElements("x").
>       advanceProcessingTime(Duration.standardSeconds(1)).
>       addElements("y").
>       advanceProcessingTime(Duration.standardSeconds(1)).
>       advanceWatermarkToInfinity();
>     PCollection<String> result = p.
>       apply(testStream).
>       apply(MapElements.into(kvs(strings(), strings())).via(v -> KV.of(v, v))).
>       apply(ParDo.of(new DoFnWithTimer(sideInput)).withSideInputs(sideInput));
>     PAssert.that(result).containsInAnyOrder("xX", "yY", "Timer fired");
>     p.run();
>   }
> }
> {code}
>
> and the error is:
> {code}
> java.lang.IllegalArgumentException: expected one element but was: <ParDo(DoFnWithTimer)/ParMultiDo(DoFnWithTimer)/To KeyedWorkItem/ParMultiDo(ToKeyedWorkItem).output [PCollection], View.AsMap/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)/ParMultiDo(VoidKeyToMultimapMaterialization).output [PCollection]>
>       at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:322)
>       at org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:294)
>       at org.apache.beam.runners.direct.QuiescenceDriver.fireTimers(QuiescenceDriver.java:167)
>       at org.apache.beam.runners.direct.QuiescenceDriver.drive(QuiescenceDriver.java:110)
>       at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$2.run(ExecutorServiceParallelExecutor.java:170)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)