Repository: incubator-beam
Updated Branches:
  refs/heads/master 2f8ba65fa -> 1c2fa03cc


Give root transforms step names

Fix a bug where transforms were only given step names if they were
non-root nodes.

Use the ConsumerTrackingPipelineVisitor in InProcessEvaluationContextTest
so that runner-expanded transforms are handled.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5888df7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5888df7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5888df7b

Branch: refs/heads/master
Commit: 5888df7b3d6183d389ce3141de321be25256fc2f
Parents: 2f8ba65
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 4 11:10:18 2016 -0700
Committer: bchambers <bchamb...@google.com>
Committed: Tue Apr 5 10:10:43 2016 -0700

----------------------------------------------------------------------
 .../ConsumerTrackingPipelineVisitor.java        |  2 +-
 .../ConsumerTrackingPipelineVisitorTest.java    | 37 +++++++++++++++
 .../InProcessEvaluationContextTest.java         | 50 +++++++-------------
 3 files changed, 56 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5888df7b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
index ec4f08b..48836e9 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
@@ -76,12 +76,12 @@ public class ConsumerTrackingPipelineVisitor implements PipelineVisitor {
   public void visitTransform(TransformTreeNode node) {
     toFinalize.removeAll(node.getInput().expand());
     AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
+    stepNames.put(appliedTransform, genStepName());
     if (node.getInput().expand().isEmpty()) {
       rootTransforms.add(appliedTransform);
     } else {
       for (PValue value : node.getInput().expand()) {
         valueToConsumers.get(value).add(appliedTransform);
-        stepNames.put(appliedTransform, genStepName());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5888df7b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
index bea6fe1..905f58f 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
@@ -177,6 +177,43 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
   }
 
   @Test
+  public void getStepNamesContainsAllTransforms() {
+    PCollection<String> created = p.apply(Create.of("1", "2", "3"));
+    PCollection<String> transformed =
+        created.apply(
+            ParDo.of(
+                new DoFn<String, String>() {
+                  @Override
+                  public void processElement(DoFn<String, 
String>.ProcessContext c)
+                      throws Exception {
+                    c.output(Integer.toString(c.element().length()));
+                  }
+                }));
+    PDone finished =
+        transformed.apply(
+            new PTransform<PInput, PDone>() {
+              @Override
+              public PDone apply(PInput input) {
+                return PDone.in(input.getPipeline());
+              }
+            });
+
+    p.traverseTopologically(visitor);
+    assertThat(
+        visitor.getStepNames(),
+        Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
+            created.getProducingTransformInternal(), "s0"));
+    assertThat(
+        visitor.getStepNames(),
+        Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
+            transformed.getProducingTransformInternal(), "s1"));
+    assertThat(
+        visitor.getStepNames(),
+        Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
+            finished.getProducingTransformInternal(), "s2"));
+  }
+
+  @Test
   public void traverseMultipleTimesThrows() {
     p.apply(Create.of(1, 2, 3));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5888df7b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
index fde2cb4..e1faf1b 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
@@ -73,7 +73,6 @@ import org.junit.runners.JUnit4;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -90,7 +89,8 @@ public class InProcessEvaluationContextTest {
   private PCollection<KV<String, Integer>> downstream;
   private PCollectionView<Iterable<Integer>> view;
   private PCollection<Long> unbounded;
-
+  private Collection<AppliedPTransform<?, ?, ?>> rootTransforms;
+  private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
 
   @Before
   public void setup() {
@@ -103,32 +103,20 @@ public class InProcessEvaluationContextTest {
     downstream = created.apply(WithKeys.<String, Integer>of("foo"));
     view = created.apply(View.<Integer>asIterable());
     unbounded = p.apply(CountingInput.unbounded());
-    Collection<AppliedPTransform<?, ?, ?>> rootTransforms =
-        ImmutableList.<AppliedPTransform<?, ?, ?>>of(
-            created.getProducingTransformInternal(), 
unbounded.getProducingTransformInternal());
-    Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers = new 
HashMap<>();
-    valueToConsumers.put(
-        created,
-        ImmutableList.<AppliedPTransform<?, ?, ?>>of(
-            downstream.getProducingTransformInternal(), 
view.getProducingTransformInternal()));
-    valueToConsumers.put(unbounded, ImmutableList.<AppliedPTransform<?, ?, 
?>>of());
-    valueToConsumers.put(downstream, ImmutableList.<AppliedPTransform<?, ?, 
?>>of());
-    valueToConsumers.put(view, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
-
-    Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
-    stepNames.put(created.getProducingTransformInternal(), "s1");
-    stepNames.put(downstream.getProducingTransformInternal(), "s2");
-    stepNames.put(view.getProducingTransformInternal(), "s3");
-    stepNames.put(unbounded.getProducingTransformInternal(), "s4");
-
-    Collection<PCollectionView<?>> views = 
ImmutableList.<PCollectionView<?>>of(view);
-    context = InProcessEvaluationContext.create(
+
+    ConsumerTrackingPipelineVisitor cVis = new 
ConsumerTrackingPipelineVisitor();
+    p.traverseTopologically(cVis);
+    rootTransforms = cVis.getRootTransforms();
+    valueToConsumers = cVis.getValueToConsumers();
+
+    context =
+        InProcessEvaluationContext.create(
             runner.getPipelineOptions(),
             InProcessBundleFactory.create(),
             rootTransforms,
             valueToConsumers,
-            stepNames,
-            views);
+            cVis.getStepNames(),
+            cVis.getViews());
   }
 
   @Test
@@ -495,16 +483,14 @@ public class InProcessEvaluationContextTest {
         null,
         ImmutableList.<TimerData>of(),
         
StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
-    context.handleResult(
-        committedBundle,
-        ImmutableList.<TimerData>of(),
-        
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
     assertThat(context.isDone(), is(false));
 
-    context.handleResult(
-        committedBundle,
-        ImmutableList.<TimerData>of(),
-        
StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
+    for (AppliedPTransform<?, ?, ?> consumers : valueToConsumers.get(created)) 
{
+      context.handleResult(
+          committedBundle,
+          ImmutableList.<TimerData>of(),
+          StepTransformResult.withoutHold(consumers).build());
+    }
     assertThat(context.isDone(), is(true));
   }
 

Reply via email to