Only Override CreatePCollectionView in Streaming. This permits us to use the appropriate view token for the StreamingPCollectionViewWriterFn.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b94c99b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b94c99b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b94c99b Branch: refs/heads/master Commit: 7b94c99be43d82bcab9370f63c0d63646146ca97 Parents: 1783819 Author: Thomas Groh <[email protected]> Authored: Fri Mar 3 10:56:29 2017 -0800 Committer: Thomas Groh <[email protected]> Committed: Fri Mar 3 14:59:12 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 92 +++--- .../dataflow/StreamingViewOverrides.java | 287 +++---------------- 2 files changed, 76 insertions(+), 303 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7b94c99b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 50b6b4f..c609b54 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -64,6 +64,7 @@ import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.runners.core.construction.UnsupportedOverrideFactory; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; +import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory; import 
org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; @@ -75,6 +76,8 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.FileBasedSink; @@ -94,18 +97,12 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView; import org.apache.beam.sdk.transforms.Combine.GroupedValues; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.View.AsIterable; -import org.apache.beam.sdk.transforms.View.AsList; -import org.apache.beam.sdk.transforms.View.AsMap; -import org.apache.beam.sdk.transforms.View.AsMultimap; -import org.apache.beam.sdk.transforms.View.AsSingleton; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -326,29 +323,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { PTransformMatchers.classEqualTo(Read.Unbounded.class), new ReflectiveRootOverrideFactory(StreamingUnboundedRead.class, this)) .put( - 
PTransformMatchers.classEqualTo(GloballyAsSingletonView.class), - new ReflectiveOneToOneOverrideFactory( - StreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class, this)) - .put( - PTransformMatchers.classEqualTo(AsMap.class), - new ReflectiveOneToOneOverrideFactory( - StreamingViewOverrides.StreamingViewAsMap.class, this)) - .put( - PTransformMatchers.classEqualTo(AsMultimap.class), - new ReflectiveOneToOneOverrideFactory( - StreamingViewOverrides.StreamingViewAsMultimap.class, this)) - .put( - PTransformMatchers.classEqualTo(AsSingleton.class), - new ReflectiveOneToOneOverrideFactory( - StreamingViewOverrides.StreamingViewAsSingleton.class, this)) - .put( - PTransformMatchers.classEqualTo(AsList.class), - new ReflectiveOneToOneOverrideFactory( - StreamingViewOverrides.StreamingViewAsList.class, this)) - .put( - PTransformMatchers.classEqualTo(AsIterable.class), - new ReflectiveOneToOneOverrideFactory( - StreamingViewOverrides.StreamingViewAsIterable.class, this)); + PTransformMatchers.classEqualTo(View.CreatePCollectionView.class), + new StreamingCreatePCollectionViewFactory()); } else { // In batch mode must use the custom Pubsub bounded source/sink. for (Class<? extends PTransform> unsupported : @@ -719,30 +695,40 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // have just recorded the full names during apply time. 
if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) { final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>(); - pipeline.traverseTopologically(new PipelineVisitor() { - @Override - public void visitValue(PValue value, TransformHierarchy.Node producer) { - } - - @Override - public void visitPrimitiveTransform(TransformHierarchy.Node node) { - if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) { - ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName()); - } - } - - @Override - public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { - if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) { - ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName()); - } - return CompositeBehavior.ENTER_TRANSFORM; - } - - @Override - public void leaveCompositeTransform(TransformHierarchy.Node node) { - } - }); + pipeline.traverseTopologically( + new PipelineVisitor() { + @Override + public void visitValue(PValue value, TransformHierarchy.Node producer) {} + + @Override + public void visitPrimitiveTransform(TransformHierarchy.Node node) { + if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) { + ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName()); + } + } + + @Override + public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { + if (node.getTransform() instanceof View.AsMap + || node.getTransform() instanceof View.AsMultimap) { + PCollection<KV<?, ?>> input = + (PCollection<KV<?, ?>>) Iterables.getOnlyElement(node.getInputs()).getValue(); + KvCoder<?, ?> inputCoder = (KvCoder) input.getCoder(); + try { + inputCoder.getKeyCoder().verifyDeterministic(); + } catch (NonDeterministicException e) { + ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName()); + } + } + if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) { + 
ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName()); + } + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void leaveCompositeTransform(TransformHierarchy.Node node) {} + }); LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} " + "because the key coder is not deterministic. Falling back to singleton implementation " http://git-wip-us.apache.org/repos/asf/beam/blob/7b94c99b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java index bab115f..8e005cf 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java @@ -19,23 +19,18 @@ package org.apache.beam.runners.dataflow; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; -import java.util.Map; +import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.runners.dataflow.DataflowRunner.StreamingPCollectionViewWriterFn; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import 
org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.util.PCollectionViews; -import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -44,261 +39,58 @@ import org.apache.beam.sdk.values.PCollectionView; * types. */ class StreamingViewOverrides { - static class StreamingCombineGloballyAsSingletonView<InputT, OutputT> - extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { - Combine.GloballyAsSingletonView<InputT, OutputT> transform; - - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingCombineGloballyAsSingletonView( - DataflowRunner runner, - Combine.GloballyAsSingletonView<InputT, OutputT> transform) { - this.transform = transform; - } - + static class StreamingCreatePCollectionViewFactory<ElemT, ViewT> + extends SingleInputOutputOverrideFactory< + PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> { @Override - public PCollectionView<OutputT> expand(PCollection<InputT> input) { - PCollection<OutputT> combined = - input.apply(Combine.<InputT, OutputT>globally(transform.getCombineFn()) - .withoutDefaults() - .withFanout(transform.getFanout())); - - PCollectionView<OutputT> view = PCollectionViews.singletonView( - combined.getPipeline(), - combined.getWindowingStrategy(), - transform.getInsertDefault(), - transform.getInsertDefault() - ? 
transform.getCombineFn().defaultValue() : null, - combined.getCoder()); - return combined - .apply(ParDo.of(new WrapAsList<OutputT>())) - .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, combined.getCoder()))) - .apply(View.CreatePCollectionView.<OutputT, OutputT>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingCombineGloballyAsSingletonView"; - } - } - - private static class WrapAsList<T> extends DoFn<T, List<T>> { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(Arrays.asList(c.element())); + public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform( + final CreatePCollectionView<ElemT, ViewT> transform) { + return new StreamingCreatePCollectionView<>(transform.getView()); } - } - /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap} - * for the Dataflow runner in streaming mode. - */ - static class StreamingViewAsMap<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { - private final DataflowRunner runner; + private static class StreamingCreatePCollectionView<ElemT, ViewT> + extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> { + private final PCollectionView<ViewT> view; - @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingViewAsMap(DataflowRunner runner, View.AsMap<K, V> transform) { - this.runner = runner; - } - - @Override - public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) { - PCollectionView<Map<K, V>> view = - PCollectionViews.mapView( - input.getPipeline(), - input.getWindowingStrategy(), - input.getCoder()); - - @SuppressWarnings({"rawtypes", "unchecked"}) - KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); - try { - inputCoder.getKeyCoder().verifyDeterministic(); - } catch (NonDeterministicException e) { - runner.recordViewUsesNonDeterministicKeyCoder(this); + private 
StreamingCreatePCollectionView(PCollectionView<ViewT> view) { + this.view = view; } - return input - .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) - .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder()))) - .apply(View.CreatePCollectionView.<KV<K, V>, Map<K, V>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsMap"; - } - } - - /** - * Specialized expansion for {@link - * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the - * Dataflow runner in streaming mode. - */ - static class StreamingViewAsMultimap<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { - private final DataflowRunner runner; - - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingViewAsMultimap(DataflowRunner runner, View.AsMultimap<K, V> transform) { - this.runner = runner; - } - - @Override - public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { - PCollectionView<Map<K, Iterable<V>>> view = - PCollectionViews.multimapView( - input.getPipeline(), - input.getWindowingStrategy(), - input.getCoder()); - - @SuppressWarnings({"rawtypes", "unchecked"}) - KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); - try { - inputCoder.getKeyCoder().verifyDeterministic(); - } catch (NonDeterministicException e) { - runner.recordViewUsesNonDeterministicKeyCoder(this); + @Override + public PCollectionView<ViewT> expand(PCollection<ElemT> input) { + return input + .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults()) + .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder()))) + .apply(View.CreatePCollectionView.<ElemT, ViewT>of(view)); } - - return input - .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) - 
.apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder()))) - .apply(View.CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); - } - - @Override - protected String getKindString() { - return "StreamingViewAsMultimap"; } } - /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the - * Dataflow runner in streaming mode. - */ - static class StreamingViewAsList<T> - extends PTransform<PCollection<T>, PCollectionView<List<T>>> { - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingViewAsList(DataflowRunner runner, View.AsList<T> transform) {} + private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> { + private boolean hasDefaultValue; + private T defaultValue; - @Override - public PCollectionView<List<T>> expand(PCollection<T> input) { - PCollectionView<List<T>> view = - PCollectionViews.listView( - input.getPipeline(), - input.getWindowingStrategy(), - input.getCoder()); - - return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) - .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder()))) - .apply(View.CreatePCollectionView.<T, List<T>>of(view)); + SingletonCombine(boolean hasDefaultValue, T defaultValue) { + this.hasDefaultValue = hasDefaultValue; + this.defaultValue = defaultValue; } @Override - protected String getKindString() { - return "StreamingViewAsList"; - } - } - - /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the - * Dataflow runner in streaming mode. - */ - static class StreamingViewAsIterable<T> - extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { - /** - * Builds an instance of this class from the overridden transform. 
- */ - @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingViewAsIterable(DataflowRunner runner, View.AsIterable<T> transform) { } - - @Override - public PCollectionView<Iterable<T>> expand(PCollection<T> input) { - PCollectionView<Iterable<T>> view = - PCollectionViews.iterableView( - input.getPipeline(), - input.getWindowingStrategy(), - input.getCoder()); - - return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) - .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder()))) - .apply(View.CreatePCollectionView.<T, Iterable<T>>of(view)); + public T apply(T left, T right) { + throw new IllegalArgumentException( + "PCollection with more than one element " + + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to " + + "combine the PCollection into a single value"); } @Override - protected String getKindString() { - return "StreamingViewAsIterable"; - } - } - - /** - * Specialized expansion for - * {@link org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton} for the - * Dataflow runner in streaming mode. - */ - static class StreamingViewAsSingleton<T> - extends PTransform<PCollection<T>, PCollectionView<T>> { - private View.AsSingleton<T> transform; - - /** - * Builds an instance of this class from the overridden transform. 
- */ - @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingViewAsSingleton(DataflowRunner runner, View.AsSingleton<T> transform) { - this.transform = transform; - } - - @Override - public PCollectionView<T> expand(PCollection<T> input) { - Combine.Globally<T, T> combine = Combine.globally( - new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue())); - if (!transform.hasDefaultValue()) { - combine = combine.withoutDefaults(); - } - return input.apply(combine.asSingletonView()); - } - - @Override - protected String getKindString() { - return "StreamingViewAsSingleton"; - } - - private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> { - private boolean hasDefaultValue; - private T defaultValue; - - SingletonCombine(boolean hasDefaultValue, T defaultValue) { - this.hasDefaultValue = hasDefaultValue; - this.defaultValue = defaultValue; - } - - @Override - public T apply(T left, T right) { - throw new IllegalArgumentException("PCollection with more than one element " - + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to " - + "combine the PCollection into a single value"); - } - - @Override - public T identity() { - if (hasDefaultValue) { - return defaultValue; - } else { - throw new IllegalArgumentException( - "Empty PCollection accessed as a singleton view. " - + "Consider setting withDefault to provide a default value"); - } + public T identity() { + if (hasDefaultValue) { + return defaultValue; + } else { + throw new IllegalArgumentException( + "Empty PCollection accessed as a singleton view. " + + "Consider setting withDefault to provide a default value"); } } } @@ -306,11 +98,6 @@ class StreamingViewOverrides { /** * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. 
* - * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap}, - * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}. - * They require the input {@link PCollection} fits in memory. - * For a large {@link PCollection} this is expected to crash! - * * @param <T> the type of elements to concatenate. */ private static class Concatenate<T> extends CombineFn<T, List<T>, List<T>> {
