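Replace the View.As* transforms (AsMap, AsMultimap, AsSingleton, AsList, AsIterable) wholesale with runner-specific overrides for Dataflow batch, because Dataflow's entire batch implementation of these views is specialized.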
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2f815c5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2f815c5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2f815c5 Branch: refs/heads/tez-runner Commit: c2f815c581cf5d6cd72b600cef42f436c9702d85 Parents: d5a8aea Author: Luke Cwik <lc...@google.com> Authored: Tue Nov 14 07:47:53 2017 -0800 Committer: Luke Cwik <lc...@google.com> Committed: Wed Nov 15 08:59:51 2017 -0800 ---------------------------------------------------------------------- .../runners/dataflow/BatchViewOverrides.java | 53 ++++----- .../runners/dataflow/CreateDataflowView.java | 19 +++- .../beam/runners/dataflow/DataflowRunner.java | 110 ++++++++++++++++--- .../dataflow/StreamingViewOverrides.java | 2 +- .../DataflowPipelineTranslatorTest.java | 12 +- 5 files changed, 142 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 9a77b4b..8ed41cb 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -116,7 +116,7 @@ class BatchViewOverrides { * a warning to users to specify a deterministic key coder. 
*/ static class BatchViewAsMap<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { + extends PTransform<PCollection<KV<K, V>>, PCollection<?>> { /** * A {@link DoFn} which groups elements by window boundaries. For each group, @@ -193,11 +193,11 @@ class BatchViewOverrides { } @Override - public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) { + public PCollection<?> expand(PCollection<KV<K, V>> input) { return this.<BoundedWindow>applyInternal(input); } - private <W extends BoundedWindow> PCollectionView<Map<K, V>> + private <W extends BoundedWindow> PCollection<?> applyInternal(PCollection<KV<K, V>> input) { try { return BatchViewAsMultimap.applyForMapLike(runner, input, view, true /* unique keys */); @@ -216,7 +216,7 @@ class BatchViewOverrides { } /** Transforms the input {@link PCollection} into a singleton {@link Map} per window. */ - private <W extends BoundedWindow> PCollectionView<Map<K, V>> + private <W extends BoundedWindow> PCollection<?> applyForSingletonFallback(PCollection<KV<K, V>> input) { @SuppressWarnings("unchecked") Coder<W> windowCoder = (Coder<W>) @@ -280,7 +280,7 @@ class BatchViewOverrides { * a warning to users to specify a deterministic key coder. */ static class BatchViewAsMultimap<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { + extends PTransform<PCollection<KV<K, V>>, PCollection<?>> { /** * A {@link PTransform} that groups elements by the hash of window's byte representation * if the input {@link PCollection} is not within the global window. 
Otherwise by the hash @@ -672,11 +672,11 @@ class BatchViewOverrides { } @Override - public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) { + public PCollection<?> expand(PCollection<KV<K, V>> input) { return this.<BoundedWindow>applyInternal(input); } - private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> + private <W extends BoundedWindow> PCollection<?> applyInternal(PCollection<KV<K, V>> input) { try { return applyForMapLike(runner, input, view, false /* unique keys not expected */); @@ -690,7 +690,7 @@ class BatchViewOverrides { } /** Transforms the input {@link PCollection} into a singleton {@link Map} per window. */ - private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>> + private <W extends BoundedWindow> PCollection<?> applyForSingletonFallback(PCollection<KV<K, V>> input) { @SuppressWarnings("unchecked") Coder<W> windowCoder = (Coder<W>) @@ -727,7 +727,7 @@ class BatchViewOverrides { view); } - private static <K, V, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForMapLike( + private static <K, V, W extends BoundedWindow, ViewT> PCollection<?> applyForMapLike( DataflowRunner runner, PCollection<KV<K, V>> input, PCollectionView<ViewT> view, @@ -804,9 +804,10 @@ class BatchViewOverrides { PCollectionList.of(ImmutableList.of( perHashWithReifiedWindows, windowMapSizeMetadata, windowMapKeysMetadata)); - Pipeline.applyTransform(outputs, Flatten.<IsmRecord<WindowedValue<V>>>pCollections()) - .apply(CreateDataflowView.<IsmRecord<WindowedValue<V>>, ViewT>of(view)); - return view; + PCollection<IsmRecord<WindowedValue<V>>> flattenedOutputs = + Pipeline.applyTransform(outputs, Flatten.<IsmRecord<WindowedValue<V>>>pCollections()); + flattenedOutputs.apply(CreateDataflowView.<IsmRecord<WindowedValue<V>>, ViewT>forBatch(view)); + return flattenedOutputs; } @Override @@ -843,7 +844,7 @@ class BatchViewOverrides { * </ul> */ static class BatchViewAsSingleton<T> - extends PTransform<PCollection<T>, 
PCollectionView<T>> { + extends PTransform<PCollection<T>, PCollection<?>> { /** * A {@link DoFn} that outputs {@link IsmRecord}s. These records are structured as follows: @@ -900,7 +901,7 @@ class BatchViewOverrides { } @Override - public PCollectionView<T> expand(PCollection<T> input) { + public PCollection<?> expand(PCollection<T> input) { @SuppressWarnings("unchecked") Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder(); @@ -913,7 +914,7 @@ class BatchViewOverrides { view); } - static <T, FinalT, ViewT, W extends BoundedWindow> PCollectionView<ViewT> + static <T, FinalT, ViewT, W extends BoundedWindow> PCollection<?> applyForSingleton( DataflowRunner runner, PCollection<T> input, @@ -936,8 +937,8 @@ class BatchViewOverrides { runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted); reifiedPerWindowAndSorted.apply( - CreateDataflowView.<IsmRecord<WindowedValue<FinalT>>, ViewT>of(view)); - return view; + CreateDataflowView.<IsmRecord<WindowedValue<FinalT>>, ViewT>forBatch(view)); + return reifiedPerWindowAndSorted; } @Override @@ -969,7 +970,7 @@ class BatchViewOverrides { * </ul> */ static class BatchViewAsList<T> - extends PTransform<PCollection<T>, PCollectionView<List<T>>> { + extends PTransform<PCollection<T>, PCollection<?>> { /** * A {@link DoFn} which creates {@link IsmRecord}s assuming that each element is within the * global window. 
Each {@link IsmRecord} has @@ -1054,11 +1055,11 @@ class BatchViewOverrides { } @Override - public PCollectionView<List<T>> expand(PCollection<T> input) { + public PCollection<?> expand(PCollection<T> input) { return applyForIterableLike(runner, input, view); } - static <T, W extends BoundedWindow, ViewT> PCollectionView<ViewT> applyForIterableLike( + static <T, W extends BoundedWindow, ViewT> PCollection<?> applyForIterableLike( DataflowRunner runner, PCollection<T> input, PCollectionView<ViewT> view) { @@ -1081,8 +1082,8 @@ class BatchViewOverrides { runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted); reifiedPerWindowAndSorted.apply( - CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view)); - return view; + CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>forBatch(view)); + return reifiedPerWindowAndSorted; } PCollection<IsmRecord<WindowedValue<T>>> reifiedPerWindowAndSorted = input @@ -1092,8 +1093,8 @@ class BatchViewOverrides { runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted); reifiedPerWindowAndSorted.apply( - CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view)); - return view; + CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>forBatch(view)); + return reifiedPerWindowAndSorted; } @Override @@ -1127,7 +1128,7 @@ class BatchViewOverrides { * </ul> */ static class BatchViewAsIterable<T> - extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { + extends PTransform<PCollection<T>, PCollection<?>> { private final DataflowRunner runner; private final PCollectionView<Iterable<T>> view; @@ -1140,7 +1141,7 @@ class BatchViewOverrides { } @Override - public PCollectionView<Iterable<T>> expand(PCollection<T> input) { + public PCollection<?> expand(PCollection<T> input) { return BatchViewAsList.applyForIterableLike(runner, input, view); } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java index 3b01d69..10888c2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java @@ -25,20 +25,29 @@ import org.apache.beam.sdk.values.PCollectionView; /** A {@link DataflowRunner} marker class for creating a {@link PCollectionView}. */ public class CreateDataflowView<ElemT, ViewT> extends PTransform<PCollection<ElemT>, PCollection<ElemT>> { - public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> of(PCollectionView<ViewT> view) { - return new CreateDataflowView<>(view); + public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forBatch(PCollectionView<ViewT> view) { + return new CreateDataflowView<>(view, false); + } + + public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forStreaming(PCollectionView<ViewT> view) { + return new CreateDataflowView<>(view, true); } private final PCollectionView<ViewT> view; + private final boolean streaming; - private CreateDataflowView(PCollectionView<ViewT> view) { + private CreateDataflowView(PCollectionView<ViewT> view, boolean streaming) { this.view = view; + this.streaming = streaming; } @Override public PCollection<ElemT> expand(PCollection<ElemT> input) { - return PCollection.createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder()); + if (streaming) { + return PCollection.createPrimitiveOutputInternal( + 
input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder()); + } + return (PCollection) view.getPCollection(); } public PCollectionView<ViewT> getView() { http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 0a20a0f..72e4f83 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -58,6 +58,7 @@ import java.util.Random; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory; @@ -124,6 +125,7 @@ import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; @@ -139,7 +141,6 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PCollectionViews; import 
org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.PValue; @@ -395,28 +396,28 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { BatchStatefulParDoOverrides.singleOutputOverrideFactory(options))) .add( PTransformOverride.of( - PTransformMatchers.createViewWithViewFn(PCollectionViews.MapViewFn.class), - new ReflectiveOneToOneOverrideFactory( + PTransformMatchers.classEqualTo(View.AsMap.class), + new ReflectiveViewOverrideFactory( BatchViewOverrides.BatchViewAsMap.class, this))) .add( PTransformOverride.of( - PTransformMatchers.createViewWithViewFn(PCollectionViews.MultimapViewFn.class), - new ReflectiveOneToOneOverrideFactory( + PTransformMatchers.classEqualTo(View.AsMultimap.class), + new ReflectiveViewOverrideFactory( BatchViewOverrides.BatchViewAsMultimap.class, this))) .add( PTransformOverride.of( - PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class), - new ReflectiveOneToOneOverrideFactory( + PTransformMatchers.classEqualTo(View.AsSingleton.class), + new ReflectiveViewOverrideFactory( BatchViewOverrides.BatchViewAsSingleton.class, this))) .add( PTransformOverride.of( - PTransformMatchers.createViewWithViewFn(PCollectionViews.ListViewFn.class), - new ReflectiveOneToOneOverrideFactory( + PTransformMatchers.classEqualTo(View.AsList.class), + new ReflectiveViewOverrideFactory( BatchViewOverrides.BatchViewAsList.class, this))) .add( PTransformOverride.of( - PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class), - new ReflectiveOneToOneOverrideFactory( + PTransformMatchers.classEqualTo(View.AsIterable.class), + new ReflectiveViewOverrideFactory( BatchViewOverrides.BatchViewAsIterable.class, this))); } overridesBuilder @@ -435,6 +436,81 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { return overridesBuilder.build(); } + /** + * Replace the View.AsYYY transform with specialized view overrides for 
Dataflow. It is required that the + * new replacement transform uses the supplied PCollectionView and does not create another instance. + */ + private static class ReflectiveViewOverrideFactory<InputT, ViewT> + implements PTransformOverrideFactory<PCollection<InputT>, + PValue, PTransform<PCollection<InputT>, PValue>> { + + private final Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement; + private final DataflowRunner runner; + + private ReflectiveViewOverrideFactory( + Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement, + DataflowRunner runner) { + this.replacement = replacement; + this.runner = runner; + } + + @Override + public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform( + final AppliedPTransform<PCollection<InputT>, + PValue, + PTransform<PCollection<InputT>, PValue>> transform) { + + final AtomicReference<CreatePCollectionView> viewTransformRef = new AtomicReference<>(); + transform.getPipeline().traverseTopologically(new PipelineVisitor.Defaults() { + // Stores whether we have entered the expected composite view transform. 
+ private boolean tracking = false; + + @Override + public CompositeBehavior enterCompositeTransform(Node node) { + if (transform.getTransform() == node.getTransform()) { + tracking = true; + } + return super.enterCompositeTransform(node); + } + + @Override + public void visitPrimitiveTransform(Node node) { + if (tracking && node.getTransform() instanceof CreatePCollectionView) { + checkState( + viewTransformRef.compareAndSet(null, (CreatePCollectionView) node.getTransform()), + "Found more then one instance of a CreatePCollectionView when " + + "attempting to replace %s, found [%s, %s]", + replacement, viewTransformRef.get(), node.getTransform()); + } + } + + @Override + public void leaveCompositeTransform(Node node) { + if (transform.getTransform() == node.getTransform()) { + tracking = false; + } + } + }); + + checkState(viewTransformRef.get() != null, + "Expected to find CreatePCollectionView contained within %s", + transform.getTransform()); + PTransform<PCollection<InputT>, PCollectionView<ViewT>> rep = + InstanceBuilder.ofType(replacement) + .withArg(DataflowRunner.class, runner) + .withArg(CreatePCollectionView.class, viewTransformRef.get()) + .build(); + return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), (PTransform) rep); + } + + @Override + public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PValue newOutput) { + // We do not replace any of the outputs because we expect that the new PTransform will re-use the original + // PCollectionView that was returned. 
+ return ImmutableMap.of(); + } + } + private static class ReflectiveOneToOneOverrideFactory< InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>> extends SingleInputOutputOverrideFactory< @@ -1258,19 +1334,21 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { */ private static class Deduplicate<T> extends PTransform<PCollection<ValueWithRecordId<T>>, PCollection<T>> { + // Use a finite set of keys to improve bundling. Without this, the key space // will be the space of ids which is potentially very large, which results in much // more per-key overhead. private static final int NUM_RESHARD_KEYS = 10000; + @Override public PCollection<T> expand(PCollection<ValueWithRecordId<T>> input) { return input .apply(WithKeys.of(new SerializableFunction<ValueWithRecordId<T>, Integer>() { - @Override - public Integer apply(ValueWithRecordId<T> value) { - return Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS; - } - })) + @Override + public Integer apply(ValueWithRecordId<T> value) { + return Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS; + } + })) // Reshuffle will dedup based on ids in ValueWithRecordId by passing the data through // WindmillSink. 
.apply(Reshuffle.<Integer, ValueWithRecordId<T>>of()) http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java index 1853248..69099c6 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java @@ -68,7 +68,7 @@ class StreamingViewOverrides { return input .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults()) .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder()))) - .apply(CreateDataflowView.<ElemT, ViewT>of(view)); + .apply(CreateDataflowView.<ElemT, ViewT>forStreaming(view)); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c2f815c5/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index cc43e27..e03abb9 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -991,15 +991,15 @@ public class DataflowPipelineTranslatorTest implements Serializable { 
assertAllStepOutputsHaveUniqueIds(job); List<Step> steps = job.getSteps(); - assertEquals(10, steps.size()); + assertEquals(5, steps.size()); @SuppressWarnings("unchecked") List<Map<String, Object>> toIsmRecordOutputs = - (List<Map<String, Object>>) steps.get(8).getProperties().get(PropertyNames.OUTPUT_INFO); + (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO); assertTrue( Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format")); - Step collectionToSingletonStep = steps.get(9); + Step collectionToSingletonStep = steps.get(4); assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); } @@ -1023,16 +1023,16 @@ public class DataflowPipelineTranslatorTest implements Serializable { assertAllStepOutputsHaveUniqueIds(job); List<Step> steps = job.getSteps(); - assertEquals(4, steps.size()); + assertEquals(3, steps.size()); @SuppressWarnings("unchecked") List<Map<String, Object>> toIsmRecordOutputs = - (List<Map<String, Object>>) steps.get(2).getProperties().get(PropertyNames.OUTPUT_INFO); + (List<Map<String, Object>>) steps.get(1).getProperties().get(PropertyNames.OUTPUT_INFO); assertTrue( Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format")); - Step collectionToSingletonStep = steps.get(3); + Step collectionToSingletonStep = steps.get(2); assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); }