Repository: beam Updated Branches: refs/heads/master 55340e617 -> 817688aac
Remove Unnecessary DataflowRunner Overrides The removed overrides are indistinguishable from the transform that they are overriding. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4d8865de Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4d8865de Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4d8865de Branch: refs/heads/master Commit: 4d8865def33355551d16096831de5cf58c28272d Parents: 55340e6 Author: Thomas Groh <tg...@google.com> Authored: Thu Feb 16 09:14:47 2017 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Thu Feb 16 11:45:17 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 36 +++----------------- 1 file changed, 5 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4d8865de/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index db6a7d9..f1270db 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -336,7 +336,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { builder.put(View.AsIterable.class, StreamingViewAsIterable.class); builder.put(Read.Unbounded.class, StreamingUnboundedRead.class); builder.put(Read.Bounded.class, StreamingBoundedRead.class); - builder.put(Window.Bound.class, AssignWindows.class); // In streaming mode must use either the custom 
Pubsub unbounded source/sink or // defer to Windmill's built-in implementation. builder.put(PubsubIO.Read.PubsubBoundedReader.class, UnsupportedIO.class); @@ -351,7 +350,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } } else { builder.put(Read.Unbounded.class, UnsupportedIO.class); - builder.put(Window.Bound.class, AssignWindows.class); builder.put(Write.Bound.class, BatchWrite.class); // In batch mode must use the custom Pubsub bounded source/sink. builder.put(PubsubUnboundedSource.class, UnsupportedIO.class); @@ -376,32 +374,18 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { public <OutputT extends POutput, InputT extends PInput> OutputT apply( PTransform<InputT, OutputT> transform, InputT input) { - if (Combine.GroupedValues.class.equals(transform.getClass()) - || GroupByKey.class.equals(transform.getClass())) { - + if (Combine.GroupedValues.class.equals(transform.getClass())) { // For both Dataflow runners (streaming and batch), GroupByKey and GroupedValues are // primitives. Returning a primitive output instead of the expanded definition // signals to the translator that translation is necessary. @SuppressWarnings("unchecked") PCollection<?> pc = (PCollection<?>) input; @SuppressWarnings("unchecked") - OutputT outputT = (OutputT) PCollection.createPrimitiveOutputInternal( - pc.getPipeline(), - transform instanceof GroupByKey - ? ((GroupByKey<?, ?>) transform).updateWindowingStrategy(pc.getWindowingStrategy()) - : pc.getWindowingStrategy(), - pc.isBounded()); + OutputT outputT = + (OutputT) + PCollection.createPrimitiveOutputInternal( + pc.getPipeline(), pc.getWindowingStrategy(), pc.isBounded()); return outputT; - } else if (Window.Bound.class.equals(transform.getClass())) { - /* - * TODO: make this the generic way overrides are applied (using super.apply() rather than - * Pipeline.applyTransform(); this allows the apply method to be replaced without inserting - * additional nodes into the graph. 
- */ - // casting to wildcard - @SuppressWarnings("unchecked") - OutputT windowed = (OutputT) applyWindow((Window.Bound<?>) transform, (PCollection<?>) input); - return windowed; } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass()) && ((PCollectionList<?>) input).size() == 0) { // This can cause downstream coder inference to be screwy. Most of the time, that won't be @@ -432,16 +416,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } } - private <T> PCollection<T> applyWindow( - Window.Bound<?> intitialTransform, PCollection<?> initialInput) { - // types are matched at compile time - @SuppressWarnings("unchecked") - Window.Bound<T> transform = (Window.Bound<T>) intitialTransform; - @SuppressWarnings("unchecked") - PCollection<T> input = (PCollection<T>) initialInput; - return super.apply(new AssignWindows<>(transform), input); - } - private String debuggerMessage(String projectId, String uniquifier) { return String.format("To debug your job, visit Google Cloud Debugger at: " + "https://console.developers.google.com/debug?project=%s&dbgee=%s",