fix ParDo.BoundMulti translation

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2d326ff
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2d326ff
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2d326ff

Branch: refs/heads/gearpump-runner
Commit: b2d326ff73afca5c8e941c8006e9d74261a6b9df
Parents: 364a3f0
Author: manuzhang <owenzhang1...@gmail.com>
Authored: Mon Jan 16 12:31:26 2017 +0800
Committer: manuzhang <owenzhang1...@gmail.com>
Committed: Mon Jan 16 12:31:26 2017 +0800

----------------------------------------------------------------------
 .../gearpump/translators/ParDoBoundMultiTranslator.java        | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b2d326ff/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index 0d5b8bc..bf7073b 100644
--- 
a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ 
b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -91,8 +91,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> 
implements
     private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
     private DoFnRunner<InputT, OutputT> doFnRunner;
     private final DoFn<InputT, OutputT> doFn;
-    private final List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs 
= Lists
-        .newArrayList();
+    private List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs;
 
     public DoFnMultiFunction(
         GearpumpPipelineOptions pipelineOptions,
@@ -127,6 +126,8 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> 
implements
 
     @Override
     public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> 
apply(WindowedValue<InputT> wv) {
+      outputs = Lists.newArrayList();
+
       if (null == doFnRunner) {
         doFnRunner = doFnRunnerFactory.createRunner();
       }
@@ -166,6 +167,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> 
implements
 
     @Override
     public WindowedValue<OutputT> apply(WindowedValue<KV<TupleTag<OutputT>, 
OutputT>> wv) {
+      // System.out.println(wv.getValue().getKey() + ":" + 
wv.getValue().getValue());
       return WindowedValue.of(wv.getValue().getValue(), wv.getTimestamp(),
           wv.getWindows(), wv.getPane());
     }

Reply via email to