Repository: beam
Updated Branches:
  refs/heads/master d7ed2e23a -> 88e842534
Obtain Flink Views from the Context Output

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/810a5b4b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/810a5b4b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/810a5b4b

Branch: refs/heads/master
Commit: 810a5b4b0ffdc0b3df99bc24177b07bfe3ee8893
Parents: d7ed2e2
Author: Thomas Groh <tg...@google.com>
Authored: Fri Feb 17 08:31:00 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Feb 22 09:09:01 2017 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/runners/flink/FlinkRunner.java | 4 ----
 .../flink/translation/FlinkStreamingTransformTranslators.java | 4 ++--
 2 files changed, 2 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/810a5b4b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 5f92378..0f44ba9 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -615,10 +615,6 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
       return new CreateFlinkPCollectionView<>(view);
     }
 
-    public PCollectionView<ViewT> getView() {
-      return view;
-    }
-
     @Override
     public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
       return view;

http://git-wip-us.apache.org/repos/asf/beam/blob/810a5b4b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index cd0ef03..7f86488 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -696,9 +696,9 @@ public class FlinkStreamingTransformTranslators {
       DataStream<WindowedValue<List<ElemT>>> inputDataSet =
           context.getInputDataStream(context.getInput(transform));
 
-      PCollectionView<ViewT> input = transform.getView();
+      PCollectionView<ViewT> view = context.getOutput(transform);
 
-      context.setOutputDataStream(input, inputDataSet);
+      context.setOutputDataStream(view, inputDataSet);
     }
   }