Repository: incubator-beam Updated Branches: refs/heads/master 2f8ba65fa -> 1c2fa03cc
Give root transforms step names. Fix a bug where steps would only be given step names if they were non-root nodes. Use the ConsumerTrackingPipelineVisitor in the InProcessEvaluationContext test to handle runner-expanded transforms. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5888df7b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5888df7b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5888df7b Branch: refs/heads/master Commit: 5888df7b3d6183d389ce3141de321be25256fc2f Parents: 2f8ba65 Author: Thomas Groh <tg...@google.com> Authored: Mon Apr 4 11:10:18 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Tue Apr 5 10:10:43 2016 -0700 ---------------------------------------------------------------------- .../ConsumerTrackingPipelineVisitor.java | 2 +- .../ConsumerTrackingPipelineVisitorTest.java | 37 +++++++++++++++ .../InProcessEvaluationContextTest.java | 50 +++++++------------- 3 files changed, 56 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5888df7b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java index ec4f08b..48836e9 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java @@ -76,12 +76,12 @@ public class ConsumerTrackingPipelineVisitor implements 
PipelineVisitor { public void visitTransform(TransformTreeNode node) { toFinalize.removeAll(node.getInput().expand()); AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node); + stepNames.put(appliedTransform, genStepName()); if (node.getInput().expand().isEmpty()) { rootTransforms.add(appliedTransform); } else { for (PValue value : node.getInput().expand()) { valueToConsumers.get(value).add(appliedTransform); - stepNames.put(appliedTransform, genStepName()); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5888df7b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java index bea6fe1..905f58f 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java @@ -177,6 +177,43 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable { } @Test + public void getStepNamesContainsAllTransforms() { + PCollection<String> created = p.apply(Create.of("1", "2", "3")); + PCollection<String> transformed = + created.apply( + ParDo.of( + new DoFn<String, String>() { + @Override + public void processElement(DoFn<String, String>.ProcessContext c) + throws Exception { + c.output(Integer.toString(c.element().length())); + } + })); + PDone finished = + transformed.apply( + new PTransform<PInput, PDone>() { + @Override + public PDone apply(PInput input) { + return PDone.in(input.getPipeline()); + } + }); + + p.traverseTopologically(visitor); + assertThat( + 
visitor.getStepNames(), + Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry( + created.getProducingTransformInternal(), "s0")); + assertThat( + visitor.getStepNames(), + Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry( + transformed.getProducingTransformInternal(), "s1")); + assertThat( + visitor.getStepNames(), + Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry( + finished.getProducingTransformInternal(), "s2")); + } + + @Test public void traverseMultipleTimesThrows() { p.apply(Create.of(1, 2, 3)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5888df7b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java index fde2cb4..e1faf1b 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -73,7 +73,6 @@ import org.junit.runners.JUnit4; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -90,7 +89,8 @@ public class InProcessEvaluationContextTest { private PCollection<KV<String, Integer>> downstream; private PCollectionView<Iterable<Integer>> view; private PCollection<Long> unbounded; - + private Collection<AppliedPTransform<?, ?, ?>> rootTransforms; + private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers; @Before public void setup() { @@ -103,32 +103,20 @@ public class InProcessEvaluationContextTest { downstream = 
created.apply(WithKeys.<String, Integer>of("foo")); view = created.apply(View.<Integer>asIterable()); unbounded = p.apply(CountingInput.unbounded()); - Collection<AppliedPTransform<?, ?, ?>> rootTransforms = - ImmutableList.<AppliedPTransform<?, ?, ?>>of( - created.getProducingTransformInternal(), unbounded.getProducingTransformInternal()); - Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers = new HashMap<>(); - valueToConsumers.put( - created, - ImmutableList.<AppliedPTransform<?, ?, ?>>of( - downstream.getProducingTransformInternal(), view.getProducingTransformInternal())); - valueToConsumers.put(unbounded, ImmutableList.<AppliedPTransform<?, ?, ?>>of()); - valueToConsumers.put(downstream, ImmutableList.<AppliedPTransform<?, ?, ?>>of()); - valueToConsumers.put(view, ImmutableList.<AppliedPTransform<?, ?, ?>>of()); - - Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>(); - stepNames.put(created.getProducingTransformInternal(), "s1"); - stepNames.put(downstream.getProducingTransformInternal(), "s2"); - stepNames.put(view.getProducingTransformInternal(), "s3"); - stepNames.put(unbounded.getProducingTransformInternal(), "s4"); - - Collection<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(view); - context = InProcessEvaluationContext.create( + + ConsumerTrackingPipelineVisitor cVis = new ConsumerTrackingPipelineVisitor(); + p.traverseTopologically(cVis); + rootTransforms = cVis.getRootTransforms(); + valueToConsumers = cVis.getValueToConsumers(); + + context = + InProcessEvaluationContext.create( runner.getPipelineOptions(), InProcessBundleFactory.create(), rootTransforms, valueToConsumers, - stepNames, - views); + cVis.getStepNames(), + cVis.getViews()); } @Test @@ -495,16 +483,14 @@ public class InProcessEvaluationContextTest { null, ImmutableList.<TimerData>of(), StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); - context.handleResult( - committedBundle, - 
ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build()); assertThat(context.isDone(), is(false)); - context.handleResult( - committedBundle, - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(view.getProducingTransformInternal()).build()); + for (AppliedPTransform<?, ?, ?> consumers : valueToConsumers.get(created)) { + context.handleResult( + committedBundle, + ImmutableList.<TimerData>of(), + StepTransformResult.withoutHold(consumers).build()); + } assertThat(context.isDone(), is(true)); }