Repository: beam Updated Branches: refs/heads/master 41b24090f -> 2ca3bf669
Clean up TransformHierarchy#replace. Remove existing outputs from the producer map in replace(). This permits replacements to produce the same PValue as the transform they are replacing, for example in CreatePCollectionView. In replace(), also add the replacement's input to the unexpanded inputs. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/67d02b9d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/67d02b9d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/67d02b9d Branch: refs/heads/master Commit: 67d02b9de75e607b4fef562746387b4696b22529 Parents: 41b2409 Author: Thomas Groh <tg...@google.com> Authored: Tue Feb 14 14:17:12 2017 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Thu Feb 16 18:57:57 2017 -0800 ---------------------------------------------------------------------- .../beam/sdk/runners/TransformHierarchy.java | 28 +++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/67d02b9d/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index d626543..972cb5b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -103,8 +103,29 @@ public class TransformHierarchy { "Replacing a node when the graph has an unexpanded input. 
This is an SDK bug."); Node replacement = new Node(existing.getEnclosingNode(), transform, existing.getFullName(), input); + for (TaggedPValue output : existing.getOutputs()) { + Node producer = producers.get(output.getValue()); + boolean producedInExisting = false; + do { + if (producer.equals(existing)) { + producedInExisting = true; + } else { + producer = producer.getEnclosingNode(); + } + } while (!producedInExisting && !producer.isRootNode()); + if (producedInExisting) { + producers.remove(output.getValue()); + LOG.debug("Removed producer for value {} as it is part of a replaced composite {}", + output.getValue(), + existing.getFullName()); + } else { + LOG.debug( + "Value {} not produced in existing node {}", output.getValue(), existing.getFullName()); + } + } existing.getEnclosingNode().replaceChild(existing, replacement); unexpandedInputs.remove(existing); + unexpandedInputs.put(replacement, input); current = replacement; return replacement; } @@ -290,7 +311,7 @@ public class TransformHierarchy { "Tried to replace a node %s that doesn't exist as a component of node %s", existing.getFullName(), getFullName()); - LOG.info( + LOG.debug( "Replaced original node {} with replacement {} at index {}", existing, replacement, @@ -399,9 +420,10 @@ public class TransformHierarchy { // This Node produced the replacement PCollection. The structure of this if statement // requires the replacement transform to produce only new outputs; otherwise the // producers map will not be appropriately updated. TODO: investigate alternatives - producers.put(mapping.getOriginal().getValue(), this); - producers.remove(mapping.getReplacement().getValue()); producerInput.remove(mapping.getReplacement().getValue()); + // original and replacement might be identical + producers.remove(mapping.getReplacement().getValue()); + producers.put(mapping.getOriginal().getValue(), this); } LOG.debug( "Replacing output {} with original {}",