Fix ParDo.BoundMulti translation: allocate a fresh outputs list on each apply() call
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2d326ff Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2d326ff Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2d326ff Branch: refs/heads/gearpump-runner Commit: b2d326ff73afca5c8e941c8006e9d74261a6b9df Parents: 364a3f0 Author: manuzhang <owenzhang1...@gmail.com> Authored: Mon Jan 16 12:31:26 2017 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Mon Jan 16 12:31:26 2017 +0800 ---------------------------------------------------------------------- .../gearpump/translators/ParDoBoundMultiTranslator.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b2d326ff/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java index 0d5b8bc..bf7073b 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java @@ -91,8 +91,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory; private DoFnRunner<InputT, OutputT> doFnRunner; private final DoFn<InputT, OutputT> doFn; - private final List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs = Lists - .newArrayList(); + private List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs; public DoFnMultiFunction( GearpumpPipelineOptions pipelineOptions, @@ 
-127,6 +126,8 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements @Override public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> apply(WindowedValue<InputT> wv) { + outputs = Lists.newArrayList(); + if (null == doFnRunner) { doFnRunner = doFnRunnerFactory.createRunner(); } @@ -166,6 +167,7 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements @Override public WindowedValue<OutputT> apply(WindowedValue<KV<TupleTag<OutputT>, OutputT>> wv) { + // System.out.println(wv.getValue().getKey() + ":" + wv.getValue().getValue()); return WindowedValue.of(wv.getValue().getValue(), wv.getTimestamp(), wv.getWindows(), wv.getPane()); }