Fix side input handling in DoFnFunction

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7653e7ed
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7653e7ed
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7653e7ed

Branch: refs/heads/gearpump-runner
Commit: 7653e7ed6de3d9db822dcd390d2bf70819954fa5
Parents: 98854d4
Author: manuzhang <owenzhang1...@gmail.com>
Authored: Wed Jun 7 14:08:04 2017 +0800
Committer: manuzhang <owenzhang1...@gmail.com>
Committed: Mon Jun 12 11:45:37 2017 +0800

----------------------------------------------------------------------
 .../translators/TranslationContext.java         |  2 ++
 .../translators/functions/DoFnFunction.java     | 23 ++++----------------
 .../gearpump/translators/io/GearpumpSource.java |  1 -
 .../translators/utils/TranslatorUtils.java      |  5 ++---
 4 files changed, 8 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7653e7ed/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index 4090354..64a1e0d 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -68,6 +68,8 @@ public class TranslationContext {
   public <OutputT> void setOutputStream(PValue output, JavaStream<OutputT> 
outputStream) {
     if (!streams.containsKey(output)) {
       streams.put(output, outputStream);
+    } else {
+      throw new RuntimeException("set stream for duplicated output " + output);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7653e7ed/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index f521d7b..6e4fbeb 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
 
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -66,7 +65,6 @@ public class DoFnFunction<InputT, OutputT> extends
   private transient PushbackSideInputDoFnRunner<InputT, OutputT> doFnRunner;
   private transient SideInputHandler sideInputReader;
   private transient List<WindowedValue<InputT>> pushedBackValues;
-  private transient Map<PCollectionView<?>, List<WindowedValue<Iterable<?>>>> 
sideInputValues;
   private final Collection<PCollectionView<?>> sideInputs;
   private final Map<String, PCollectionView<?>> tagsToSideInputs;
   private final TupleTag<OutputT> mainOutput;
@@ -109,7 +107,6 @@ public class DoFnFunction<InputT, OutputT> extends
     doFnRunner = doFnRunnerFactory.createRunner(sideInputReader);
 
     pushedBackValues = new LinkedList<>();
-    sideInputValues = new HashMap<>();
     outputManager.setup(mainOutput, sideOutputs);
   }
 
@@ -132,25 +129,14 @@ public class DoFnFunction<InputT, OutputT> extends
       } else {
         // side input
         PCollectionView<?> sideInput = 
tagsToSideInputs.get(unionValue.getUnionTag());
-        WindowedValue<?> sideInputValue =
-            (WindowedValue<?>) unionValue.getValue();
-        Object value = sideInputValue.getValue();
-        if (!(value instanceof Iterable)) {
-          sideInputValue = sideInputValue.withValue(Lists.newArrayList(value));
-        }
-        if (!sideInputValues.containsKey(sideInput)) {
-          sideInputValues.put(sideInput, new 
LinkedList<WindowedValue<Iterable<?>>>());
-        }
-        sideInputValues.get(sideInput).add((WindowedValue<Iterable<?>>) 
sideInputValue);
+        WindowedValue<Iterable<?>> sideInputValue =
+            (WindowedValue<Iterable<?>>) unionValue.getValue();
+        sideInputReader.addSideInputValue(sideInput, sideInputValue);
       }
     }
 
+
     for (PCollectionView<?> sideInput: sideInputs) {
-      if (sideInputValues.containsKey(sideInput)) {
-        for (WindowedValue<Iterable<?>> value: sideInputValues.get(sideInput)) 
{
-          sideInputReader.addSideInputValue(sideInput, value);
-        }
-      }
       for (WindowedValue<InputT> value : pushedBackValues) {
         for (BoundedWindow win: value.getWindows()) {
           BoundedWindow sideInputWindow =
@@ -171,7 +157,6 @@ public class DoFnFunction<InputT, OutputT> extends
     }
     pushedBackValues.clear();
     Iterables.addAll(pushedBackValues, nextPushedBackValues);
-    sideInputValues.clear();
 
     doFnRunner.finishBundle();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7653e7ed/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 5e79151..60f319d 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -28,7 +28,6 @@ import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
-// import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7653e7ed/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index 999afae..282f261 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -70,11 +70,10 @@ public class TranslatorUtils {
       JavaStream<WindowedValue<InputT>> inputStream,
       Map<String, PCollectionView<?>> tagsToSideInputs) {
     JavaStream<RawUnionValue> mainStream =
-        inputStream.map(new ToRawUnionValue<InputT>("0"), 
"map_to_RawUnionValue");
+        inputStream.map(new ToRawUnionValue<>("0"), "map_to_RawUnionValue");
 
     for (Map.Entry<String, PCollectionView<?>> tagToSideInput: 
tagsToSideInputs.entrySet()) {
-      // actually JavaStream<WindowedValue<List<?>>>
-      JavaStream<WindowedValue<Object>> sideInputStream = 
context.getInputStream(
+      JavaStream<WindowedValue<List<?>>> sideInputStream = 
context.getInputStream(
           tagToSideInput.getValue());
       mainStream = mainStream.merge(sideInputStream.map(new ToRawUnionValue<>(
           tagToSideInput.getKey()), "map_to_RawUnionValue"), 
"merge_to_MainStream");

Reply via email to