Fix side input handling in DoFnFunction
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7653e7ed Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7653e7ed Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7653e7ed Branch: refs/heads/gearpump-runner Commit: 7653e7ed6de3d9db822dcd390d2bf70819954fa5 Parents: 98854d4 Author: manuzhang <owenzhang1...@gmail.com> Authored: Wed Jun 7 14:08:04 2017 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Mon Jun 12 11:45:37 2017 +0800 ---------------------------------------------------------------------- .../translators/TranslationContext.java | 2 ++ .../translators/functions/DoFnFunction.java | 23 ++++---------------- .../gearpump/translators/io/GearpumpSource.java | 1 - .../translators/utils/TranslatorUtils.java | 5 ++--- 4 files changed, 8 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7653e7ed/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java index 4090354..64a1e0d 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java @@ -68,6 +68,8 @@ public class TranslationContext { public <OutputT> void setOutputStream(PValue output, JavaStream<OutputT> outputStream) { if (!streams.containsKey(output)) { streams.put(output, outputStream); + } else { + throw new RuntimeException("set stream for duplicated output " + output); } } http://git-wip-us.apache.org/repos/asf/beam/blob/7653e7ed/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java index f521d7b..6e4fbeb 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java @@ -24,7 +24,6 @@ import com.google.common.collect.Lists; import java.io.Serializable; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -66,7 +65,6 @@ public class DoFnFunction<InputT, OutputT> extends private transient PushbackSideInputDoFnRunner<InputT, OutputT> doFnRunner; private transient SideInputHandler sideInputReader; private transient List<WindowedValue<InputT>> pushedBackValues; - private transient Map<PCollectionView<?>, List<WindowedValue<Iterable<?>>>> sideInputValues; private final Collection<PCollectionView<?>> sideInputs; private final Map<String, PCollectionView<?>> tagsToSideInputs; private final TupleTag<OutputT> mainOutput; @@ -109,7 +107,6 @@ public class DoFnFunction<InputT, OutputT> extends doFnRunner = doFnRunnerFactory.createRunner(sideInputReader); pushedBackValues = new LinkedList<>(); - sideInputValues = new HashMap<>(); outputManager.setup(mainOutput, sideOutputs); } @@ -132,25 +129,14 @@ public class DoFnFunction<InputT, OutputT> extends } else { // side input PCollectionView<?> sideInput = tagsToSideInputs.get(unionValue.getUnionTag()); - WindowedValue<?> sideInputValue = - (WindowedValue<?>) unionValue.getValue(); - Object value = sideInputValue.getValue(); - if (!(value instanceof Iterable)) { - sideInputValue = sideInputValue.withValue(Lists.newArrayList(value)); - } - if (!sideInputValues.containsKey(sideInput)) { - sideInputValues.put(sideInput, new LinkedList<WindowedValue<Iterable<?>>>()); - } - sideInputValues.get(sideInput).add((WindowedValue<Iterable<?>>) sideInputValue); + WindowedValue<Iterable<?>> sideInputValue = + (WindowedValue<Iterable<?>>) unionValue.getValue(); + sideInputReader.addSideInputValue(sideInput, sideInputValue); } } + for (PCollectionView<?> sideInput: sideInputs) { - if (sideInputValues.containsKey(sideInput)) { - for (WindowedValue<Iterable<?>> value: sideInputValues.get(sideInput)) { - sideInputReader.addSideInputValue(sideInput, value); - } - } for (WindowedValue<InputT> value : pushedBackValues) { for (BoundedWindow win: value.getWindows()) { BoundedWindow sideInputWindow = @@ -171,7 +157,6 @@ public class DoFnFunction<InputT, OutputT> extends } pushedBackValues.clear(); Iterables.addAll(pushedBackValues, nextPushedBackValues); - sideInputValues.clear(); doFnRunner.finishBundle(); http://git-wip-us.apache.org/repos/asf/beam/blob/7653e7ed/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java index 5e79151..60f319d 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java @@ -28,7 +28,6 @@ import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; -// import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/7653e7ed/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java index 999afae..282f261 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java @@ -70,11 +70,10 @@ public class TranslatorUtils { JavaStream<WindowedValue<InputT>> inputStream, Map<String, PCollectionView<?>> tagsToSideInputs) { JavaStream<RawUnionValue> mainStream = - inputStream.map(new ToRawUnionValue<InputT>("0"), "map_to_RawUnionValue"); + inputStream.map(new ToRawUnionValue<>("0"), "map_to_RawUnionValue"); for (Map.Entry<String, PCollectionView<?>> tagToSideInput: tagsToSideInputs.entrySet()) { - // actually JavaStream<WindowedValue<List<?>>> - JavaStream<WindowedValue<Object>> sideInputStream = context.getInputStream( + JavaStream<WindowedValue<List<?>>> sideInputStream = context.getInputStream( tagToSideInput.getValue()); mainStream = mainStream.merge(sideInputStream.map(new ToRawUnionValue<>( tagToSideInput.getKey()), "map_to_RawUnionValue"), "merge_to_MainStream");