Visit a Transform Hierarchy in Topological Order. This reverts commit 6ad6433ec0c02aec8656e9e3b27f6e0f974f8ece.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/261e7df2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/261e7df2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/261e7df2 Branch: refs/heads/gearpump-runner Commit: 261e7df2b860fe82d9f401e2621b020fe2020fea Parents: d2c4093 Author: Thomas Groh <tg...@google.com> Authored: Tue Jun 6 16:15:19 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Fri Jun 9 14:44:42 2017 -0700 ---------------------------------------------------------------------- .../spark/translation/StorageLevelTest.java | 4 +- .../beam/sdk/runners/TransformHierarchy.java | 79 +++++++- .../sdk/runners/TransformHierarchyTest.java | 197 +++++++++++++++++++ 3 files changed, 274 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/261e7df2/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java index 8f2e681..8bd6dae 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java @@ -58,12 +58,12 @@ public class StorageLevelTest { @Test public void test() throws Exception { - PCollection<String> pCollection = pipeline.apply(Create.of("foo")); + PCollection<String> pCollection = pipeline.apply("CreateFoo", Create.of("foo")); // by default, the Spark runner doesn't cache the RDD if it accessed only one time. // So, to "force" the caching of the RDD, we have to call the RDD at least two time. 
// That's why we are using Count fn on the PCollection. - pCollection.apply(Count.<String>globally()); + pCollection.apply("CountAll", Count.<String>globally()); PCollection<String> output = pCollection.apply(new StorageLevelPTransform()); http://git-wip-us.apache.org/repos/asf/beam/blob/261e7df2/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index ee1ce7b..5e048eb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -208,7 +208,7 @@ public class TransformHierarchy { public Set<PValue> visit(PipelineVisitor visitor) { finishSpecifying(); Set<PValue> visitedValues = new HashSet<>(); - root.visit(visitor, visitedValues); + root.visit(visitor, visitedValues, new HashSet<Node>(), new HashSet<Node>()); return visitedValues; } @@ -503,10 +503,60 @@ public class TransformHierarchy { /** * Visit the transform node. * + * <p>The visit proceeds in the following order: + * + * <ul> + * <li>Visit all input {@link PValue PValues} returned by the flattened expansion of {@link + * Node#getInputs()}. + * <li>If the node is a composite: + * <ul> + * <li>Enter the node via {@link PipelineVisitor#enterCompositeTransform(Node)}. + * <li>If the result of {@link PipelineVisitor#enterCompositeTransform(Node)} was {@link + * CompositeBehavior#ENTER_TRANSFORM}, visit each child node of this {@link Node}. + * <li>Leave the node via {@link PipelineVisitor#leaveCompositeTransform(Node)}. + * </ul> + * <li>If the node is a primitive, visit it via {@link + * PipelineVisitor#visitPrimitiveTransform(Node)}. + * <li>Visit each {@link PValue} that was output by this node. 
+ * </ul> + * + * <p>Additionally, the following ordering restrictions are observed: + * + * <ul> + * <li>A {@link Node} will be visited after its enclosing node has been entered and before its + * enclosing node has been left + * <li>A {@link Node} will not be visited if any enclosing {@link Node} has returned {@link + * CompositeBehavior#DO_NOT_ENTER_TRANSFORM} from the call to {@link + * PipelineVisitor#enterCompositeTransform(Node)}. + * <li>A {@link PValue} will only be visited after the {@link Node} that originally produced + * it has been visited. + * </ul> + * * <p>Provides an ordered visit of the input values, the primitive transform (or child nodes for * composite transforms), then the output values. */ - private void visit(PipelineVisitor visitor, Set<PValue> visitedValues) { + private void visit( + PipelineVisitor visitor, + Set<PValue> visitedValues, + Set<Node> visitedNodes, + Set<Node> skippedComposites) { + if (getEnclosingNode() != null && !visitedNodes.contains(getEnclosingNode())) { + // Recursively enter all enclosing nodes, as appropriate. + getEnclosingNode().visit(visitor, visitedValues, visitedNodes, skippedComposites); + } + // These checks occur after visiting the enclosing node to ensure that if this node has been + // visited while visiting the enclosing node the node is not revisited, or, if an enclosing + // Node is skipped, this node is also skipped. + if (!visitedNodes.add(this)) { + LOG.debug("Not revisiting previously visited node {}", this); + return; + } else if (childNodeOf(skippedComposites)) { + // This node is a child of a node that has been passed over via CompositeBehavior, and + // should also be skipped. All child nodes of a skipped composite should always be skipped. + LOG.debug("Not revisiting Node {} which is a child of a previously passed composite", this); + return; + } + if (!finishedSpecifying) { finishSpecifying(); } @@ -514,22 +564,31 @@ public class TransformHierarchy { if (!isRootNode()) { // Visit inputs. 
for (PValue inputValue : inputs.values()) { + Node valueProducer = getProducer(inputValue); + if (!visitedNodes.contains(valueProducer)) { + valueProducer.visit(visitor, visitedValues, visitedNodes, skippedComposites); + } if (visitedValues.add(inputValue)) { - visitor.visitValue(inputValue, getProducer(inputValue)); + LOG.debug("Visiting input value {}", inputValue); + visitor.visitValue(inputValue, valueProducer); } } } if (isCompositeNode()) { + LOG.debug("Visiting composite node {}", this); PipelineVisitor.CompositeBehavior recurse = visitor.enterCompositeTransform(this); if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) { for (Node child : parts) { - child.visit(visitor, visitedValues); + child.visit(visitor, visitedValues, visitedNodes, skippedComposites); } + } else { + skippedComposites.add(this); } visitor.leaveCompositeTransform(this); } else { + LOG.debug("Visiting primitive node {}", this); visitor.visitPrimitiveTransform(this); } @@ -538,12 +597,24 @@ public class TransformHierarchy { // Visit outputs. for (PValue pValue : outputs.values()) { if (visitedValues.add(pValue)) { + LOG.debug("Visiting output value {}", pValue); visitor.visitValue(pValue, this); } } } } + private boolean childNodeOf(Set<Node> nodes) { + if (isRootNode()) { + return false; + } + Node parent = this.getEnclosingNode(); + while (!parent.isRootNode() && !nodes.contains(parent)) { + parent = parent.getEnclosingNode(); + } + return nodes.contains(parent); + } + /** * Finish specifying a transform. 
* http://git-wip-us.apache.org/repos/asf/beam/blob/261e7df2/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index 1197d1b..93650dd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.runners; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; @@ -32,6 +33,8 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.Read; @@ -492,4 +495,198 @@ public class TransformHierarchyTest implements Serializable { assertThat(visitedPrimitiveNodes, containsInAnyOrder(upstreamNode, replacementParNode)); assertThat(visitedValues, Matchers.<PValue>containsInAnyOrder(upstream, output)); } + + @Test + public void visitIsTopologicallyOrdered() { + PCollection<String> one = + PCollection.<String>createPrimitiveOutputInternal( + pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED) + .setCoder(StringUtf8Coder.of()); + final PCollection<Integer> two = + PCollection.<Integer>createPrimitiveOutputInternal( + pipeline, WindowingStrategy.globalDefault(), 
IsBounded.UNBOUNDED) + .setCoder(VarIntCoder.of()); + final PDone done = PDone.in(pipeline); + final TupleTag<String> oneTag = new TupleTag<String>() {}; + final TupleTag<Integer> twoTag = new TupleTag<Integer>() {}; + final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, one).and(twoTag, two); + + PTransform<PCollection<String>, PDone> multiConsumer = + new PTransform<PCollection<String>, PDone>() { + @Override + public PDone expand(PCollection<String> input) { + return done; + } + + @Override + public Map<TupleTag<?>, PValue> getAdditionalInputs() { + return Collections.<TupleTag<?>, PValue>singletonMap(twoTag, two); + } + }; + hierarchy.pushNode("consumes_both", one, multiConsumer); + hierarchy.setOutput(done); + hierarchy.popNode(); + + final PTransform<PBegin, PCollectionTuple> producer = + new PTransform<PBegin, PCollectionTuple>() { + @Override + public PCollectionTuple expand(PBegin input) { + return oneAndTwo; + } + }; + hierarchy.pushNode( + "encloses_producer", + PBegin.in(pipeline), + new PTransform<PBegin, PCollectionTuple>() { + @Override + public PCollectionTuple expand(PBegin input) { + return input.apply(producer); + } + }); + hierarchy.pushNode( + "creates_one_and_two", + PBegin.in(pipeline), producer); + hierarchy.setOutput(oneAndTwo); + hierarchy.popNode(); + hierarchy.setOutput(oneAndTwo); + hierarchy.popNode(); + + hierarchy.pushNode("second_copy_of_consumes_both", one, multiConsumer); + hierarchy.setOutput(done); + hierarchy.popNode(); + + final Set<Node> visitedNodes = new HashSet<>(); + final Set<Node> exitedNodes = new HashSet<>(); + final Set<PValue> visitedValues = new HashSet<>(); + hierarchy.visit( + new PipelineVisitor.Defaults() { + + @Override + public CompositeBehavior enterCompositeTransform(Node node) { + for (PValue input : node.getInputs().values()) { + assertThat(visitedValues, hasItem(input)); + } + assertThat( + "Nodes should not be visited more than once", visitedNodes, not(hasItem(node))); + if 
(!node.isRootNode()) { + assertThat( + "Nodes should always be visited after their enclosing nodes", + visitedNodes, + hasItem(node.getEnclosingNode())); + } + visitedNodes.add(node); + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void leaveCompositeTransform(Node node) { + assertThat(visitedNodes, hasItem(node)); + if (!node.isRootNode()) { + assertThat( + "Nodes should always be left before their enclosing nodes are left", + exitedNodes, + not(hasItem(node.getEnclosingNode()))); + } + assertThat(exitedNodes, not(hasItem(node))); + exitedNodes.add(node); + } + + @Override + public void visitPrimitiveTransform(Node node) { + assertThat(visitedNodes, hasItem(node.getEnclosingNode())); + assertThat(exitedNodes, not(hasItem(node.getEnclosingNode()))); + assertThat( + "Nodes should not be visited more than once", visitedNodes, not(hasItem(node))); + for (PValue input : node.getInputs().values()) { + assertThat(visitedValues, hasItem(input)); + } + visitedNodes.add(node); + } + + @Override + public void visitValue(PValue value, Node producer) { + assertThat(visitedNodes, hasItem(producer)); + assertThat(visitedValues, not(hasItem(value))); + visitedValues.add(value); + } + }); + assertThat("Should have visited all the nodes", visitedNodes.size(), equalTo(5)); + assertThat("Should have left all of the visited composites", exitedNodes.size(), equalTo(2)); + } + + @Test + public void visitDoesNotVisitSkippedNodes() { + PCollection<String> one = + PCollection.<String>createPrimitiveOutputInternal( + pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED) + .setCoder(StringUtf8Coder.of()); + final PCollection<Integer> two = + PCollection.<Integer>createPrimitiveOutputInternal( + pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED) + .setCoder(VarIntCoder.of()); + final PDone done = PDone.in(pipeline); + final TupleTag<String> oneTag = new TupleTag<String>() {}; + final TupleTag<Integer> twoTag = new TupleTag<Integer>() {}; + final 
PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, one).and(twoTag, two); + + hierarchy.pushNode( + "consumes_both", + one, + new PTransform<PCollection<String>, PDone>() { + @Override + public PDone expand(PCollection<String> input) { + return done; + } + + @Override + public Map<TupleTag<?>, PValue> getAdditionalInputs() { + return Collections.<TupleTag<?>, PValue>singletonMap(twoTag, two); + } + }); + hierarchy.setOutput(done); + hierarchy.popNode(); + + final PTransform<PBegin, PCollectionTuple> producer = + new PTransform<PBegin, PCollectionTuple>() { + @Override + public PCollectionTuple expand(PBegin input) { + return oneAndTwo; + } + }; + final Node enclosing = + hierarchy.pushNode( + "encloses_producer", + PBegin.in(pipeline), + new PTransform<PBegin, PCollectionTuple>() { + @Override + public PCollectionTuple expand(PBegin input) { + return input.apply(producer); + } + }); + Node enclosed = hierarchy.pushNode("creates_one_and_two", PBegin.in(pipeline), producer); + hierarchy.setOutput(oneAndTwo); + hierarchy.popNode(); + hierarchy.setOutput(oneAndTwo); + hierarchy.popNode(); + + final Set<Node> visitedNodes = new HashSet<>(); + hierarchy.visit( + new PipelineVisitor.Defaults() { + @Override + public CompositeBehavior enterCompositeTransform(Node node) { + visitedNodes.add(node); + return node.equals(enclosing) + ? CompositeBehavior.DO_NOT_ENTER_TRANSFORM + : CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void visitPrimitiveTransform(Node node) { + visitedNodes.add(node); + } + }); + + assertThat(visitedNodes, hasItem(enclosing)); + assertThat(visitedNodes, not(hasItem(enclosed))); + } }