Repository: beam
Updated Branches:
  refs/heads/master 00b395881 -> fbcde4cdc


[BEAM-646] Get Flink Runner out of apply()
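
The core of the change: rather than intercepting transforms in PipelineRunner#apply(), the
runner now registers PTransformOverrideFactory instances keyed by PTransformMatcher and
rewrites the pipeline once before translation. A rough, self-contained sketch of that
pattern follows; it only restates the calls visible in the diff below, and any name not
shown there (the class and the factory parameter) is an illustrative placeholder, not
actual runner code.

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.View;

class OverrideSketch {
  // Rewrites matching transforms in place before the runner translates the pipeline.
  static void applyOverrides(Pipeline pipeline, PTransformOverrideFactory viewAsIterableFactory) {
    Map<PTransformMatcher, PTransformOverrideFactory> overrides =
        ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
            // one entry per SDK transform the runner wants to swap for its own version
            .put(PTransformMatchers.classEqualTo(View.AsIterable.class), viewAsIterableFactory)
            .build();
    for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : overrides.entrySet()) {
      pipeline.replace(override.getKey(), override.getValue());
    }
  }
}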


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ac985c9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ac985c9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ac985c9

Branch: refs/heads/master
Commit: 0ac985c92a1af1dcc6e55f199bf02d57a484b9e3
Parents: 00b3958
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Sat Feb 18 16:12:19 2017 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu Feb 23 11:10:29 2017 +0100

----------------------------------------------------------------------
 runners/flink/runner/pom.xml                    | 11 +++
 .../FlinkPipelineExecutionEnvironment.java      |  4 +-
 .../apache/beam/runners/flink/FlinkRunner.java  | 32 +-------
 .../flink/FlinkStreamingPipelineTranslator.java | 82 +++++++++++++++++++-
 4 files changed, 95 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0ac985c9/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index a7fae5d..d821ca0 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -175,6 +175,17 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-construction-java</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <!-- Test scoped -->
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/0ac985c9/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 6e4d27a..ba00036 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -79,7 +79,7 @@ class FlinkPipelineExecutionEnvironment {
    * a {@link org.apache.flink.api.java.DataSet}
    * or {@link org.apache.flink.streaming.api.datastream.DataStream} one.
    * */
-  public void translate(Pipeline pipeline) {
+  public void translate(FlinkRunner flinkRunner, Pipeline pipeline) {
     this.flinkBatchEnv = null;
     this.flinkStreamEnv = null;
 
@@ -92,7 +92,7 @@ class FlinkPipelineExecutionEnvironment {
     FlinkPipelineTranslator translator;
     if (translationMode == TranslationMode.STREAMING) {
       this.flinkStreamEnv = createStreamExecutionEnvironment();
-      translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options);
+      translator = new FlinkStreamingPipelineTranslator(flinkRunner, flinkStreamEnv, options);
     } else {
       this.flinkBatchEnv = createBatchExecutionEnvironment();
       translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);

http://git-wip-us.apache.org/repos/asf/beam/blob/0ac985c9/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index b3c7c65..5610dd4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -39,9 +39,6 @@ import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.client.program.DetachedEnvironment;
@@ -129,7 +126,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
     FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
 
     LOG.info("Translating pipeline to Flink program.");
-    env.translate(pipeline);
+    env.translate(this, pipeline);
 
     JobExecutionResult result;
     try {
@@ -166,33 +163,6 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
   }
 
   @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-    if (overrides.containsKey(transform.getClass())) {
-      // It is the responsibility of whoever constructs overrides to ensure this is type safe.
-      @SuppressWarnings("unchecked")
-      Class<PTransform<InputT, OutputT>> transformClass =
-          (Class<PTransform<InputT, OutputT>>) transform.getClass();
-
-      @SuppressWarnings("unchecked")
-      Class<PTransform<InputT, OutputT>> customTransformClass =
-          (Class<PTransform<InputT, OutputT>>) overrides.get(transform.getClass());
-
-      PTransform<InputT, OutputT> customTransform =
-          InstanceBuilder.ofType(customTransformClass)
-              .withArg(FlinkRunner.class, this)
-              .withArg(transformClass, transform)
-              .build();
-
-      return Pipeline.applyTransform(input, customTransform);
-    } else {
-      return super.apply(transform, input);
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  @Override
   public String toString() {
     return "FlinkRunner#" + hashCode();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/0ac985c9/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 3cbdeb2..9ab1310 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -17,9 +17,19 @@
  */
 package org.apache.beam.runners.flink;
 
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.slf4j.Logger;
@@ -40,8 +50,54 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
   private int depth = 0;
 
-  public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, PipelineOptions options) {
+  private FlinkRunner flinkRunner;
+
+  public FlinkStreamingPipelineTranslator(
+      FlinkRunner flinkRunner,
+      StreamExecutionEnvironment env,
+      PipelineOptions options) {
     this.streamingContext = new FlinkStreamingTranslationContext(env, options);
+    this.flinkRunner = flinkRunner;
+  }
+
+  @Override
+  public void translate(Pipeline pipeline) {
+    Map<PTransformMatcher, PTransformOverrideFactory> transformOverrides =
+        ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
+            .put(
+                PTransformMatchers.classEqualTo(View.AsIterable.class),
+                new ReflectiveOneToOneOverrideFactory(
+                    FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner))
+            .put(
+                PTransformMatchers.classEqualTo(View.AsList.class),
+                new ReflectiveOneToOneOverrideFactory(
+                    FlinkStreamingViewOverrides.StreamingViewAsList.class, flinkRunner))
+            .put(
+                PTransformMatchers.classEqualTo(View.AsMap.class),
+                new ReflectiveOneToOneOverrideFactory(
+                    FlinkStreamingViewOverrides.StreamingViewAsMap.class, flinkRunner))
+            .put(
+                PTransformMatchers.classEqualTo(View.AsMultimap.class),
+                new ReflectiveOneToOneOverrideFactory(
+                    FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, flinkRunner))
+            .put(
+                PTransformMatchers.classEqualTo(View.AsSingleton.class),
+                new ReflectiveOneToOneOverrideFactory(
+                    FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, flinkRunner))
+            // this has to be last since the ViewAsSingleton override
+            // can expand to a Combine.GloballyAsSingletonView
+            .put(
+                PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+                new ReflectiveOneToOneOverrideFactory(
+                    FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class,
+                    flinkRunner))
+        .build();
+
+    for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
+        transformOverrides.entrySet()) {
+      pipeline.replace(override.getKey(), override.getValue());
+    }
+    super.translate(pipeline);
   }
 
  // --------------------------------------------------------------------------------------------
@@ -147,4 +203,28 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
       return true;
     }
   }
+
+  private static class ReflectiveOneToOneOverrideFactory<
+      InputT extends PValue,
+      OutputT extends PValue,
+      TransformT extends PTransform<InputT, OutputT>>
+      extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
+    private final Class<PTransform<InputT, OutputT>> replacement;
+    private final FlinkRunner runner;
+
+    private ReflectiveOneToOneOverrideFactory(
+        Class<PTransform<InputT, OutputT>> replacement, FlinkRunner runner) {
+      this.replacement = replacement;
+      this.runner = runner;
+    }
+
+    @Override
+    public PTransform<InputT, OutputT> getReplacementTransform(TransformT transform) {
+      return InstanceBuilder.ofType(replacement)
+          .withArg(FlinkRunner.class, runner)
+          .withArg((Class<PTransform<InputT, OutputT>>) transform.getClass(), transform)
+          .build();
+    }
+  }
+
 }
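
A note on the reflective factory above: InstanceBuilder resolves a constructor on the
replacement class from the supplied argument types, so each FlinkStreamingViewOverrides
transform is expected to expose a (FlinkRunner, original transform) constructor. The class
below is a rough, hypothetical sketch of that expected shape only, not the actual override
implementation; names and type parameters are illustrative assumptions.

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

class StreamingViewAsIterableSketch<T>
    extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
  private final FlinkRunner runner;

  // InstanceBuilder.withArg(FlinkRunner.class, runner).withArg(transformClass, transform)
  // is assumed to resolve to a constructor with exactly these parameter types.
  public StreamingViewAsIterableSketch(FlinkRunner runner, View.AsIterable<T> original) {
    this.runner = runner;
  }

  @Override
  public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
    // Runner-specific expansion would go here; omitted in this sketch.
    throw new UnsupportedOperationException("sketch only");
  }
}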
