This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 9d9ffa5 [BEAM-2929] Remove Dataflow expansions for PCollectionView that have been migrated into the Dataflow service for the portability framework. 9d9ffa5 is described below commit 9d9ffa5f1a3a9f280dfafae15944764a568515ef Author: Luke Cwik <lc...@google.com> AuthorDate: Tue Dec 12 16:27:58 2017 -0800 [BEAM-2929] Remove Dataflow expansions for PCollectionView that have been migrated into the Dataflow service for the portability framework. --- .../dataflow/DataflowPipelineTranslator.java | 37 +++++++++++++++++++--- .../beam/runners/dataflow/DataflowRunner.java | 17 ++++++---- 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 4f9b939..5c26e0d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Strings.isNullOrEmpty; +import static org.apache.beam.runners.dataflow.DataflowRunner.hasExperiment; import static org.apache.beam.runners.dataflow.util.Structs.addBoolean; import static org.apache.beam.runners.dataflow.util.Structs.addDictionary; import static org.apache.beam.runners.dataflow.util.Structs.addList; @@ -84,6 +85,7 @@ import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; 
+import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; @@ -349,7 +351,7 @@ public class DataflowPipelineTranslator { job.setLabels(options.getLabels()); } if (options.isStreaming() - && !DataflowRunner.hasExperiment(options, "enable_windmill_service")) { + && !hasExperiment(options, "enable_windmill_service")) { // Use separate data disk for streaming. Disk disk = new Disk(); disk.setDiskType(options.getWorkerDiskType()); @@ -447,13 +449,22 @@ public class DataflowPipelineTranslator { public void visitValue(PValue value, TransformHierarchy.Node producer) { LOG.debug("Checking translation of {}", value); // Primitive transforms are the only ones assigned step names. - if (producer.getTransform() instanceof CreateDataflowView) { - // CreateDataflowView produces a dummy output (as it must be a primitive transform) but - // in the Dataflow Job graph produces only the view and not the output PCollection. + if (producer.getTransform() instanceof CreateDataflowView + && !hasExperiment(options, "beam_fn_api")) { + // CreateDataflowView produces a dummy output (as it must be a primitive transform) + // but in the Dataflow Job graph produces only the view and not the output PCollection. asOutputReference( ((CreateDataflowView) producer.getTransform()).getView(), producer.toAppliedPTransform(getPipeline())); return; + } else if (producer.getTransform() instanceof View.CreatePCollectionView + && hasExperiment(options, "beam_fn_api")) { + // View.CreatePCollectionView produces a dummy output (as it must be a primitive transform) + // but in the Dataflow Job graph produces only the view and not the output PCollection. 
+ asOutputReference( + ((View.CreatePCollectionView) producer.getTransform()).getView(), + producer.toAppliedPTransform(getPipeline())); + return; } asOutputReference(value, producer.toAppliedPTransform(getPipeline())); } @@ -680,6 +691,24 @@ public class DataflowPipelineTranslator { static { registerTransformTranslator( + View.CreatePCollectionView.class, + new TransformTranslator<View.CreatePCollectionView>() { + @Override + public void translate(View.CreatePCollectionView transform, TranslationContext context) { + translateTyped(transform, context); + } + + private <ElemT, ViewT> void translateTyped( + View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) { + StepTranslationContext stepContext = + context.addStep(transform, "CollectionToSingleton"); + PCollection<ElemT> input = context.getInput(transform); + stepContext.addInput(PropertyNames.PARALLEL_INPUT, input); + stepContext.addCollectionToSingletonOutput(input, transform.getView()); + } + }); + + registerTransformTranslator( CreateDataflowView.class, new TransformTranslator<CreateDataflowView>() { @Override diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index ddad43f..942d36b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -382,11 +382,13 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { .add( PTransformOverride.of( PTransformMatchers.classEqualTo(Read.Unbounded.class), - new StreamingUnboundedReadOverrideFactory())) - .add( - PTransformOverride.of( - PTransformMatchers.classEqualTo(View.CreatePCollectionView.class), - new StreamingCreatePCollectionViewFactory())); + new 
StreamingUnboundedReadOverrideFactory())); + if (!hasExperiment(options, "beam_fn_api")) { + overridesBuilder.add( + PTransformOverride.of( + PTransformMatchers.classEqualTo(View.CreatePCollectionView.class), + new StreamingCreatePCollectionViewFactory())); + } } else { overridesBuilder // State and timer pardos are implemented by expansion to GBK-then-ParDo @@ -397,7 +399,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { .add( PTransformOverride.of( PTransformMatchers.stateOrTimerParDoSingle(), - BatchStatefulParDoOverrides.singleOutputOverrideFactory(options))) + BatchStatefulParDoOverrides.singleOutputOverrideFactory(options))); + if (!hasExperiment(options, "beam_fn_api")) { + overridesBuilder .add( PTransformOverride.of( PTransformMatchers.classEqualTo(View.AsMap.class), @@ -422,6 +426,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { PTransformMatchers.classEqualTo(View.AsIterable.class), new ReflectiveViewOverrideFactory( BatchViewOverrides.BatchViewAsIterable.class, this))); + } } // Expands into Reshuffle and single-output ParDo, so has to be before the overrides below. if (hasExperiment(options, "beam_fn_api")) { -- To stop receiving notification emails like this one, please contact commits@beam.apache.org.