Repository: beam
Updated Branches:
  refs/heads/master 55340e617 -> 817688aac


Remove Unneccessary DataflowRunner Overrides

The removed overrides are indistinguishable from the transform that they
are overriding.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4d8865de
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4d8865de
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4d8865de

Branch: refs/heads/master
Commit: 4d8865def33355551d16096831de5cf58c28272d
Parents: 55340e6
Author: Thomas Groh <tg...@google.com>
Authored: Thu Feb 16 09:14:47 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Feb 16 11:45:17 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 36 +++-----------------
 1 file changed, 5 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4d8865de/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index db6a7d9..f1270db 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -336,7 +336,6 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
       builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
       builder.put(Read.Unbounded.class, StreamingUnboundedRead.class);
       builder.put(Read.Bounded.class, StreamingBoundedRead.class);
-      builder.put(Window.Bound.class, AssignWindows.class);
       // In streaming mode must use either the custom Pubsub unbounded 
source/sink or
       // defer to Windmill's built-in implementation.
       builder.put(PubsubIO.Read.PubsubBoundedReader.class, 
UnsupportedIO.class);
@@ -351,7 +350,6 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
       }
     } else {
       builder.put(Read.Unbounded.class, UnsupportedIO.class);
-      builder.put(Window.Bound.class, AssignWindows.class);
       builder.put(Write.Bound.class, BatchWrite.class);
       // In batch mode must use the custom Pubsub bounded source/sink.
       builder.put(PubsubUnboundedSource.class, UnsupportedIO.class);
@@ -376,32 +374,18 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
 
-    if (Combine.GroupedValues.class.equals(transform.getClass())
-        || GroupByKey.class.equals(transform.getClass())) {
-
+    if (Combine.GroupedValues.class.equals(transform.getClass())) {
       // For both Dataflow runners (streaming and batch), GroupByKey and 
GroupedValues are
       // primitives. Returning a primitive output instead of the expanded 
definition
       // signals to the translator that translation is necessary.
       @SuppressWarnings("unchecked")
       PCollection<?> pc = (PCollection<?>) input;
       @SuppressWarnings("unchecked")
-      OutputT outputT = (OutputT) PCollection.createPrimitiveOutputInternal(
-          pc.getPipeline(),
-          transform instanceof GroupByKey
-              ? ((GroupByKey<?, ?>) 
transform).updateWindowingStrategy(pc.getWindowingStrategy())
-              : pc.getWindowingStrategy(),
-          pc.isBounded());
+      OutputT outputT =
+          (OutputT)
+              PCollection.createPrimitiveOutputInternal(
+                  pc.getPipeline(), pc.getWindowingStrategy(), pc.isBounded());
       return outputT;
-    } else if (Window.Bound.class.equals(transform.getClass())) {
-      /*
-       * TODO: make this the generic way overrides are applied (using 
super.apply() rather than
-       * Pipeline.applyTransform(); this allows the apply method to be 
replaced without inserting
-       * additional nodes into the graph.
-       */
-      // casting to wildcard
-      @SuppressWarnings("unchecked")
-      OutputT windowed = (OutputT) applyWindow((Window.Bound<?>) transform, 
(PCollection<?>) input);
-      return windowed;
     } else if 
(Flatten.FlattenPCollectionList.class.equals(transform.getClass())
         && ((PCollectionList<?>) input).size() == 0) {
       // This can cause downstream coder inference to be screwy. Most of the 
time, that won't be
@@ -432,16 +416,6 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     }
   }
 
-  private <T> PCollection<T> applyWindow(
-      Window.Bound<?> intitialTransform, PCollection<?> initialInput) {
-    // types are matched at compile time
-    @SuppressWarnings("unchecked")
-    Window.Bound<T> transform = (Window.Bound<T>) intitialTransform;
-    @SuppressWarnings("unchecked")
-    PCollection<T> input = (PCollection<T>) initialInput;
-    return super.apply(new AssignWindows<>(transform), input);
-  }
-
   private String debuggerMessage(String projectId, String uniquifier) {
     return String.format("To debug your job, visit Google Cloud Debugger at: "
         + "https://console.developers.google.com/debug?project=%s&dbgee=%s";,

Reply via email to