[BEAM-1726] Fix empty side inputs in Flink Streaming Runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5555040d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5555040d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5555040d Branch: refs/heads/master Commit: 5555040d935c67f5cd48f2ffe2721a07fe6e0a50 Parents: 95ade45 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Sat Mar 18 12:16:06 2017 +0100 Committer: Aviem Zur <aviem...@gmail.com> Committed: Thu May 4 20:48:56 2017 +0300 ---------------------------------------------------------------------- .../beam/runners/core/SideInputHandler.java | 10 ++++---- .../wrappers/streaming/DoFnOperator.java | 27 +++++++++++++++++++- 2 files changed, 31 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5555040d/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java index 5c67148..b29f9d0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.core; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -161,11 +162,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { @Override public <T> T get(PCollectionView<T> sideInput, BoundedWindow window) { - if (!isReady(sideInput, window)) { - throw new IllegalStateException( - "Side input " + sideInput + " is not ready for window " + window); - } - @SuppressWarnings("unchecked") Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) sideInput @@ -181,6 +177,10 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { Iterable<WindowedValue<?>> elements = state.read(); + if (elements == null) { + elements = Collections.emptyList(); + } + return sideInput.getViewFn().apply(elements); } http://git-wip-us.apache.org/repos/asf/beam/blob/5555040d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index c624036..16bf5d2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -463,7 +463,32 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> @Override public void processWatermark2(Watermark mark) throws Exception { - // ignore watermarks from the side-input input + if (mark.getTimestamp() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + // this means we will never see any more side input + pushbackDoFnRunner.startBundle(); + + BagState<WindowedValue<InputT>> pushedBack = + pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag); + + Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read(); + if (pushedBackContents != null) { + for (WindowedValue<InputT> elem : pushedBackContents) { + + // we need to set the correct key in case the operator is + // a (keyed) window operator + setKeyContextElement1(new StreamRecord<>(elem)); + + doFnRunner.processElement(elem); + } + } + + setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); + + pushbackDoFnRunner.finishBundle(); + + // maybe output a new watermark + processWatermark1(new Watermark(currentInputWatermark)); + } } @Override