Repository: beam
Updated Branches:
  refs/heads/master 41b24090f -> 2ca3bf669


Clean up TransformHierarchy#replace

In replace(), remove the existing node's outputs from the producers map.
This permits replacements to produce the same PValue as the transform they
are replacing, for example in CreatePCollectionView.

In replace(), also register the replacement node with its input in unexpandedInputs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/67d02b9d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/67d02b9d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/67d02b9d

Branch: refs/heads/master
Commit: 67d02b9de75e607b4fef562746387b4696b22529
Parents: 41b2409
Author: Thomas Groh <tg...@google.com>
Authored: Tue Feb 14 14:17:12 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Feb 16 18:57:57 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/runners/TransformHierarchy.java    | 28 +++++++++++++++++---
 1 file changed, 25 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/67d02b9d/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index d626543..972cb5b 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -103,8 +103,29 @@ public class TransformHierarchy {
         "Replacing a node when the graph has an unexpanded input. This is an 
SDK bug.");
     Node replacement =
         new Node(existing.getEnclosingNode(), transform, 
existing.getFullName(), input);
+    for (TaggedPValue output : existing.getOutputs()) {
+      Node producer = producers.get(output.getValue());
+      boolean producedInExisting = false;
+      do {
+        if (producer.equals(existing)) {
+          producedInExisting = true;
+        } else {
+          producer = producer.getEnclosingNode();
+        }
+      } while (!producedInExisting && !producer.isRootNode());
+      if (producedInExisting) {
+        producers.remove(output.getValue());
+        LOG.debug("Removed producer for value {} as it is part of a replaced 
composite {}",
+            output.getValue(),
+            existing.getFullName());
+      } else {
+        LOG.debug(
+            "Value {} not produced in existing node {}", output.getValue(), 
existing.getFullName());
+      }
+    }
     existing.getEnclosingNode().replaceChild(existing, replacement);
     unexpandedInputs.remove(existing);
+    unexpandedInputs.put(replacement, input);
     current = replacement;
     return replacement;
   }
@@ -290,7 +311,7 @@ public class TransformHierarchy {
           "Tried to replace a node %s that doesn't exist as a component of 
node %s",
           existing.getFullName(),
           getFullName());
-      LOG.info(
+      LOG.debug(
           "Replaced original node {} with replacement {} at index {}",
           existing,
           replacement,
@@ -399,9 +420,10 @@ public class TransformHierarchy {
             // This Node produced the replacement PCollection. The structure 
of this if statement
             // requires the replacement transform to produce only new outputs; 
otherwise the
             // producers map will not be appropriately updated. TODO: 
investigate alternatives
-            producers.put(mapping.getOriginal().getValue(), this);
-            producers.remove(mapping.getReplacement().getValue());
             producerInput.remove(mapping.getReplacement().getValue());
+            // original and replacement might be identical
+            producers.remove(mapping.getReplacement().getValue());
+            producers.put(mapping.getOriginal().getValue(), this);
           }
           LOG.debug(
               "Replacing output {} with original {}",

Reply via email to