Move Streaming View Overrides out of the Flink Runner

These are specific to the streaming Runner and don't need to be in the
FlinkRunner. This is also preparation for getting Flink out of the
Runner.apply().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/27ad140d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/27ad140d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/27ad140d

Branch: refs/heads/master
Commit: 27ad140dc234245d82a0e8fe4471f9d1343e625b
Parents: 88e8425
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Feb 22 13:24:06 2017 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed Feb 22 18:24:15 2017 +0100

----------------------------------------------------------------------
 .../apache/beam/runners/flink/FlinkRunner.java  | 359 +-----------------
 .../flink/FlinkStreamingViewOverrides.java      | 372 +++++++++++++++++++
 .../FlinkStreamingTransformTranslators.java     |   8 +-
 3 files changed, 385 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/27ad140d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 0f44ba9..b3c7c65 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -24,7 +24,6 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -33,24 +32,14 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
@@ -120,12 +109,13 @@ public class FlinkRunner extends 
PipelineRunner<PipelineResult> {
     ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.<Class<?>, 
Class<?>>builder();
     if (options.isStreaming()) {
       builder.put(Combine.GloballyAsSingletonView.class,
-          StreamingCombineGloballyAsSingletonView.class);
-      builder.put(View.AsMap.class, StreamingViewAsMap.class);
-      builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class);
-      builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class);
-      builder.put(View.AsList.class, StreamingViewAsList.class);
-      builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
+          
FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class);
+      builder.put(View.AsMap.class, 
FlinkStreamingViewOverrides.StreamingViewAsMap.class);
+      builder.put(View.AsMultimap.class, 
FlinkStreamingViewOverrides.StreamingViewAsMultimap.class);
+      builder.put(View.AsSingleton.class,
+          FlinkStreamingViewOverrides.StreamingViewAsSingleton.class);
+      builder.put(View.AsList.class, 
FlinkStreamingViewOverrides.StreamingViewAsList.class);
+      builder.put(View.AsIterable.class, 
FlinkStreamingViewOverrides.StreamingViewAsIterable.class);
     }
     overrides = builder.build();
   }
@@ -244,7 +234,7 @@ public class FlinkRunner extends 
PipelineRunner<PipelineResult> {
   /**
    * Records that the {@link PTransform} requires a deterministic key coder.
    */
-  private void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> 
ptransform) {
+  void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
     ptransformViewsWithNonDeterministicKeyCoders.add(ptransform);
   }
 
@@ -287,337 +277,4 @@ public class FlinkRunner extends 
PipelineRunner<PipelineResult> {
           ptransformViewNamesWithNonDeterministicKeyCoders);
     }
   }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
-   * for the Flink runner in streaming mode.
-   */
-  private static class StreamingViewAsMap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
-    private final transient FlinkRunner runner;
-
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) {
-      this.runner = runner;
-    }
-
-    @Override
-    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
-      PCollectionView<Map<K, V>> view =
-          PCollectionViews.mapView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (Coder.NonDeterministicException e) {
-        runner.recordViewUsesNonDeterministicKeyCoder(this);
-      }
-
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, 
V>>()).withoutDefaults())
-          .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, V>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMap";
-    }
-  }
-
-  /**
-   * Specialized expansion for {@link
-   * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
-   * Flink runner in streaming mode.
-   */
-  private static class StreamingViewAsMultimap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, 
Iterable<V>>>> {
-
-    private final transient FlinkRunner runner;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsMultimap(FlinkRunner runner, View.AsMultimap<K, V> 
transform) {
-      this.runner = runner;
-    }
-
-    @Override
-    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> 
input) {
-      PCollectionView<Map<K, Iterable<V>>> view =
-          PCollectionViews.multimapView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (Coder.NonDeterministicException e) {
-        runner.recordViewUsesNonDeterministicKeyCoder(this);
-      }
-
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, 
V>>()).withoutDefaults())
-          .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, 
Iterable<V>>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMultimap";
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
-   * Flink runner in streaming mode.
-   */
-  private static class StreamingViewAsList<T>
-      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {}
-
-    @Override
-    public PCollectionView<List<T>> expand(PCollection<T> input) {
-      PCollectionView<List<T>> view =
-          PCollectionViews.listView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new 
Concatenate<T>()).withoutDefaults())
-          .apply(CreateFlinkPCollectionView.<T, List<T>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsList";
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} 
for the
-   * Flink runner in streaming mode.
-   */
-  private static class StreamingViewAsIterable<T>
-      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> 
transform) { }
-
-    @Override
-    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
-      PCollectionView<Iterable<T>> view =
-          PCollectionViews.iterableView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new 
Concatenate<T>()).withoutDefaults())
-          .apply(CreateFlinkPCollectionView.<T, Iterable<T>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsIterable";
-    }
-  }
-
-  private static class WrapAsList<T> extends DoFn<T, List<T>> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(Collections.singletonList(c.element()));
-    }
-  }
-
-  /**
-   * Specialized expansion for
-   * {@link org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton} 
for the
-   * Flink runner in streaming mode.
-   */
-  private static class StreamingViewAsSingleton<T>
-      extends PTransform<PCollection<T>, PCollectionView<T>> {
-    private View.AsSingleton<T> transform;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingViewAsSingleton(FlinkRunner runner, View.AsSingleton<T> 
transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollectionView<T> expand(PCollection<T> input) {
-      Combine.Globally<T, T> combine = Combine.globally(
-          new SingletonCombine<>(transform.hasDefaultValue(), 
transform.defaultValue()));
-      if (!transform.hasDefaultValue()) {
-        combine = combine.withoutDefaults();
-      }
-      return input.apply(combine.asSingletonView());
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsSingleton";
-    }
-
-    private static class SingletonCombine<T> extends 
Combine.BinaryCombineFn<T> {
-      private boolean hasDefaultValue;
-      private T defaultValue;
-
-      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
-        this.hasDefaultValue = hasDefaultValue;
-        this.defaultValue = defaultValue;
-      }
-
-      @Override
-      public T apply(T left, T right) {
-        throw new IllegalArgumentException("PCollection with more than one 
element "
-            + "accessed as a singleton view. Consider using 
Combine.globally().asSingleton() to "
-            + "combine the PCollection into a single value");
-      }
-
-      @Override
-      public T identity() {
-        if (hasDefaultValue) {
-          return defaultValue;
-        } else {
-          throw new IllegalArgumentException(
-              "Empty PCollection accessed as a singleton view. "
-                  + "Consider setting withDefault to provide a default value");
-        }
-      }
-    }
-  }
-
-  private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
-      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
-    Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
-    public StreamingCombineGloballyAsSingletonView(
-        FlinkRunner runner,
-        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
-      PCollection<OutputT> combined =
-          input.apply(Combine.globally(transform.getCombineFn())
-              .withoutDefaults()
-              .withFanout(transform.getFanout()));
-
-      PCollectionView<OutputT> view = PCollectionViews.singletonView(
-          combined.getPipeline(),
-          combined.getWindowingStrategy(),
-          transform.getInsertDefault(),
-          transform.getInsertDefault()
-              ? transform.getCombineFn().defaultValue() : null,
-          combined.getCoder());
-      return combined
-          .apply(ParDo.of(new WrapAsList<OutputT>()))
-          .apply(CreateFlinkPCollectionView.<OutputT, OutputT>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingCombineGloballyAsSingletonView";
-    }
-  }
-
-  /**
-   * Combiner that combines {@code T}s into a single {@code List<T>} 
containing all inputs.
-   *
-   * <p>For internal use by {@link StreamingViewAsMap}, {@link 
StreamingViewAsMultimap},
-   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
-   * They require the input {@link PCollection} fits in memory.
-   * For a large {@link PCollection} this is expected to crash!
-   *
-   * @param <T> the type of elements to concatenate.
-   */
-  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, 
List<T>> {
-    @Override
-    public List<T> createAccumulator() {
-      return new ArrayList<T>();
-    }
-
-    @Override
-    public List<T> addInput(List<T> accumulator, T input) {
-      accumulator.add(input);
-      return accumulator;
-    }
-
-    @Override
-    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
-      List<T> result = createAccumulator();
-      for (List<T> accumulator : accumulators) {
-        result.addAll(accumulator);
-      }
-      return result;
-    }
-
-    @Override
-    public List<T> extractOutput(List<T> accumulator) {
-      return accumulator;
-    }
-
-    @Override
-    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> 
inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-
-    @Override
-    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, 
Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-  }
-
-  /**
-   * Creates a primitive {@link PCollectionView}.
-   *
-   * <p>For internal use only by runner implementors.
-   *
-   * @param <ElemT> The type of the elements of the input PCollection
-   * @param <ViewT> The type associated with the {@link PCollectionView} used 
as a side input
-   */
-  public static class CreateFlinkPCollectionView<ElemT, ViewT>
-      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
-    private PCollectionView<ViewT> view;
-
-    private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
-      this.view = view;
-    }
-
-    public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of(
-        PCollectionView<ViewT> view) {
-      return new CreateFlinkPCollectionView<>(view);
-    }
-
-    @Override
-    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
-      return view;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/27ad140d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
new file mode 100644
index 0000000..0a9df4e
--- /dev/null
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Flink streaming overrides for various view (side input) transforms.
+ */
+public class FlinkStreamingViewOverrides {
+
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
+   * for the Flink runner in streaming mode.
+   */
+  static class StreamingViewAsMap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+    private final transient FlinkRunner runner;
+
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) {
+      this.runner = runner;
+    }
+
+    @Override
+    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, V>> view =
+          PCollectionViews.mapView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        runner.recordViewUsesNonDeterministicKeyCoder(this);
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, 
V>>()).withoutDefaults())
+          .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMap";
+    }
+  }
+
+  /**
+   * Specialized expansion for {@link
+   * View.AsMultimap View.AsMultimap} for the
+   * Flink runner in streaming mode.
+   */
+  static class StreamingViewAsMultimap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, 
Iterable<V>>>> {
+
+    private final transient FlinkRunner runner;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsMultimap(FlinkRunner runner, View.AsMultimap<K, V> 
transform) {
+      this.runner = runner;
+    }
+
+    @Override
+    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> 
input) {
+      PCollectionView<Map<K, Iterable<V>>> view =
+          PCollectionViews.multimapView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        runner.recordViewUsesNonDeterministicKeyCoder(this);
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, 
V>>()).withoutDefaults())
+          .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, 
Iterable<V>>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMultimap";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link View.AsList View.AsList} for the
+   * Flink runner in streaming mode.
+   */
+  static class StreamingViewAsList<T>
+      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {}
+
+    @Override
+    public PCollectionView<List<T>> expand(PCollection<T> input) {
+      PCollectionView<List<T>> view =
+          PCollectionViews.listView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new 
Concatenate<T>()).withoutDefaults())
+          .apply(CreateFlinkPCollectionView.<T, List<T>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsList";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link View.AsIterable View.AsIterable} for the
+   * Flink runner in streaming mode.
+   */
+  static class StreamingViewAsIterable<T>
+      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> 
transform) { }
+
+    @Override
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+      PCollectionView<Iterable<T>> view =
+          PCollectionViews.iterableView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new 
Concatenate<T>()).withoutDefaults())
+          .apply(CreateFlinkPCollectionView.<T, Iterable<T>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsIterable";
+    }
+  }
+
+  /**
+   * Specialized expansion for
+   * {@link View.AsSingleton View.AsSingleton} for the
+   * Flink runner in streaming mode.
+   */
+  static class StreamingViewAsSingleton<T>
+      extends PTransform<PCollection<T>, PCollectionView<T>> {
+    private View.AsSingleton<T> transform;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsSingleton(FlinkRunner runner, View.AsSingleton<T> 
transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PCollectionView<T> expand(PCollection<T> input) {
+      Combine.Globally<T, T> combine = Combine.globally(
+          new SingletonCombine<>(transform.hasDefaultValue(), 
transform.defaultValue()));
+      if (!transform.hasDefaultValue()) {
+        combine = combine.withoutDefaults();
+      }
+      return input.apply(combine.asSingletonView());
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsSingleton";
+    }
+
+    private static class SingletonCombine<T> extends 
Combine.BinaryCombineFn<T> {
+      private boolean hasDefaultValue;
+      private T defaultValue;
+
+      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+        this.hasDefaultValue = hasDefaultValue;
+        this.defaultValue = defaultValue;
+      }
+
+      @Override
+      public T apply(T left, T right) {
+        throw new IllegalArgumentException("PCollection with more than one 
element "
+            + "accessed as a singleton view. Consider using 
Combine.globally().asSingleton() to "
+            + "combine the PCollection into a single value");
+      }
+
+      @Override
+      public T identity() {
+        if (hasDefaultValue) {
+          return defaultValue;
+        } else {
+          throw new IllegalArgumentException(
+              "Empty PCollection accessed as a singleton view. "
+                  + "Consider setting withDefault to provide a default value");
+        }
+      }
+    }
+  }
+
+  static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
+      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+    Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingCombineGloballyAsSingletonView(
+        FlinkRunner runner,
+        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+      PCollection<OutputT> combined =
+          input.apply(Combine.globally(transform.getCombineFn())
+              .withoutDefaults()
+              .withFanout(transform.getFanout()));
+
+      PCollectionView<OutputT> view = PCollectionViews.singletonView(
+          combined.getPipeline(),
+          combined.getWindowingStrategy(),
+          transform.getInsertDefault(),
+          transform.getInsertDefault()
+              ? transform.getCombineFn().defaultValue() : null,
+          combined.getCoder());
+      return combined
+          .apply(ParDo.of(new WrapAsList<OutputT>()))
+          .apply(CreateFlinkPCollectionView.<OutputT, OutputT>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingCombineGloballyAsSingletonView";
+    }
+  }
+
+  private static class WrapAsList<T> extends DoFn<T, List<T>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(Collections.singletonList(c.element()));
+    }
+  }
+
+  /**
+   * Combiner that combines {@code T}s into a single {@code List<T>} 
containing all inputs.
+   *
+   * <p>For internal use by {@link StreamingViewAsMap}, {@link 
StreamingViewAsMultimap},
+   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
+   * They require the input {@link PCollection} fits in memory.
+   * For a large {@link PCollection} this is expected to crash!
+   *
+   * @param <T> the type of elements to concatenate.
+   */
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, 
List<T>> {
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<T>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
+
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> 
inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, 
Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+  }
+
+  /**
+   * Creates a primitive {@link PCollectionView}.
+   *
+   * <p>For internal use only by runner implementors.
+   *
+   * @param <ElemT> The type of the elements of the input PCollection
+   * @param <ViewT> The type associated with the {@link PCollectionView} used 
as a side input
+   */
+  public static class CreateFlinkPCollectionView<ElemT, ViewT>
+      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+    private PCollectionView<ViewT> view;
+
+    private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
+      this.view = view;
+    }
+
+    public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of(
+        PCollectionView<ViewT> view) {
+      return new CreateFlinkPCollectionView<>(view);
+    }
+
+    @Override
+    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+      return view;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/27ad140d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 7f86488..742ce91 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.flink.FlinkRunner;
+import org.apache.beam.runners.flink.FlinkStreamingViewOverrides;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.FlinkCoder;
@@ -128,7 +129,8 @@ public class FlinkStreamingTransformTranslators {
     TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
     TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new 
FlattenPCollectionTranslator());
     TRANSLATORS.put(
-        FlinkRunner.CreateFlinkPCollectionView.class, new 
CreateViewStreamingTranslator());
+        FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
+        new CreateViewStreamingTranslator());
 
     TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming());
     TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
@@ -686,11 +688,11 @@ public class FlinkStreamingTransformTranslators {
 
   private static class CreateViewStreamingTranslator<ElemT, ViewT>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-      FlinkRunner.CreateFlinkPCollectionView<ElemT, ViewT>> {
+      FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT>> {
 
     @Override
     public void translateNode(
-        FlinkRunner.CreateFlinkPCollectionView<ElemT, ViewT> transform,
+        FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT> 
transform,
         FlinkStreamingTranslationContext context) {
       // just forward
       DataStream<WindowedValue<List<ElemT>>> inputDataSet =

Reply via email to