Repository: incubator-beam Updated Branches: refs/heads/master ae52ec1bc -> 5b31a3699
Only provide expanded Inputs and Outputs. This removes PInput and POutput from the immediate API Surface of TransformHierarchy.Node, and forces Pipeline Visitors to access only the expanded versions of the inputs and outputs. This is part of the move towards the runner-agnostic representation of a graph. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55d333bf Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55d333bf Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55d333bf Branch: refs/heads/master Commit: 55d333bff68809ff1a9154491ace02d2d16e3b85 Parents: ae52ec1 Author: Thomas Groh <tg...@google.com> Authored: Mon Dec 5 14:29:05 2016 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Wed Dec 7 09:14:18 2016 -0800 ---------------------------------------------------------------------- .../apex/translation/TranslationContext.java | 4 +-- .../beam/runners/direct/DirectGraphVisitor.java | 9 +++---- .../direct/KeyedPValueTrackingVisitor.java | 2 +- .../FlinkBatchPipelineTranslator.java | 4 +-- .../FlinkStreamingPipelineTranslator.java | 7 ++---- .../dataflow/DataflowPipelineTranslator.java | 3 +-- .../apache/beam/runners/spark/SparkRunner.java | 17 +++++++------ .../beam/sdk/runners/TransformHierarchy.java | 26 +++++++++++--------- .../sdk/runners/TransformHierarchyTest.java | 13 ++++------ 9 files changed, 38 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java index 259afbd..3bf01a8 
100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java @@ -35,7 +35,6 @@ import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.PCollection; @@ -72,8 +71,7 @@ class TranslationContext { } public void setCurrentTransform(TransformHierarchy.Node treeNode) { - this.currentTransform = AppliedPTransform.of(treeNode.getFullName(), - treeNode.getInput(), treeNode.getOutput(), (PTransform) treeNode.getTransform()); + this.currentTransform = treeNode.toAppliedPTransform(); } public ApexPipelineOptions getPipelineOptions() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java index cd9d120..4f38bce 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java @@ -79,13 +79,13 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { - toFinalize.removeAll(node.getInput().expand()); + toFinalize.removeAll(node.getInputs()); AppliedPTransform<?, ?, ?> appliedTransform = 
getAppliedTransform(node); stepNames.put(appliedTransform, genStepName()); - if (node.getInput().expand().isEmpty()) { + if (node.getInputs().isEmpty()) { rootTransforms.add(appliedTransform); } else { - for (PValue value : node.getInput().expand()) { + for (PValue value : node.getInputs()) { primitiveConsumers.put(value, appliedTransform); } } @@ -111,8 +111,7 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformHierarchy.Node node) { @SuppressWarnings({"rawtypes", "unchecked"}) - AppliedPTransform<?, ?, ?> application = AppliedPTransform.of( - node.getFullName(), node.getInput(), node.getOutput(), (PTransform) node.getTransform()); + AppliedPTransform<?, ?, ?> application = node.toAppliedPTransform(); return application; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java index 5dc24c2..4161f9e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java @@ -74,7 +74,7 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor { if (node.isRootNode()) { finalized = true; } else if (producesKeyedOutputs.contains(node.getTransform().getClass())) { - keyedValues.addAll(node.getOutput().expand()); + keyedValues.addAll(node.getOutputs()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java 
---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java index 805c41c..209be69 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.flink.translation; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -113,8 +112,7 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator { BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator; // create the applied PTransform on the batchContext - batchContext.setCurrentTransform(AppliedPTransform.of( - node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform)); + batchContext.setCurrentTransform(node.toAppliedPTransform()); typedTranslator.translateNode(typedTransform, batchContext); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java index 
a07dc3d..23f4d34 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.flink.translation; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PValue; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -109,8 +108,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator; // create the applied PTransform on the streamingContext - streamingContext.setCurrentTransform(AppliedPTransform.of( - node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform)); + streamingContext.setCurrentTransform(node.toAppliedPTransform()); typedTranslator.translateNode(typedTransform, streamingContext); } @@ -125,8 +123,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { @SuppressWarnings("unchecked") StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator; - streamingContext.setCurrentTransform(AppliedPTransform.of( - node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform)); + streamingContext.setCurrentTransform(node.toAppliedPTransform()); return typedTranslator.canTranslate(typedTransform, streamingContext); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index f43e176..8783056 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -531,8 +531,7 @@ public class DataflowPipelineTranslator { "no translator registered for " + transform); } LOG.debug("Translating {}", transform); - currentTransform = AppliedPTransform.of( - node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform); + currentTransform = node.toAppliedPTransform(); translator.translate(transform, this); currentTransform = null; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index d51ee7d..3d98b87 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.spark; +import com.google.common.collect.Iterables; import java.util.Collection; import java.util.List; import java.util.concurrent.ExecutorService; @@ -42,7 +43,6 @@ import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import 
org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; @@ -278,8 +278,11 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { } private boolean shouldDefer(TransformHierarchy.Node node) { - PInput input = node.getInput(); // if the input is not a PCollection, or it is but with non merging windows, don't defer. + if (node.getInputs().size() != 1) { + return false; + } + PValue input = Iterables.getOnlyElement(node.getInputs()); if (!(input instanceof PCollection) || ((PCollection) input).getWindowingStrategy().getWindowFn().isNonMerging()) { return false; @@ -319,8 +322,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { @SuppressWarnings("unchecked") TransformEvaluator<TransformT> evaluator = translate(node, transform, transformClass); LOG.info("Evaluating {}", transform); - AppliedPTransform<PInput, POutput, TransformT> appliedTransform = - AppliedPTransform.of(node.getFullName(), node.getInput(), node.getOutput(), transform); + AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform(); ctxt.setCurrentTransform(appliedTransform); evaluator.evaluate(transform, ctxt); ctxt.setCurrentTransform(null); @@ -337,12 +339,11 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { // usually, the input determines if the PCollection to apply the next transformation to // is BOUNDED or UNBOUNDED, meaning RDD/DStream. Collection<? extends PValue> pValues; - PInput pInput = node.getInput(); - if (pInput instanceof PBegin) { + if (node.getInputs().isEmpty()) { // in case of a PBegin, it's the output. - pValues = node.getOutput().expand(); + pValues = node.getOutputs(); } else { - pValues = pInput.expand(); + pValues = node.getInputs(); } PCollection.IsBounded isNodeBounded = isBoundedCollection(pValues); // translate accordingly. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index e9829cc..33d5231 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -83,8 +84,8 @@ public class TransformHierarchy { */ public void finishSpecifyingInput() { // Inputs must be completely specified before they are consumed by a transform. - current.getInput().finishSpecifying(); - for (PValue inputValue : current.getInput().expand()) { + for (PValue inputValue : current.getInputs()) { + inputValue.finishSpecifying(); checkState(producers.get(inputValue) != null, "Producer unknown for input %s", inputValue); inputValue.finishSpecifying(); } @@ -101,6 +102,7 @@ public class TransformHierarchy { * nodes. */ public void setOutput(POutput output) { + output.finishSpecifyingOutput(); for (PValue value : output.expand()) { if (!producers.containsKey(value)) { producers.put(value, current); @@ -253,11 +255,9 @@ public class TransformHierarchy { return fullName; } - /** - * Returns the transform input, in unexpanded form. - */ - public PInput getInput() { - return input; + /** Returns the transform input, in expanded form. */ + public Collection<? extends PValue> getInputs() { + return input == null ? 
Collections.<PValue>emptyList() : input.expand(); } /** @@ -296,13 +296,15 @@ public class TransformHierarchy { } /** Returns the transform output, in unexpanded form. */ - public POutput getOutput() { - return output; + public Collection<? extends PValue> getOutputs() { + return output == null ? Collections.<PValue>emptyList() : output.expand(); } - AppliedPTransform<?, ?, ?> toAppliedPTransform() { - return AppliedPTransform.of( - getFullName(), getInput(), getOutput(), (PTransform) getTransform()); + /** + * Returns the {@link AppliedPTransform} representing this {@link Node}. + */ + public AppliedPTransform<?, ?, ?> toAppliedPTransform() { + return AppliedPTransform.of(getFullName(), input, output, (PTransform) getTransform()); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index f4488f4..ea43188 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.runners; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import java.util.HashSet; @@ -38,8 +37,6 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; import 
org.apache.beam.sdk.values.PValue; import org.hamcrest.Matchers; import org.junit.Before; @@ -172,24 +169,24 @@ public class TransformHierarchyTest { TransformHierarchy.Node compositeNode = hierarchy.pushNode("Create", begin, create); assertThat(hierarchy.getCurrent(), equalTo(compositeNode)); - assertThat(compositeNode.getInput(), Matchers.<PInput>equalTo(begin)); + assertThat(compositeNode.getInputs(), Matchers.emptyIterable()); assertThat(compositeNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(create)); // Not yet set - assertThat(compositeNode.getOutput(), nullValue()); + assertThat(compositeNode.getOutputs(), Matchers.emptyIterable()); assertThat(compositeNode.getEnclosingNode().isRootNode(), is(true)); TransformHierarchy.Node primitiveNode = hierarchy.pushNode("Create/Read", begin, read); assertThat(hierarchy.getCurrent(), equalTo(primitiveNode)); hierarchy.setOutput(created); hierarchy.popNode(); - assertThat(primitiveNode.getOutput(), Matchers.<POutput>equalTo(created)); - assertThat(primitiveNode.getInput(), Matchers.<PInput>equalTo(begin)); + assertThat(primitiveNode.getOutputs(), Matchers.<PValue>containsInAnyOrder(created)); + assertThat(primitiveNode.getInputs(), Matchers.<PValue>emptyIterable()); assertThat(primitiveNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(read)); assertThat(primitiveNode.getEnclosingNode(), equalTo(compositeNode)); hierarchy.setOutput(created); // The composite is listed as outputting a PValue created by the contained primitive - assertThat(compositeNode.getOutput(), Matchers.<POutput>equalTo(created)); + assertThat(compositeNode.getOutputs(), Matchers.<PValue>containsInAnyOrder(created)); // The producer of that PValue is still the primitive in which it is first output assertThat(hierarchy.getProducer(created), equalTo(primitiveNode)); hierarchy.popNode();