Only Override CreatePCollectionView in Streaming

This permits us to use the appropriate view token for the
StreamingPCollectionViewWriterFn.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b94c99b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b94c99b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b94c99b

Branch: refs/heads/master
Commit: 7b94c99be43d82bcab9370f63c0d63646146ca97
Parents: 1783819
Author: Thomas Groh <[email protected]>
Authored: Fri Mar 3 10:56:29 2017 -0800
Committer: Thomas Groh <[email protected]>
Committed: Fri Mar 3 14:59:12 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  92 +++---
 .../dataflow/StreamingViewOverrides.java        | 287 +++----------------
 2 files changed, 76 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7b94c99b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 50b6b4f..c609b54 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -64,6 +64,7 @@ import 
org.apache.beam.runners.core.construction.ReplacementOutputs;
 import 
org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.core.construction.UnsupportedOverrideFactory;
 import 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
+import 
org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import 
org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
@@ -75,6 +76,8 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileBasedSink;
@@ -94,18 +97,12 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView;
 import org.apache.beam.sdk.transforms.Combine.GroupedValues;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.View.AsIterable;
-import org.apache.beam.sdk.transforms.View.AsList;
-import org.apache.beam.sdk.transforms.View.AsMap;
-import org.apache.beam.sdk.transforms.View.AsMultimap;
-import org.apache.beam.sdk.transforms.View.AsSingleton;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -326,29 +323,8 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
               PTransformMatchers.classEqualTo(Read.Unbounded.class),
               new ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, 
this))
           .put(
-              PTransformMatchers.classEqualTo(GloballyAsSingletonView.class),
-              new ReflectiveOneToOneOverrideFactory(
-                  
StreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class, this))
-          .put(
-              PTransformMatchers.classEqualTo(AsMap.class),
-              new ReflectiveOneToOneOverrideFactory(
-                  StreamingViewOverrides.StreamingViewAsMap.class, this))
-          .put(
-              PTransformMatchers.classEqualTo(AsMultimap.class),
-              new ReflectiveOneToOneOverrideFactory(
-                  StreamingViewOverrides.StreamingViewAsMultimap.class, this))
-          .put(
-              PTransformMatchers.classEqualTo(AsSingleton.class),
-              new ReflectiveOneToOneOverrideFactory(
-                  StreamingViewOverrides.StreamingViewAsSingleton.class, this))
-          .put(
-              PTransformMatchers.classEqualTo(AsList.class),
-              new ReflectiveOneToOneOverrideFactory(
-                  StreamingViewOverrides.StreamingViewAsList.class, this))
-          .put(
-              PTransformMatchers.classEqualTo(AsIterable.class),
-              new ReflectiveOneToOneOverrideFactory(
-                  StreamingViewOverrides.StreamingViewAsIterable.class, this));
+              
PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
+              new StreamingCreatePCollectionViewFactory());
     } else {
       // In batch mode must use the custom Pubsub bounded source/sink.
       for (Class<? extends PTransform> unsupported :
@@ -719,30 +695,40 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     // have just recorded the full names during apply time.
     if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
       final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders 
= new TreeSet<>();
-      pipeline.traverseTopologically(new PipelineVisitor() {
-        @Override
-        public void visitValue(PValue value, TransformHierarchy.Node producer) 
{
-        }
-
-        @Override
-        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-          if 
(ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
-            
ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
-          }
-        }
-
-        @Override
-        public CompositeBehavior 
enterCompositeTransform(TransformHierarchy.Node node) {
-          if 
(ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
-            
ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
-          }
-          return CompositeBehavior.ENTER_TRANSFORM;
-        }
-
-        @Override
-        public void leaveCompositeTransform(TransformHierarchy.Node node) {
-        }
-      });
+      pipeline.traverseTopologically(
+          new PipelineVisitor() {
+            @Override
+            public void visitValue(PValue value, TransformHierarchy.Node 
producer) {}
+
+            @Override
+            public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+              if 
(ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
+                
ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
+              }
+            }
+
+            @Override
+            public CompositeBehavior 
enterCompositeTransform(TransformHierarchy.Node node) {
+              if (node.getTransform() instanceof View.AsMap
+                  || node.getTransform() instanceof View.AsMultimap) {
+                PCollection<KV<?, ?>> input =
+                    (PCollection<KV<?, ?>>) 
Iterables.getOnlyElement(node.getInputs()).getValue();
+                KvCoder<?, ?> inputCoder = (KvCoder) input.getCoder();
+                try {
+                  inputCoder.getKeyCoder().verifyDeterministic();
+                } catch (NonDeterministicException e) {
+                  
ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
+                }
+              }
+              if 
(ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
+                
ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
+              }
+              return CompositeBehavior.ENTER_TRANSFORM;
+            }
+
+            @Override
+            public void leaveCompositeTransform(TransformHierarchy.Node node) 
{}
+          });
 
       LOG.warn("Unable to use indexed implementation for View.AsMap and 
View.AsMultimap for {} "
           + "because the key coder is not deterministic. Falling back to 
singleton implementation "

http://git-wip-us.apache.org/repos/asf/beam/blob/7b94c99b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
index bab115f..8e005cf 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java
@@ -19,23 +19,18 @@
 package org.apache.beam.runners.dataflow;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
+import 
org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import 
org.apache.beam.runners.dataflow.DataflowRunner.StreamingPCollectionViewWriterFn;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 
@@ -44,261 +39,58 @@ import org.apache.beam.sdk.values.PCollectionView;
  * types.
  */
 class StreamingViewOverrides {
-  static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
-      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
-    Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in 
DataflowRunner#apply()
-    public StreamingCombineGloballyAsSingletonView(
-        DataflowRunner runner,
-        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
-      this.transform = transform;
-    }
-
+  static class StreamingCreatePCollectionViewFactory<ElemT, ViewT>
+      extends SingleInputOutputOverrideFactory<
+          PCollection<ElemT>, PCollectionView<ViewT>, 
CreatePCollectionView<ElemT, ViewT>> {
     @Override
-    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
-      PCollection<OutputT> combined =
-          input.apply(Combine.<InputT, 
OutputT>globally(transform.getCombineFn())
-              .withoutDefaults()
-              .withFanout(transform.getFanout()));
-
-      PCollectionView<OutputT> view = PCollectionViews.singletonView(
-          combined.getPipeline(),
-          combined.getWindowingStrategy(),
-          transform.getInsertDefault(),
-          transform.getInsertDefault()
-              ? transform.getCombineFn().defaultValue() : null,
-          combined.getCoder());
-      return combined
-          .apply(ParDo.of(new WrapAsList<OutputT>()))
-          .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, 
combined.getCoder())))
-          .apply(View.CreatePCollectionView.<OutputT, OutputT>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingCombineGloballyAsSingletonView";
-    }
-  }
-
-  private static class WrapAsList<T> extends DoFn<T, List<T>> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(Arrays.asList(c.element()));
+    public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> 
getReplacementTransform(
+        final CreatePCollectionView<ElemT, ViewT> transform) {
+      return new StreamingCreatePCollectionView<>(transform.getView());
     }
-  }
 
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
-   * for the Dataflow runner in streaming mode.
-   */
-  static class StreamingViewAsMap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-    private final DataflowRunner runner;
+    private static class StreamingCreatePCollectionView<ElemT, ViewT>
+        extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
+      private final PCollectionView<ViewT> view;
 
-    @SuppressWarnings("unused") // used via reflection in 
DataflowRunner#apply()
-    public StreamingViewAsMap(DataflowRunner runner, View.AsMap<K, V> 
transform) {
-      this.runner = runner;
-    }
-
-    @Override
-    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
-      PCollectionView<Map<K, V>> view =
-          PCollectionViews.mapView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (NonDeterministicException e) {
-        runner.recordViewUsesNonDeterministicKeyCoder(this);
+      private StreamingCreatePCollectionView(PCollectionView<ViewT> view) {
+        this.view = view;
       }
 
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, 
V>>()).withoutDefaults())
-          .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, 
input.getCoder())))
-          .apply(View.CreatePCollectionView.<KV<K, V>, Map<K, V>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMap";
-    }
-  }
-
-  /**
-   * Specialized expansion for {@link
-   * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
-   * Dataflow runner in streaming mode.
-   */
-  static class StreamingViewAsMultimap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, 
Iterable<V>>>> {
-    private final DataflowRunner runner;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in 
DataflowRunner#apply()
-    public StreamingViewAsMultimap(DataflowRunner runner, View.AsMultimap<K, 
V> transform) {
-      this.runner = runner;
-    }
-
-    @Override
-    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> 
input) {
-      PCollectionView<Map<K, Iterable<V>>> view =
-          PCollectionViews.multimapView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (NonDeterministicException e) {
-        runner.recordViewUsesNonDeterministicKeyCoder(this);
+      @Override
+      public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
+        return input
+            .apply(Combine.globally(new 
Concatenate<ElemT>()).withoutDefaults())
+            .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, 
input.getCoder())))
+            .apply(View.CreatePCollectionView.<ElemT, ViewT>of(view));
       }
-
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, 
V>>()).withoutDefaults())
-          .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, 
input.getCoder())))
-          .apply(View.CreatePCollectionView.<KV<K, V>, Map<K, 
Iterable<V>>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMultimap";
     }
   }
 
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
-   * Dataflow runner in streaming mode.
-   */
-  static class StreamingViewAsList<T>
-      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in 
DataflowRunner#apply()
-    public StreamingViewAsList(DataflowRunner runner, View.AsList<T> 
transform) {}
+  private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+    private boolean hasDefaultValue;
+    private T defaultValue;
 
-    @Override
-    public PCollectionView<List<T>> expand(PCollection<T> input) {
-      PCollectionView<List<T>> view =
-          PCollectionViews.listView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new 
Concatenate<T>()).withoutDefaults())
-          .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, 
input.getCoder())))
-          .apply(View.CreatePCollectionView.<T, List<T>>of(view));
+    SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+      this.hasDefaultValue = hasDefaultValue;
+      this.defaultValue = defaultValue;
     }
 
     @Override
-    protected String getKindString() {
-      return "StreamingViewAsList";
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} 
for the
-   * Dataflow runner in streaming mode.
-   */
-  static class StreamingViewAsIterable<T>
-      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in 
DataflowRunner#apply()
-    public StreamingViewAsIterable(DataflowRunner runner, View.AsIterable<T> 
transform) { }
-
-    @Override
-    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
-      PCollectionView<Iterable<T>> view =
-          PCollectionViews.iterableView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new 
Concatenate<T>()).withoutDefaults())
-          .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, 
input.getCoder())))
-          .apply(View.CreatePCollectionView.<T, Iterable<T>>of(view));
+    public T apply(T left, T right) {
+      throw new IllegalArgumentException(
+          "PCollection with more than one element "
+              + "accessed as a singleton view. Consider using 
Combine.globally().asSingleton() to "
+              + "combine the PCollection into a single value");
     }
 
     @Override
-    protected String getKindString() {
-      return "StreamingViewAsIterable";
-    }
-  }
-
-  /**
-   * Specialized expansion for
-   * {@link org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton} 
for the
-   * Dataflow runner in streaming mode.
-   */
-  static class StreamingViewAsSingleton<T>
-      extends PTransform<PCollection<T>, PCollectionView<T>> {
-    private View.AsSingleton<T> transform;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in 
DataflowRunner#apply()
-    public StreamingViewAsSingleton(DataflowRunner runner, View.AsSingleton<T> 
transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollectionView<T> expand(PCollection<T> input) {
-      Combine.Globally<T, T> combine = Combine.globally(
-          new SingletonCombine<>(transform.hasDefaultValue(), 
transform.defaultValue()));
-      if (!transform.hasDefaultValue()) {
-        combine = combine.withoutDefaults();
-      }
-      return input.apply(combine.asSingletonView());
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsSingleton";
-    }
-
-    private static class SingletonCombine<T> extends 
Combine.BinaryCombineFn<T> {
-      private boolean hasDefaultValue;
-      private T defaultValue;
-
-      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
-        this.hasDefaultValue = hasDefaultValue;
-        this.defaultValue = defaultValue;
-      }
-
-      @Override
-      public T apply(T left, T right) {
-        throw new IllegalArgumentException("PCollection with more than one 
element "
-            + "accessed as a singleton view. Consider using 
Combine.globally().asSingleton() to "
-            + "combine the PCollection into a single value");
-      }
-
-      @Override
-      public T identity() {
-        if (hasDefaultValue) {
-          return defaultValue;
-        } else {
-          throw new IllegalArgumentException(
-              "Empty PCollection accessed as a singleton view. "
-                  + "Consider setting withDefault to provide a default value");
-        }
+    public T identity() {
+      if (hasDefaultValue) {
+        return defaultValue;
+      } else {
+        throw new IllegalArgumentException(
+            "Empty PCollection accessed as a singleton view. "
+                + "Consider setting withDefault to provide a default value");
       }
     }
   }
@@ -306,11 +98,6 @@ class StreamingViewOverrides {
   /**
    * Combiner that combines {@code T}s into a single {@code List<T>} 
containing all inputs.
    *
-   * <p>For internal use by {@link StreamingViewAsMap}, {@link 
StreamingViewAsMultimap},
-   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
-   * They require the input {@link PCollection} fits in memory.
-   * For a large {@link PCollection} this is expected to crash!
-   *
    * @param <T> the type of elements to concatenate.
    */
   private static class Concatenate<T> extends CombineFn<T, List<T>, List<T>> {

Reply via email to