Get ApexRunner out of the #apply()

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/47333b0e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/47333b0e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/47333b0e

Branch: refs/heads/master
Commit: 47333b0e7882888ffa41298a142e0ca7de2672a3
Parents: eeed30b
Author: Thomas Groh <tg...@google.com>
Authored: Mon Feb 27 09:38:10 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Feb 27 15:16:48 2017 -0800

----------------------------------------------------------------------
 runners/apex/pom.xml                            |  5 +
 .../apache/beam/runners/apex/ApexRunner.java    | 99 ++++++++++++--------
 .../translation/ApexPipelineTranslator.java     |  5 +-
 .../translation/CreateValuesTranslator.java     | 11 ++-
 .../core/construction/PrimitiveCreate.java      | 77 +++++++++++++++
 5 files changed, 151 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/47333b0e/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index f45e2df..f5fe4bc 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -87,6 +87,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-construction-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-core-java</artifactId>
       <exclusions>
         <exclusion>

http://git-wip-us.apache.org/repos/asf/beam/blob/47333b0e/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 1eb5e72..010ede3 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -22,7 +22,7 @@ import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.google.common.base.Throwables;
-
+import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,6 +31,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.apex.api.EmbeddedAppLauncher;
@@ -38,25 +39,30 @@ import org.apache.apex.api.Launcher;
 import org.apache.apex.api.Launcher.AppHandle;
 import org.apache.apex.api.Launcher.LaunchMode;
 import org.apache.beam.runners.apex.translation.ApexPipelineTranslator;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PrimitiveCreate;
+import 
org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.View.AsIterable;
+import org.apache.beam.sdk.transforms.View.AsSingleton;
 import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -89,37 +95,27 @@ public class ApexRunner extends 
PipelineRunner<ApexRunnerResult> {
     return new ApexRunner(apexPipelineOptions);
   }
 
-  @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-
-    if (Create.Values.class.equals(transform.getClass())) {
-      return (OutputT) PCollection
-          .<OutputT>createPrimitiveOutputInternal(
-              input.getPipeline(),
-              WindowingStrategy.globalDefault(),
-              PCollection.IsBounded.BOUNDED);
-    } else if 
(Combine.GloballyAsSingletonView.class.equals(transform.getClass())) {
-      PTransform<InputT, OutputT> customTransform = (PTransform)
-          new StreamingCombineGloballyAsSingletonView<InputT, OutputT>(
-              this, (Combine.GloballyAsSingletonView) transform);
-      return Pipeline.applyTransform(input, customTransform);
-    } else if (View.AsSingleton.class.equals(transform.getClass())) {
-      // assumes presence of above Combine.GloballyAsSingletonView mapping
-      PTransform<InputT, OutputT> customTransform = (PTransform)
-          new StreamingViewAsSingleton<InputT>(this, (View.AsSingleton) 
transform);
-      return Pipeline.applyTransform(input, customTransform);
-    } else if (View.AsIterable.class.equals(transform.getClass())) {
-      PTransform<InputT, OutputT> customTransform = (PTransform)
-          new StreamingViewAsIterable<InputT>(this, (View.AsIterable) 
transform);
-      return Pipeline.applyTransform(input, customTransform);
-    } else {
-      return super.apply(transform, input);
-    }
+  private Map<PTransformMatcher, PTransformOverrideFactory> getOverrides() {
+    return ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
+        .put(PTransformMatchers.classEqualTo(Create.Values.class), new 
PrimitiveCreate.Factory())
+        .put(
+            PTransformMatchers.classEqualTo(View.AsSingleton.class),
+            new StreamingViewAsSingleton.Factory())
+        .put(
+            PTransformMatchers.classEqualTo(View.AsIterable.class),
+            new StreamingViewAsIterable.Factory())
+        .put(
+            
PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+            new StreamingCombineGloballyAsSingletonView.Factory())
+        .build();
   }
 
   @Override
   public ApexRunnerResult run(final Pipeline pipeline) {
+    for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
+        getOverrides().entrySet()) {
+      pipeline.replace(override.getKey(), override.getValue());
+    }
 
     final ApexPipelineTranslator translator = new 
ApexPipelineTranslator(options);
     final AtomicReference<DAG> apexDAG = new AtomicReference<>();
@@ -234,7 +230,7 @@ public class ApexRunner extends 
PipelineRunner<ApexRunnerResult> {
     /**
      * Builds an instance of this class from the overridden transform.
      */
-    public StreamingCombineGloballyAsSingletonView(ApexRunner runner,
+    private StreamingCombineGloballyAsSingletonView(
         Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
       this.transform = transform;
     }
@@ -257,6 +253,17 @@ public class ApexRunner extends 
PipelineRunner<ApexRunnerResult> {
     protected String getKindString() {
       return "StreamingCombineGloballyAsSingletonView";
     }
+
+    static class Factory<InputT, OutputT>
+        extends SingleInputOutputOverrideFactory<
+            PCollection<InputT>, PCollectionView<OutputT>,
+            Combine.GloballyAsSingletonView<InputT, OutputT>> {
+      @Override
+      public PTransform<PCollection<InputT>, PCollectionView<OutputT>> 
getReplacementTransform(
+          GloballyAsSingletonView<InputT, OutputT> transform) {
+        return new StreamingCombineGloballyAsSingletonView<>(transform);
+      }
+    }
   }
 
   private static class StreamingViewAsSingleton<T>
@@ -265,7 +272,7 @@ public class ApexRunner extends 
PipelineRunner<ApexRunnerResult> {
 
     private View.AsSingleton<T> transform;
 
-    public StreamingViewAsSingleton(ApexRunner runner, View.AsSingleton<T> 
transform) {
+    public StreamingViewAsSingleton(View.AsSingleton<T> transform) {
       this.transform = transform;
     }
 
@@ -310,17 +317,23 @@ public class ApexRunner extends 
PipelineRunner<ApexRunnerResult> {
         }
       }
     }
+
+    static class Factory<T>
+        extends SingleInputOutputOverrideFactory<
+            PCollection<T>, PCollectionView<T>, View.AsSingleton<T>> {
+      @Override
+      public PTransform<PCollection<T>, PCollectionView<T>> 
getReplacementTransform(
+          AsSingleton<T> transform) {
+        return new StreamingViewAsSingleton<>(transform);
+      }
+    }
   }
 
   private static class StreamingViewAsIterable<T>
       extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
     private static final long serialVersionUID = 1L;
 
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    public StreamingViewAsIterable(ApexRunner runner, View.AsIterable<T> 
transform) {
-    }
+    private StreamingViewAsIterable() {}
 
     @Override
     public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
@@ -335,6 +348,16 @@ public class ApexRunner extends 
PipelineRunner<ApexRunnerResult> {
     protected String getKindString() {
       return "StreamingViewAsIterable";
     }
+
+    static class Factory<T>
+        extends SingleInputOutputOverrideFactory<
+            PCollection<T>, PCollectionView<Iterable<T>>, View.AsIterable<T>> {
+      @Override
+      public PTransform<PCollection<T>, PCollectionView<Iterable<T>>> 
getReplacementTransform(
+          AsIterable<T> transform) {
+        return new StreamingViewAsIterable<>();
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/47333b0e/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index 0818c36..36d679a 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -25,10 +25,10 @@ import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
 import 
org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
 import 
org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
+import org.apache.beam.runners.core.construction.PrimitiveCreate;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -66,7 +66,7 @@ public class ApexPipelineTranslator implements 
Pipeline.PipelineVisitor {
     registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
     registerTransformTranslator(Flatten.FlattenPCollectionList.class,
         new FlattenPCollectionTranslator());
-    registerTransformTranslator(Create.Values.class, new 
CreateValuesTranslator());
+    registerTransformTranslator(PrimitiveCreate.class, new 
CreateValuesTranslator());
     registerTransformTranslator(CreateApexPCollectionView.class,
         new CreateApexPCollectionViewTranslator());
     registerTransformTranslator(CreatePCollectionView.class,
@@ -175,5 +175,4 @@ public class ApexPipelineTranslator implements 
Pipeline.PipelineVisitor {
       LOG.debug("view {}", view.getName());
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/47333b0e/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
index 66fd27e..d6c4b97 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/CreateValuesTranslator.java
@@ -20,21 +20,22 @@ package org.apache.beam.runners.apex.translation;
 
 import 
org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
 import org.apache.beam.runners.apex.translation.utils.ValuesSource;
+import org.apache.beam.runners.core.construction.PrimitiveCreate;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
  * Wraps elements from Create.Values into an {@link UnboundedSource}.
  * mainly used for testing
  */
-class CreateValuesTranslator<T> implements 
TransformTranslator<Create.Values<T>> {
+class CreateValuesTranslator<T> implements 
TransformTranslator<PrimitiveCreate<T>> {
   private static final long serialVersionUID = 1451000241832745629L;
 
   @Override
-  public void translate(Create.Values<T> transform, TranslationContext 
context) {
-    UnboundedSource<T, ?> unboundedSource = new 
ValuesSource<>(transform.getElements(),
-        ((PCollection<T>) context.getOutput()).getCoder());
+  public void translate(PrimitiveCreate<T> transform, TranslationContext 
context) {
+    UnboundedSource<T, ?> unboundedSource =
+        new ValuesSource<>(
+            transform.getElements(), ((PCollection<T>) 
context.getOutput()).getCoder());
     ApexReadUnboundedInputOperator<T, ?> operator =
         new ApexReadUnboundedInputOperator<>(unboundedSource, 
context.getPipelineOptions());
     context.addOperator(operator, operator.output);

http://git-wip-us.apache.org/repos/asf/beam/blob/47333b0e/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
new file mode 100644
index 0000000..7bd38b0
--- /dev/null
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Create.Values;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+
+/**
+ * An implementation of {@link Create} that returns a primitive {@link 
PCollection}.
+ */
+public class PrimitiveCreate<T> extends PTransform<PBegin, PCollection<T>> {
+  private final Create.Values<T> transform;
+
+  private PrimitiveCreate(Create.Values<T> transform) {
+    this.transform = transform;
+  }
+
+  @Override
+  public PCollection<T> expand(PBegin input) {
+    return PCollection.createPrimitiveOutputInternal(
+        input.getPipeline(), WindowingStrategy.globalDefault(), 
IsBounded.BOUNDED);
+  }
+
+  public Iterable<T> getElements() {
+    return transform.getElements();
+  }
+
+  /**
+   * A {@link PTransformOverrideFactory} that creates instances of {@link 
PrimitiveCreate}.
+   */
+  public static class Factory<T>
+      implements PTransformOverrideFactory<PBegin, PCollection<T>, Values<T>> {
+    @Override
+    public PTransform<PBegin, PCollection<T>> 
getReplacementTransform(Values<T> transform) {
+      return new PrimitiveCreate<>(transform);
+    }
+
+    @Override
+    public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) {
+      return p.begin();
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        List<TaggedPValue> outputs, PCollection<T> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+}
+

Reply via email to