Repository: beam Updated Branches: refs/heads/master 09d75a0b0 -> 6543e56d2
Remove the requirement to visit PCollectionViews in Dataflow Remove the unused addStep method in the Dataflow Translator. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e82bc4b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e82bc4b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e82bc4b Branch: refs/heads/master Commit: 7e82bc4b280cd35fca042a50f0055cd68850da68 Parents: 09d75a0 Author: Thomas Groh <tg...@google.com> Authored: Wed May 31 15:42:03 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Mon Jun 5 10:19:21 2017 -0700 ---------------------------------------------------------------------- .../dataflow/DataflowPipelineTranslator.java | 72 +++------------ .../beam/runners/dataflow/ReadTranslator.java | 6 +- .../runners/dataflow/TransformTranslator.java | 26 +++--- .../DataflowPipelineTranslatorTest.java | 95 -------------------- 4 files changed, 24 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7e82bc4b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index af93ef5..8eaf61b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -438,12 +438,9 @@ public class DataflowPipelineTranslator { @Override public void visitValue(PValue value, TransformHierarchy.Node producer) { - producers.put(value, producer.toAppliedPTransform(getPipeline())); LOG.debug("Checking translation of {}", value); - if (!producer.isCompositeNode()) { - // Primitive transforms are the only ones assigned step names. - asOutputReference(value, producer.toAppliedPTransform(getPipeline())); - } + // Primitive transforms are the only ones assigned step names. + asOutputReference(value, producer.toAppliedPTransform(getPipeline())); } @Override @@ -471,48 +468,6 @@ public class DataflowPipelineTranslator { return stepContext; } - @Override - public Step addStep(PTransform<?, ? extends PValue> transform, Step original) { - Step step = original.clone(); - String stepName = step.getName(); - if (stepNames.put(getCurrentTransform(transform), stepName) != null) { - throw new IllegalArgumentException(transform + " already has a name specified"); - } - - Map<String, Object> properties = step.getProperties(); - if (properties != null) { - @Nullable List<Map<String, Object>> outputInfoList = null; - try { - // TODO: This should be done via a Structs accessor. - @Nullable List<Map<String, Object>> list = - (List<Map<String, Object>>) properties.get(PropertyNames.OUTPUT_INFO); - outputInfoList = list; - } catch (Exception e) { - throw new RuntimeException("Inconsistent dataflow pipeline translation", e); - } - if (outputInfoList != null && outputInfoList.size() > 0) { - Map<String, Object> firstOutputPort = outputInfoList.get(0); - @Nullable String name; - try { - name = getString(firstOutputPort, PropertyNames.OUTPUT_NAME); - } catch (Exception e) { - name = null; - } - if (name != null) { - registerOutputName(getOutput(transform), name); - } - } - } - - List<Step> steps = job.getSteps(); - if (steps == null) { - steps = new LinkedList<>(); - job.setSteps(steps); - } - steps.add(step); - return step; - } - public OutputReference asOutputReference(PValue value, AppliedPTransform<?, ?, ?> producer) { String stepName = stepNames.get(producer); checkArgument(stepName != null, "%s doesn't have a name specified", producer); @@ -607,26 +562,19 @@ public class DataflowPipelineTranslator { } @Override - public long addOutput(PValue value) { - Coder<?> coder; - if (value instanceof PCollection) { - coder = ((PCollection<?>) value).getCoder(); - if (value instanceof PCollection) { - // Wrap the PCollection element Coder inside a WindowedValueCoder. - coder = WindowedValue.getFullCoder( - coder, - ((PCollection<?>) value).getWindowingStrategy().getWindowFn().windowCoder()); - } - } else { - // No output coder to encode. - coder = null; - } + public long addOutput(PCollection<?> value) { + translator.producers.put(value, translator.currentTransform); + // Wrap the PCollection element Coder inside a WindowedValueCoder. + Coder<?> coder = + WindowedValue.getFullCoder( + value.getCoder(), value.getWindowingStrategy().getWindowFn().windowCoder()); return addOutput(value, coder); } @Override public long addCollectionToSingletonOutput( - PValue inputValue, PValue outputValue) { + PCollection<?> inputValue, PCollectionView<?> outputValue) { + translator.producers.put(outputValue, translator.currentTransform); Coder<?> inputValueCoder = checkNotNull(translator.outputCoders.get(inputValue)); // The inputValueCoder for the input PCollection should be some http://git-wip-us.apache.org/repos/asf/beam/blob/7e82bc4b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java index 0b22d7e..693748a 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java @@ -29,7 +29,7 @@ import org.apache.beam.runners.dataflow.util.PropertyNames; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.PCollection; /** * Translator for the {@code Read} {@code PTransform} for the Dataflow back-end. @@ -41,7 +41,9 @@ class ReadTranslator implements TransformTranslator<Read.Bounded<?>> { } public static <T> void translateReadHelper( - Source<T> source, PTransform<?, ? extends PValue> transform, TranslationContext context) { + Source<T> source, + PTransform<?, ? extends PCollection<?>> transform, + TranslationContext context) { try { StepTranslationContext stepContext = context.addStep(transform, "ParallelRead"); stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT); http://git-wip-us.apache.org/repos/asf/beam/blob/7e82bc4b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java index 23949bd..a7452b2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java @@ -17,15 +17,16 @@ */ package org.apache.beam.runners.dataflow; -import com.google.api.services.dataflow.model.Step; import java.util.List; import java.util.Map; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.OutputReference; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; @@ -65,14 +66,6 @@ interface TransformTranslator<TransformT extends PTransform> { */ StepTranslationContext addStep(PTransform<?, ?> transform, String type); - /** - * Adds a pre-defined step to the Dataflow workflow. The given PTransform should be consistent - * with the Step, in terms of input, output and coder types. - * - * <p>This is a low-level operation, when using this method it is up to the caller to ensure - * that names do not collide. - */ - Step addStep(PTransform<?, ? extends PValue> transform, Step step); /** Encode a PValue reference as an output reference. */ OutputReference asOutputReference(PValue value, AppliedPTransform<?, ?, ?> producer); @@ -100,10 +93,11 @@ interface TransformTranslator<TransformT extends PTransform> { * Adds an input with the given name to this Dataflow step, coming from the specified input * PValue. * - * <p>The input {@link PValue} must have already been produced by a step earlier in this {@link - * Pipeline}. If the input value has not yet been produced yet (either by a call to {@link - * StepTranslationContext#addOutput(PValue)} or within a call to {@link - * TranslationContext#addStep(PTransform, Step)}), this method will throw an exception. + * <p>The input {@link PValue} must have already been produced by a step earlier in this + * {@link Pipeline}. If the input value has not yet been produced yet (by a call to either + * {@link StepTranslationContext#addOutput(PCollection)} or + * {@link StepTranslationContext#addCollectionToSingletonOutput(PCollection, PCollectionView)}) + * this method will throw an exception. */ void addInput(String name, PInput value); @@ -114,18 +108,18 @@ interface TransformTranslator<TransformT extends PTransform> { void addInput(String name, List<? extends Map<String, Object>> elements); /** - * Adds an output to this Dataflow step, producing the specified output {@code PValue}, + * Adds a primitive output to this Dataflow step, producing the specified output {@code PValue}, * including its {@code Coder} if a {@code TypedPValue}. If the {@code PValue} is a {@code * PCollection}, wraps its coder inside a {@code WindowedValueCoder}. Returns a pipeline level * unique id. */ - long addOutput(PValue value); + long addOutput(PCollection<?> value); /** * Adds an output to this {@code CollectionToSingleton} Dataflow step, consuming the specified * input {@code PValue} and producing the specified output {@code PValue}. This step requires * special treatment for its output encoding. Returns a pipeline level unique id. */ - long addCollectionToSingletonOutput(PValue inputValue, PValue outputValue); + long addCollectionToSingletonOutput(PCollection<?> inputValue, PCollectionView<?> outputValue); } } http://git-wip-us.apache.org/repos/asf/beam/blob/7e82bc4b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 87744f0..89dc2d5 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.dataflow; -import static org.apache.beam.runners.dataflow.util.Structs.addObject; -import static org.apache.beam.runners.dataflow.util.Structs.getDictionary; import static org.apache.beam.runners.dataflow.util.Structs.getString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; @@ -73,7 +71,6 @@ import org.apache.beam.runners.dataflow.util.PropertyNames; import org.apache.beam.runners.dataflow.util.Structs; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; @@ -531,57 +528,6 @@ public class DataflowPipelineTranslatorTest implements Serializable { job.getEnvironment().getWorkerPools().get(0).getDiskSizeGb()); } - @Test - public void testPredefinedAddStep() throws Exception { - DataflowPipelineOptions options = buildPipelineOptions(); - - DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); - DataflowPipelineTranslator.registerTransformTranslator( - EmbeddedTransform.class, new EmbeddedTranslator()); - - // Create a predefined step using another pipeline - Step predefinedStep = createPredefinedStep(); - - // Create a pipeline that the predefined step will be embedded into - Pipeline pipeline = Pipeline.create(options); - pipeline.apply("ReadMyFile", TextIO.read().from("gs://bucket/in")) - .apply(ParDo.of(new NoOpFn())) - .apply(new EmbeddedTransform(predefinedStep.clone())) - .apply(ParDo.of(new NoOpFn())); - DataflowRunner runner = DataflowRunner.fromOptions(options); - runner.replaceTransforms(pipeline); - Job job = - translator - .translate( - pipeline, - runner, - Collections.<DataflowPackage>emptyList()) - .getJob(); - assertAllStepOutputsHaveUniqueIds(job); - - List<Step> steps = job.getSteps(); - assertEquals(4, steps.size()); - - // The input to the embedded step should match the output of the step before - Map<String, Object> step1Out = getOutputPortReference(steps.get(1)); - Map<String, Object> step2In = getDictionary( - steps.get(2).getProperties(), PropertyNames.PARALLEL_INPUT); - assertEquals(step1Out, step2In); - - // The output from the embedded step should match the input of the step after - Map<String, Object> step2Out = getOutputPortReference(steps.get(2)); - Map<String, Object> step3In = getDictionary( - steps.get(3).getProperties(), PropertyNames.PARALLEL_INPUT); - assertEquals(step2Out, step3In); - - // The step should not have been modified other than remapping the input - Step predefinedStepClone = predefinedStep.clone(); - Step embeddedStepClone = steps.get(2).clone(); - predefinedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT); - embeddedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT); - assertEquals(predefinedStepClone, embeddedStepClone); - } - /** * Construct a OutputReference for the output of the step. */ @@ -630,47 +576,6 @@ public class DataflowPipelineTranslatorTest implements Serializable { } /** - * A placeholder transform that will be used to substitute a predefined Step. - */ - private static class EmbeddedTransform - extends PTransform<PCollection<String>, PCollection<String>> { - private final Step step; - - public EmbeddedTransform(Step step) { - this.step = step; - } - - @Override - public PCollection<String> expand(PCollection<String> input) { - return PCollection.createPrimitiveOutputInternal( - input.getPipeline(), - WindowingStrategy.globalDefault(), - input.isBounded()); - } - - @Override - protected Coder<?> getDefaultOutputCoder() { - return StringUtf8Coder.of(); - } - } - - /** - * A TransformTranslator that adds the predefined Step using - * {@link TranslationContext#addStep} and remaps the input port reference. - */ - private static class EmbeddedTranslator - implements TransformTranslator<EmbeddedTransform> { - @Override public void translate(EmbeddedTransform transform, TranslationContext context) { - PCollection<String> input = context.getInput(transform); - addObject( - transform.step.getProperties(), - PropertyNames.PARALLEL_INPUT, - context.asOutputReference(input, context.getProducer(input))); - context.addStep(transform, transform.step); - } - } - - /** * A composite transform that returns an output that is unrelated to * the input. */