Repository: incubator-beam Updated Branches: refs/heads/master 0c875ba70 -> 48130f718
Move TransformHierarchy Maintenance into it This reduces the complexity of Pipeline.applyInternal by keeping the responsibilities to passing a node into the Transform Hierarchy, enforcing name uniqueness, and causing the runner to expand the PTransform. This logic is moved to the appropriate application sites. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ab1f1ad0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ab1f1ad0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ab1f1ad0 Branch: refs/heads/master Commit: ab1f1ad012bc559cdb099319a516e4437eed2825 Parents: 0c875ba Author: Thomas Groh <tg...@google.com> Authored: Tue Nov 29 14:29:47 2016 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Thu Dec 1 12:55:25 2016 -0800 ---------------------------------------------------------------------- .../direct/KeyedPValueTrackingVisitor.java | 2 +- .../DataflowPipelineTranslatorTest.java | 2 +- .../main/java/org/apache/beam/sdk/Pipeline.java | 117 +++--------- .../beam/sdk/runners/TransformHierarchy.java | 126 ++++++++----- .../beam/sdk/runners/TransformTreeNode.java | 165 +++++++++-------- .../sdk/runners/TransformHierarchyTest.java | 180 ++++++++++++++----- .../beam/sdk/runners/TransformTreeTest.java | 4 +- 7 files changed, 340 insertions(+), 256 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java index 7c4376a..47b0857 100644 --- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java @@ -74,7 +74,7 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor { if (node.isRootNode()) { finalized = true; } else if (producesKeyedOutputs.contains(node.getTransform().getClass())) { - keyedValues.addAll(node.getExpandedOutputs()); + keyedValues.addAll(node.getOutput().expand()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index c925454..95c7132 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -669,7 +669,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { PCollection<Integer> input = p.begin() .apply(Create.of(1, 2, 3)); - thrown.expect(IllegalStateException.class); + thrown.expect(IllegalArgumentException.class); input.apply(new PartiallyBoundOutputCreator()); Assert.fail("Failure expected from use of partially bound output"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java 
index 9edf496..c8a4439 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -17,10 +17,11 @@ */ package org.apache.beam.sdk; +import static com.google.common.base.Preconditions.checkState; + import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import org.apache.beam.sdk.coders.CoderRegistry; @@ -31,7 +32,6 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.UserCodeException; @@ -282,14 +282,12 @@ public class Pipeline { * <p>Typically invoked by {@link PipelineRunner} subclasses. */ public void traverseTopologically(PipelineVisitor visitor) { - Set<PValue> visitedValues = new HashSet<>(); - // Visit all the transforms, which should implicitly visit all the values. - transforms.visit(visitor, visitedValues); - if (!visitedValues.containsAll(values)) { - throw new RuntimeException( - "internal error: should have visited all the values " - + "after visiting all the transforms"); - } + Set<PValue> visitedValues = + // Visit all the transforms, which should implicitly visit all the values. + transforms.visit(visitor); + checkState( + visitedValues.containsAll(values), + "internal error: should have visited all the values after visiting all the transforms"); } /** @@ -351,53 +349,43 @@ public class Pipeline { * * @see Pipeline#apply */ - private <InputT extends PInput, OutputT extends POutput> - OutputT applyInternal(String name, InputT input, - PTransform<? 
super InputT, OutputT> transform) { - input.finishSpecifying(); + private <InputT extends PInput, OutputT extends POutput> OutputT applyInternal( + String name, InputT input, PTransform<? super InputT, OutputT> transform) { + String namePrefix = transforms.getCurrent().getFullName(); + String uniqueName = uniquifyInternal(namePrefix, name); - TransformTreeNode parent = transforms.getCurrent(); - String namePrefix = parent.getFullName(); - String fullName = uniquifyInternal(namePrefix, name); - - boolean nameIsUnique = fullName.equals(buildName(namePrefix, name)); + boolean nameIsUnique = uniqueName.equals(buildName(namePrefix, name)); if (!nameIsUnique) { switch (getOptions().getStableUniqueNames()) { case OFF: break; case WARNING: - LOG.warn("Transform {} does not have a stable unique name. " - + "This will prevent updating of pipelines.", fullName); + LOG.warn( + "Transform {} does not have a stable unique name. " + + "This will prevent updating of pipelines.", + uniqueName); break; case ERROR: throw new IllegalStateException( - "Transform " + fullName + " does not have a stable unique name. " - + "This will prevent updating of pipelines."); + "Transform " + + uniqueName + + " does not have a stable unique name. 
" + + "This will prevent updating of pipelines."); default: throw new IllegalArgumentException( "Unrecognized value for stable unique names: " + getOptions().getStableUniqueNames()); } } - TransformTreeNode child = - new TransformTreeNode(parent, transform, fullName, input); - parent.addComposite(child); - - transforms.addInput(child, input); - LOG.debug("Adding {} to {}", transform, this); + transforms.pushNode(uniqueName, input, transform); try { - transforms.pushNode(child); + transforms.finishSpecifyingInput(); transform.validate(input); OutputT output = runner.apply(transform, input); - transforms.setOutput(child, output); + transforms.setOutput(output); - AppliedPTransform<?, ?, ?> applied = AppliedPTransform.of( - child.getFullName(), input, output, transform); - // recordAsOutput is a NOOP if already called; - output.recordAsOutput(applied); - verifyOutputState(output, child); return output; } finally { transforms.popNode(); @@ -405,63 +393,6 @@ public class Pipeline { } /** - * Returns all producing transforms for the {@link PValue PValues} contained - * in {@code output}. - */ - private List<AppliedPTransform<?, ?, ?>> getProducingTransforms(POutput output) { - List<AppliedPTransform<?, ?, ?>> producingTransforms = new ArrayList<>(); - for (PValue value : output.expand()) { - AppliedPTransform<?, ?, ?> transform = value.getProducingTransformInternal(); - if (transform != null) { - producingTransforms.add(transform); - } - } - return producingTransforms; - } - - /** - * Verifies that the output of a {@link PTransform} is correctly configured in its - * {@link TransformTreeNode} in the {@link Pipeline} graph. - * - * <p>A non-composite {@link PTransform} must have all - * of its outputs registered as produced by that {@link PTransform}. - * - * <p>A composite {@link PTransform} must have all of its outputs - * registered as produced by the contained primitive {@link PTransform PTransforms}. 
- * They have each had the above check performed already, when - * they were applied, so the only possible failure state is - * that the composite {@link PTransform} has returned a primitive output. - */ - private void verifyOutputState(POutput output, TransformTreeNode node) { - if (!node.isCompositeNode()) { - PTransform<?, ?> thisTransform = node.getTransform(); - List<AppliedPTransform<?, ?, ?>> producingTransforms = getProducingTransforms(output); - for (AppliedPTransform<?, ?, ?> producingTransform : producingTransforms) { - // Using != because object identity indicates that the transforms - // are the same node in the pipeline - if (thisTransform != producingTransform.getTransform()) { - throw new IllegalArgumentException("Output of non-composite transform " - + thisTransform + " is registered as being produced by" - + " a different transform: " + producingTransform); - } - } - } else { - PTransform<?, ?> thisTransform = node.getTransform(); - List<AppliedPTransform<?, ?, ?>> producingTransforms = getProducingTransforms(output); - for (AppliedPTransform<?, ?, ?> producingTransform : producingTransforms) { - // Using == because object identity indicates that the transforms - // are the same node in the pipeline - if (thisTransform == producingTransform.getTransform()) { - throw new IllegalStateException("Output of composite transform " - + thisTransform + " is registered as being produced by it," - + " but the output of every composite transform should be" - + " produced by a primitive transform contained therein."); - } - } - } - } - - /** * Returns the configured {@link PipelineRunner}. 
*/ public PipelineRunner<?> getRunner() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index 0a4bb08..d3fd497 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -17,14 +17,17 @@ */ package org.apache.beam.sdk.runners; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import java.util.Deque; +import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedList; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; @@ -34,70 +37,109 @@ import org.apache.beam.sdk.values.PValue; * associated {@link PValue}s. */ public class TransformHierarchy { - private final Deque<TransformTreeNode> transformStack = new LinkedList<>(); - private final Map<PInput, TransformTreeNode> producingTransformNode = new HashMap<>(); + private final TransformTreeNode root; + private final Map<POutput, TransformTreeNode> producers; + // Maintain a stack based on the enclosing nodes + private TransformTreeNode current; - /** - * Create a {@code TransformHierarchy} containing a root node. - */ public TransformHierarchy() { - // First element in the stack is the root node, holding all child nodes. 
- transformStack.add(new TransformTreeNode(null, null, "", null)); + root = TransformTreeNode.root(this); + current = root; + producers = new HashMap<>(); } /** - * Returns the last TransformTreeNode on the stack. + * Adds the named {@link PTransform} consuming the provided {@link PInput} as a node in this + * {@link TransformHierarchy} as a child of the current node, and sets it to be the current node. + * + * <p>This call should be finished by expanding and recursively calling {@link #pushNode(String, + * PInput, PTransform)}, calling {@link #finishSpecifyingInput()}, setting the output with {@link + * #setOutput(POutput)}, and ending with a call to {@link #popNode()}. + * + * @return the added node */ - public TransformTreeNode getCurrent() { - return transformStack.peek(); + public TransformTreeNode pushNode(String name, PInput input, PTransform<?, ?> transform) { + checkNotNull( + transform, "A %s must be provided for all Nodes", PTransform.class.getSimpleName()); + checkNotNull( + name, "A name must be provided for all %s Nodes", PTransform.class.getSimpleName()); + checkNotNull( + input, "An input must be provided for all %s Nodes", PTransform.class.getSimpleName()); + current = TransformTreeNode.subtransform(current, transform, name, input); + return current; } /** - * Add a TransformTreeNode to the stack. + * Finish specifying all of the input {@link PValue PValues} of the current {@link + * TransformTreeNode}. Ensures that all of the inputs to the current node have been fully + * specified, and have been produced by a node in this graph. */ - public void pushNode(TransformTreeNode current) { - transformStack.push(current); + public void finishSpecifyingInput() { + // Inputs must be completely specified before they are consumed by a transform. 
+ current.getInput().finishSpecifying(); + for (PValue inputValue : current.getInput().expand()) { + checkState(producers.get(inputValue) != null, "Producer unknown for input %s", inputValue); + inputValue.finishSpecifying(); + } } /** - * Removes the last TransformTreeNode from the stack. + * Set the output of the current {@link TransformTreeNode}. If the output is new (setOutput has + * not previously been called with it as the parameter), the current node is set as the producer + * of that {@link POutput}. + * + * <p>Also validates the output - specifically, a Primitive {@link PTransform} produces all of + * its outputs, and a Composite {@link PTransform} produces none of its outputs. Verifies that the + * expanded output does not contain {@link PValue PValues} produced by both this node and other + * nodes. */ - public void popNode() { - transformStack.pop(); - checkState(!transformStack.isEmpty()); + public void setOutput(POutput output) { + for (PValue value : output.expand()) { + if (!producers.containsKey(value)) { + producers.put(value, current); + } + } + current.setOutput(output); + // TODO: Replace with a "generateDefaultNames" method. + output.recordAsOutput(current.toAppliedPTransform()); } /** - * Adds an input to the given node. - * - * <p>This forces the producing node to be finished. + * Pops the current node off the top of the stack, finishing it. Outputs of the node are finished + * once they are consumed as input. 
*/ - public void addInput(TransformTreeNode node, PInput input) { - for (PValue i : input.expand()) { - TransformTreeNode producer = producingTransformNode.get(i); - checkState(producer != null, "Producer unknown for input: %s", i); + public void popNode() { + current.finishSpecifying(); + current = current.getEnclosingNode(); + checkState(current != null, "Can't pop the root node of a TransformHierarchy"); + } - producer.finishSpecifying(); - node.addInputProducer(i, producer); - } + TransformTreeNode getProducer(PValue produced) { + return producers.get(produced); } /** - * Sets the output of a transform node. + * Returns all producing transforms for the {@link PValue PValues} contained + * in {@code output}. */ - public void setOutput(TransformTreeNode producer, POutput output) { - producer.setOutput(output); - - for (PValue o : output.expand()) { - producingTransformNode.put(o, producer); + List<TransformTreeNode> getProducingTransforms(POutput output) { + List<TransformTreeNode> producingTransforms = new ArrayList<>(); + for (PValue value : output.expand()) { + TransformTreeNode producer = getProducer(value); + if (producer != null) { + producingTransforms.add(producer); + } } + return producingTransforms; } - /** - * Visits all nodes in the transform hierarchy, in transitive order. 
- */ - public void visit(Pipeline.PipelineVisitor visitor, - Set<PValue> visitedNodes) { - transformStack.peekFirst().visit(visitor, visitedNodes); + public Set<PValue> visit(PipelineVisitor visitor) { + Set<PValue> visitedValues = new HashSet<>(); + root.visit(visitor, visitedValues); + return visitedValues; + } + + public TransformTreeNode getCurrent() { + return current; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java index d16b828..ea94bd9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java @@ -17,18 +17,19 @@ */ package org.apache.beam.sdk.runners; -import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.HashSet; import java.util.Set; import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior; +import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; @@ -39,6 +40,7 @@ import org.apache.beam.sdk.values.PValue; * for initialization and ordered visitation. 
*/ public class TransformTreeNode { + private final TransformHierarchy hierarchy; private final TransformTreeNode enclosingNode; // The PTransform for this node, which may be a composite PTransform. @@ -51,10 +53,6 @@ public class TransformTreeNode { // Nodes for sub-transforms of a composite transform. private final Collection<TransformTreeNode> parts = new ArrayList<>(); - // Inputs to the transform, in expanded form and mapped to the producer - // of the input. - private final Map<PValue, TransformTreeNode> inputs = new HashMap<>(); - // Input to the transform, in unexpanded form. private final PInput input; @@ -62,28 +60,57 @@ public class TransformTreeNode { // Output of the transform, in unexpanded form. private POutput output; - private boolean finishedSpecifying = false; + @VisibleForTesting + boolean finishedSpecifying = false; + + /** + * Create a root {@link TransformTreeNode}. This transform is the root of the provided {@link + * TransformHierarchy} - it has no enclosing node, no {@link PTransform}, no {@link PInput input}, + * no {@link POutput output}, and an empty name. It contains all {@link PTransform transforms} + * within a {@link Pipeline} as component transforms. + */ + public static TransformTreeNode root(TransformHierarchy hierarchy) { + return new TransformTreeNode(hierarchy, null, null, "", null); + } + + /** + * Create a subtransform of the provided {@link TransformTreeNode node}. The enclosing node is a + * composite that contains this transform. + * + * <p>The returned node is a component node of the enclosing node. 
+ */ + public static TransformTreeNode subtransform( + TransformTreeNode enclosing, PTransform<?, ?> transform, String fullName, PInput input) { + checkNotNull(enclosing); + checkNotNull(transform); + checkNotNull(fullName); + checkNotNull(input); + TransformTreeNode node = + new TransformTreeNode(enclosing.hierarchy, enclosing, transform, fullName, input); + enclosing.addComposite(node); + return node; + } /** * Creates a new TransformTreeNode with the given parent and transform. * - * <p>EnclosingNode and transform may both be null for - * a root-level node, which holds all other nodes. + * <p>EnclosingNode and transform may both be null for a root-level node, which holds all other + * nodes. * * @param enclosingNode the composite node containing this node * @param transform the PTransform tracked by this node * @param fullName the fully qualified name of the transform * @param input the unexpanded input to the transform */ - public TransformTreeNode(@Nullable TransformTreeNode enclosingNode, - @Nullable PTransform<?, ?> transform, - String fullName, - @Nullable PInput input) { + private TransformTreeNode( + TransformHierarchy hierarchy, + @Nullable TransformTreeNode enclosingNode, + @Nullable PTransform<?, ?> transform, + String fullName, + @Nullable PInput input) { + this.hierarchy = hierarchy; this.enclosingNode = enclosingNode; this.transform = transform; - checkArgument((enclosingNode == null && transform == null) - || (enclosingNode != null && transform != null), - "EnclosingNode and transform must both be specified, or both be null"); this.fullName = fullName; this.input = input; } @@ -113,21 +140,23 @@ public class TransformTreeNode { } /** - * Returns true if this node represents a composite transform that does not perform - * processing of its own, but merely encapsulates a sub-pipeline (which may be empty). 
+ * Returns true if this node represents a composite transform that does not perform processing of + * its own, but merely encapsulates a sub-pipeline (which may be empty). * - * <p>Note that a node may be composite with no sub-transforms if it returns its input directly + * <p>Note that a node may be composite with no sub-transforms if it returns its input directly * extracts a component of a tuple, or other operations that occur at pipeline assembly time. */ public boolean isCompositeNode() { - return !parts.isEmpty() || returnsOthersOutput() || isRootNode(); + return !parts.isEmpty() || isRootNode() || returnsOthersOutput(); } private boolean returnsOthersOutput() { PTransform<?, ?> transform = getTransform(); - for (PValue output : getExpandedOutputs()) { - if (!output.getProducingTransformInternal().getTransform().equals(transform)) { - return true; + if (output != null) { + for (PValue outputValue : output.expand()) { + if (!hierarchy.getProducer(outputValue).getTransform().equals(transform)) { + return true; + } } } return false; @@ -142,14 +171,6 @@ public class TransformTreeNode { } /** - * Adds an input to the transform node. - */ - public void addInputProducer(PValue expandedInput, TransformTreeNode producer) { - checkState(!finishedSpecifying); - inputs.put(expandedInput, producer); - } - - /** * Returns the transform input, in unexpanded form. */ public PInput getInput() { @@ -157,20 +178,37 @@ public class TransformTreeNode { } /** - * Returns a mapping of inputs to the producing nodes for all inputs to - * the transform. - */ - public Map<PValue, TransformTreeNode> getInputs() { - return Collections.unmodifiableMap(inputs); - } - - /** * Adds an output to the transform node. 
*/ public void setOutput(POutput output) { checkState(!finishedSpecifying); - checkState(this.output == null); + checkState(this.output == null, "Tried to specify more than one output for %s", getFullName()); + checkNotNull(output, "Tried to set the output of %s to null", getFullName()); this.output = output; + + // Validate that a primitive transform produces only primitive output, and a composite transform + // does not produce primitive output. + Set<TransformTreeNode> outputProducers = new HashSet<>(); + for (PValue outputValue : output.expand()) { + outputProducers.add(hierarchy.getProducer(outputValue)); + } + if (outputProducers.contains(this) && outputProducers.size() != 1) { + Set<String> otherProducerNames = new HashSet<>(); + for (TransformTreeNode outputProducer : outputProducers) { + if (outputProducer != this) { + otherProducerNames.add(outputProducer.getFullName()); + } + } + throw new IllegalArgumentException( + String.format( + "Output of transform [%s] contains a %s produced by it as well as other Transforms. " + + "A primitive transform must produce all of its outputs, and outputs of a " + + "composite transform must be produced by a component transform or be part of" + + "the input." + + "%n Other Outputs: %s" + + "%n Other Producers: %s", + getFullName(), POutput.class.getSimpleName(), output.expand(), otherProducerNames)); + } } /** @@ -180,17 +218,10 @@ public class TransformTreeNode { return output; } - /** - * Returns the transform outputs, in expanded form. - */ - public Collection<? extends PValue> getExpandedOutputs() { - if (output != null) { - return output.expand(); - } else { - return Collections.emptyList(); - } + AppliedPTransform<?, ?, ?> toAppliedPTransform() { + return AppliedPTransform.of( + getFullName(), getInput(), getOutput(), (PTransform) getTransform()); } - /** * Visit the transform node. * @@ -204,10 +235,12 @@ public class TransformTreeNode { finishSpecifying(); } - // Visit inputs. 
- for (Map.Entry<PValue, TransformTreeNode> entry : inputs.entrySet()) { - if (visitedValues.add(entry.getKey())) { - visitor.visitValue(entry.getKey(), entry.getValue()); + if (!isRootNode()) { + // Visit inputs. + for (PValue inputValue : input.expand()) { + if (visitedValues.add(inputValue)) { + visitor.visitValue(inputValue, hierarchy.getProducer(inputValue)); + } } } @@ -224,10 +257,12 @@ public class TransformTreeNode { visitor.visitPrimitiveTransform(this); } - // Visit outputs. - for (PValue pValue : getExpandedOutputs()) { - if (visitedValues.add(pValue)) { - visitor.visitValue(pValue, this); + if (!isRootNode()) { + // Visit outputs. + for (PValue pValue : output.expand()) { + if (visitedValues.add(pValue)) { + visitor.visitValue(pValue, this); + } } } } @@ -243,15 +278,5 @@ public class TransformTreeNode { return; } finishedSpecifying = true; - - for (TransformTreeNode input : inputs.values()) { - if (input != null) { - input.finishSpecifying(); - } - } - - if (output != null) { - output.finishSpecifyingOutput(); - } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index c28f23e..3bf6d64 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.runners; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import java.util.HashSet; @@ -30,9 +31,15 
@@ import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.hamcrest.Matchers; import org.junit.Before; @@ -63,7 +70,7 @@ public class TransformHierarchyTest { } @Test - public void popWithoutPushThrows() { + public void pushWithoutPushFails() { thrown.expect(IllegalStateException.class); hierarchy.popNode(); } @@ -71,72 +78,153 @@ public class TransformHierarchyTest { @Test public void pushThenPopSucceeds() { TransformTreeNode root = hierarchy.getCurrent(); - TransformTreeNode node = - new TransformTreeNode(hierarchy.getCurrent(), Create.of(1), "Create", PBegin.in(pipeline)); - hierarchy.pushNode(node); + TransformTreeNode node = hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1)); assertThat(hierarchy.getCurrent(), equalTo(node)); hierarchy.popNode(); + assertThat(node.finishedSpecifying, is(true)); assertThat(hierarchy.getCurrent(), equalTo(root)); } @Test + public void emptyCompositeSucceeds() { + PCollection<Long> created = + PCollection.createPrimitiveOutputInternal( + pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); + TransformTreeNode node = hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1)); + hierarchy.setOutput(created); + hierarchy.popNode(); + PCollectionList<Long> pcList = PCollectionList.of(created); + + TransformTreeNode emptyTransform = + hierarchy.pushNode( + "Extract", + pcList, + new 
PTransform<PCollectionList<Long>, PCollection<Long>>() { + @Override + public PCollection<Long> apply(PCollectionList<Long> input) { + return input.get(0); + } + }); + hierarchy.setOutput(created); + hierarchy.popNode(); + assertThat(hierarchy.getProducer(created), equalTo(node)); + assertThat( + "A Transform that produces non-primtive output should be composite", + emptyTransform.isCompositeNode(), + is(true)); + } + + @Test + public void producingOwnAndOthersOutputsFails() { + PCollection<Long> created = + PCollection.createPrimitiveOutputInternal( + pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); + hierarchy.pushNode("Create", PBegin.in(pipeline), Create.of(1)); + hierarchy.setOutput(created); + hierarchy.popNode(); + PCollectionList<Long> pcList = PCollectionList.of(created); + + final PCollectionList<Long> appended = + pcList.and( + PCollection.<Long>createPrimitiveOutputInternal( + pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)); + hierarchy.pushNode( + "AddPc", + pcList, + new PTransform<PCollectionList<Long>, PCollectionList<Long>>() { + @Override + public PCollectionList<Long> apply(PCollectionList<Long> input) { + return appended; + } + }); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("produced by it as well as other Transforms"); + thrown.expectMessage("primitive transform must produce all of its outputs"); + thrown.expectMessage("composite transform must be produced by a component transform"); + thrown.expectMessage("AddPc"); + thrown.expectMessage("Create"); + thrown.expectMessage(appended.expand().toString()); + hierarchy.setOutput(appended); + } + + @Test public void visitVisitsAllPushed() { TransformTreeNode root = hierarchy.getCurrent(); - Create.Values<Integer> create = Create.of(1); - PCollection<Integer> created = pipeline.apply(create); PBegin begin = PBegin.in(pipeline); - TransformTreeNode compositeNode = - new TransformTreeNode(root, create, "Create", begin); - 
root.addComposite(compositeNode); - TransformTreeNode primitiveNode = - new TransformTreeNode( - compositeNode, Read.from(CountingSource.upTo(1L)), "Create/Read", begin); - compositeNode.addComposite(primitiveNode); - - TransformTreeNode otherPrimitive = - new TransformTreeNode( - root, MapElements.via(new SimpleFunction<Integer, Integer>() { - @Override - public Integer apply(Integer input) { - return input; - } - }), "ParDo", created); - root.addComposite(otherPrimitive); - otherPrimitive.addInputProducer(created, primitiveNode); + Create.Values<Long> create = Create.of(1L); + Read.Bounded<Long> read = Read.from(CountingSource.upTo(1L)); - hierarchy.pushNode(compositeNode); - hierarchy.pushNode(primitiveNode); + PCollection<Long> created = + PCollection.createPrimitiveOutputInternal( + pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); + + MapElements<Long, Long> map = MapElements.via(new SimpleFunction<Long, Long>() { + @Override + public Long apply(Long input) { + return input; + } + }); + + PCollection<Long> mapped = + PCollection.createPrimitiveOutputInternal( + pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); + + TransformTreeNode compositeNode = hierarchy.pushNode("Create", begin, create); + assertThat(hierarchy.getCurrent(), equalTo(compositeNode)); + assertThat(compositeNode.getInput(), Matchers.<PInput>equalTo(begin)); + assertThat(compositeNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(create)); + // Not yet set + assertThat(compositeNode.getOutput(), nullValue()); + assertThat(compositeNode.getEnclosingNode().isRootNode(), is(true)); + + TransformTreeNode primitiveNode = hierarchy.pushNode("Create/Read", begin, read); + assertThat(hierarchy.getCurrent(), equalTo(primitiveNode)); + hierarchy.setOutput(created); hierarchy.popNode(); + assertThat(primitiveNode.getOutput(), Matchers.<POutput>equalTo(created)); + assertThat(primitiveNode.getInput(), Matchers.<PInput>equalTo(begin)); + 
assertThat(primitiveNode.getTransform(), Matchers.<PTransform<?, ?>>equalTo(read)); + assertThat(primitiveNode.getEnclosingNode(), equalTo(compositeNode)); + + hierarchy.setOutput(created); + // The composite is listed as outputting a PValue created by the contained primitive + assertThat(compositeNode.getOutput(), Matchers.<POutput>equalTo(created)); + // The producer of that PValue is still the primitive in which it is first output + assertThat(hierarchy.getProducer(created), equalTo(primitiveNode)); hierarchy.popNode(); - hierarchy.pushNode(otherPrimitive); + + TransformTreeNode otherPrimitive = hierarchy.pushNode("ParDo", created, map); + hierarchy.setOutput(mapped); hierarchy.popNode(); final Set<TransformTreeNode> visitedCompositeNodes = new HashSet<>(); final Set<TransformTreeNode> visitedPrimitiveNodes = new HashSet<>(); final Set<PValue> visitedValuesInVisitor = new HashSet<>(); - Set<PValue> visitedValues = new HashSet<>(); - hierarchy.visit(new PipelineVisitor.Defaults() { - @Override - public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { - visitedCompositeNodes.add(node); - return CompositeBehavior.ENTER_TRANSFORM; - } - - @Override - public void visitPrimitiveTransform(TransformTreeNode node) { - visitedPrimitiveNodes.add(node); - } - - @Override - public void visitValue(PValue value, TransformTreeNode producer) { - visitedValuesInVisitor.add(value); - } - }, visitedValues); + Set<PValue> visitedValues = + hierarchy.visit( + new PipelineVisitor.Defaults() { + @Override + public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { + visitedCompositeNodes.add(node); + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void visitPrimitiveTransform(TransformTreeNode node) { + visitedPrimitiveNodes.add(node); + } + + @Override + public void visitValue(PValue value, TransformTreeNode producer) { + visitedValuesInVisitor.add(value); + } + }); assertThat(visitedCompositeNodes, containsInAnyOrder(root, 
compositeNode)); assertThat(visitedPrimitiveNodes, containsInAnyOrder(primitiveNode, otherPrimitive)); - assertThat(visitedValuesInVisitor, Matchers.<PValue>containsInAnyOrder(created)); + assertThat(visitedValuesInVisitor, Matchers.<PValue>containsInAnyOrder(created, mapped)); + assertThat(visitedValuesInVisitor, equalTo(visitedValues)); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java index def3a02..b95fa70 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java @@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.io.File; import java.util.Arrays; @@ -169,14 +168,13 @@ public class TransformTreeTest { assertTrue(left.equals(EnumSet.of(TransformsSeen.SAMPLE_ANY))); } - @Test(expected = IllegalStateException.class) + @Test(expected = IllegalArgumentException.class) public void testOutputChecking() throws Exception { Pipeline p = TestPipeline.create(); p.apply(new InvalidCompositeTransform()); p.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {}); - fail("traversal should have failed with an IllegalStateException"); } @Test