Repository: beam
Updated Branches:
  refs/heads/master caaa64dc6 -> effca63f9


Add ReplacementOutput

This is used to construct mappings between the original and replacement
PValues in an output, in order to permit the Pipeline to replace all of
the outputs generated by replacement transforms with the originals so
elements flow through the graph appropriately.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/61efaa47
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/61efaa47
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/61efaa47

Branch: refs/heads/master
Commit: 61efaa479daf2faa52ba49f3517a9058c721fcf0
Parents: caaa64d
Author: Thomas Groh <tg...@google.com>
Authored: Tue Feb 7 09:37:29 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Feb 10 10:03:48 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/core/ReplacementOutputs.java   | 105 ++++++++
 .../runners/core/ReplacementOutputsTest.java    | 254 +++++++++++++++++++
 .../sdk/runners/PTransformOverrideFactory.java  |  15 ++
 3 files changed, 374 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/61efaa47/runners/core-java/src/main/java/org/apache/beam/runners/core/ReplacementOutputs.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReplacementOutputs.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReplacementOutputs.java
new file mode 100644
index 0000000..73c3c5d
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReplacementOutputs.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Utility methods for creating {@link ReplacementOutput} for known styles of 
{@link POutput}.
+ */
+public class ReplacementOutputs {
+  private ReplacementOutputs() {}
+
+  public static Map<PValue, ReplacementOutput> singleton(
+      List<TaggedPValue> original, PValue replacement) {
+    TaggedPValue taggedReplacement = 
Iterables.getOnlyElement(replacement.expand());
+    return ImmutableMap.<PValue, ReplacementOutput>builder()
+        .put(
+            taggedReplacement.getValue(),
+            ReplacementOutput.of(Iterables.getOnlyElement(original), 
taggedReplacement))
+        .build();
+  }
+
+  public static Map<PValue, ReplacementOutput> ordered(
+      List<TaggedPValue> original, POutput replacement) {
+    ImmutableMap.Builder<PValue, ReplacementOutput> result = 
ImmutableMap.builder();
+    List<TaggedPValue> replacements = replacement.expand();
+    checkArgument(
+        original.size() == replacements.size(),
+        "Original and Replacements must be the same size. Original: %s 
Replacement: %s",
+        original.size(),
+        replacements.size());
+    int i = 0;
+    for (TaggedPValue replacementPvalue : replacements) {
+      result.put(
+          replacementPvalue.getValue(), ReplacementOutput.of(original.get(i), 
replacementPvalue));
+      i++;
+    }
+    return result.build();
+  }
+
+  public static Map<PValue, ReplacementOutput> tagged(
+      List<TaggedPValue> original, POutput replacement) {
+    Map<TupleTag<?>, TaggedPValue> originalTags = new HashMap<>();
+    for (TaggedPValue value : original) {
+      TaggedPValue former = originalTags.put(value.getTag(), value);
+      checkArgument(
+          former == null || former.equals(value),
+          "Found two tags in an expanded output which map to different values: 
output: %s "
+              + "Values: %s and %s",
+          original,
+          former,
+          value);
+    }
+    ImmutableMap.Builder<PValue, ReplacementOutput> resultBuilder = 
ImmutableMap.builder();
+    Set<TupleTag<?>> missingTags = new HashSet<>(originalTags.keySet());
+    for (TaggedPValue replacementValue : replacement.expand()) {
+      TaggedPValue mapped = originalTags.get(replacementValue.getTag());
+      checkArgument(
+          mapped != null,
+          "Missing original output for Tag %s and Value %s Between original %s 
and replacement %s",
+          replacementValue.getTag(),
+          replacementValue.getValue(),
+          original,
+          replacement.expand());
+      resultBuilder.put(
+          replacementValue.getValue(), ReplacementOutput.of(mapped, 
replacementValue));
+      missingTags.remove(replacementValue.getTag());
+    }
+    ImmutableMap<PValue, ReplacementOutput> result = resultBuilder.build();
+    checkArgument(
+        missingTags.isEmpty(),
+        "Missing replacement for tags %s. Encountered tags: %s",
+        missingTags,
+        result.keySet());
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/61efaa47/runners/core-java/src/test/java/org/apache/beam/runners/core/ReplacementOutputsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReplacementOutputsTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReplacementOutputsTest.java
new file mode 100644
index 0000000..49943d7
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReplacementOutputsTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ReplacementOutputs}.
+ */
+@RunWith(JUnit4.class)
+public class ReplacementOutputsTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  private TestPipeline p = TestPipeline.create();
+
+  private PCollection<Integer> ints =
+      PCollection.createPrimitiveOutputInternal(
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+  private PCollection<Integer> moreInts =
+      PCollection.createPrimitiveOutputInternal(
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+  private PCollection<String> strs =
+      PCollection.createPrimitiveOutputInternal(
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+
+  private PCollection<Integer> replacementInts =
+      PCollection.createPrimitiveOutputInternal(
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+  private PCollection<Integer> moreReplacementInts =
+      PCollection.createPrimitiveOutputInternal(
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+  private PCollection<String> replacementStrs =
+      PCollection.createPrimitiveOutputInternal(
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+
+  @Test
+  public void singletonSucceeds() {
+    Map<PValue, ReplacementOutput> replacements =
+        ReplacementOutputs.singleton(ints.expand(), replacementInts);
+
+    assertThat(replacements, Matchers.<PValue>hasKey(replacementInts));
+
+    ReplacementOutput replacement = replacements.get(replacementInts);
+    TaggedPValue taggedInts = Iterables.getOnlyElement(ints.expand());
+    assertThat(replacement.getOriginal(), equalTo(taggedInts));
+    assertThat(replacement.getReplacement().getValue(), 
Matchers.<PValue>equalTo(replacementInts));
+  }
+
+  @Test
+  public void singletonMultipleOriginalsThrows() {
+    thrown.expect(IllegalArgumentException.class);
+    ReplacementOutputs.singleton(
+        ImmutableList.copyOf(Iterables.concat(ints.expand(), 
moreInts.expand())), replacementInts);
+  }
+
+  @Test
+  public void orderedSucceeds() {
+    List<TaggedPValue> originals = 
PCollectionList.of(ints).and(moreInts).expand();
+    Map<PValue, ReplacementOutput> replacements =
+        ReplacementOutputs.ordered(
+            originals, 
PCollectionList.of(replacementInts).and(moreReplacementInts));
+    assertThat(
+        replacements.keySet(),
+        Matchers.<PValue>containsInAnyOrder(replacementInts, 
moreReplacementInts));
+
+    ReplacementOutput intsMapping = replacements.get(replacementInts);
+    assertThat(intsMapping.getOriginal().getValue(), 
Matchers.<PValue>equalTo(ints));
+    assertThat(intsMapping.getReplacement().getValue(), 
Matchers.<PValue>equalTo(replacementInts));
+
+    ReplacementOutput moreIntsMapping = replacements.get(moreReplacementInts);
+    assertThat(moreIntsMapping.getOriginal().getValue(), 
Matchers.<PValue>equalTo(moreInts));
+    assertThat(
+        moreIntsMapping.getReplacement().getValue(), 
Matchers.<PValue>equalTo(moreReplacementInts));
+  }
+
+  @Test
+  public void orderedTooManyReplacements() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("same size");
+    ReplacementOutputs.ordered(
+        PCollectionList.of(ints).expand(),
+        PCollectionList.of(replacementInts).and(moreReplacementInts));
+  }
+
+  @Test
+  public void orderedTooFewReplacements() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("same size");
+    ReplacementOutputs.ordered(
+        PCollectionList.of(ints).and(moreInts).expand(), 
PCollectionList.of(moreReplacementInts));
+  }
+
+  private TupleTag<Integer> intsTag = new TupleTag<>();
+  private TupleTag<Integer> moreIntsTag = new TupleTag<>();
+  private TupleTag<String> strsTag = new TupleTag<>();
+
+  @Test
+  public void taggedSucceeds() {
+    PCollectionTuple original =
+        PCollectionTuple.of(intsTag, ints).and(strsTag, strs).and(moreIntsTag, 
moreInts);
+
+    Map<PValue, ReplacementOutput> replacements =
+        ReplacementOutputs.tagged(
+            original.expand(),
+            PCollectionTuple.of(strsTag, replacementStrs)
+                .and(moreIntsTag, moreReplacementInts)
+                .and(intsTag, replacementInts));
+    assertThat(
+        replacements.keySet(),
+        Matchers.<PValue>containsInAnyOrder(replacementStrs, replacementInts, 
moreReplacementInts));
+    ReplacementOutput intsReplacement = replacements.get(replacementInts);
+    ReplacementOutput strsReplacement = replacements.get(replacementStrs);
+    ReplacementOutput moreIntsReplacement = 
replacements.get(moreReplacementInts);
+
+    assertThat(
+        intsReplacement,
+        equalTo(
+            ReplacementOutput.of(
+                TaggedPValue.of(intsTag, ints), TaggedPValue.of(intsTag, 
replacementInts))));
+    assertThat(
+        strsReplacement,
+        equalTo(
+            ReplacementOutput.of(
+                TaggedPValue.of(strsTag, strs), TaggedPValue.of(strsTag, 
replacementStrs))));
+    assertThat(
+        moreIntsReplacement,
+        equalTo(
+            ReplacementOutput.of(
+                TaggedPValue.of(moreIntsTag, moreInts),
+                TaggedPValue.of(moreIntsTag, moreReplacementInts))));
+  }
+
+  /**
+   * When a call to {@link ReplacementOutputs#tagged(List, POutput)} is made 
where the first
+   * argument contains multiple copies of the same {@link TaggedPValue}, the 
call succeeds using
+   * that mapping.
+   */
+  @Test
+  public void taggedMultipleInstances() {
+    List<TaggedPValue> original =
+        ImmutableList.of(
+            TaggedPValue.of(intsTag, ints),
+            TaggedPValue.of(strsTag, strs),
+            TaggedPValue.of(intsTag, ints));
+
+    Map<PValue, ReplacementOutput> replacements =
+        ReplacementOutputs.tagged(
+            original, PCollectionTuple.of(strsTag, 
replacementStrs).and(intsTag, replacementInts));
+    assertThat(
+        replacements.keySet(),
+        Matchers.<PValue>containsInAnyOrder(replacementStrs, replacementInts));
+    ReplacementOutput intsReplacement = replacements.get(replacementInts);
+    ReplacementOutput strsReplacement = replacements.get(replacementStrs);
+
+    assertThat(
+        intsReplacement,
+        equalTo(
+            ReplacementOutput.of(
+                TaggedPValue.of(intsTag, ints), TaggedPValue.of(intsTag, 
replacementInts))));
+    assertThat(
+        strsReplacement,
+        equalTo(
+            ReplacementOutput.of(
+                TaggedPValue.of(strsTag, strs), TaggedPValue.of(strsTag, 
replacementStrs))));
+  }
+
+  /**
+   * When a call to {@link ReplacementOutputs#tagged(List, POutput)} is made 
where a single tag
+   * has multiple {@link PValue PValues} mapped to it, the call fails.
+   */
+  @Test
+  public void taggedMultipleConflictingInstancesThrows() {
+    List<TaggedPValue> original =
+        ImmutableList.of(
+            TaggedPValue.of(intsTag, ints), TaggedPValue.of(intsTag, 
moreReplacementInts));
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("different values");
+    thrown.expectMessage(intsTag.toString());
+    thrown.expectMessage(ints.toString());
+    thrown.expectMessage(moreReplacementInts.toString());
+    ReplacementOutputs.tagged(
+        original,
+        PCollectionTuple.of(strsTag, replacementStrs)
+            .and(moreIntsTag, moreReplacementInts)
+            .and(intsTag, replacementInts));
+  }
+
+  @Test
+  public void taggedMissingReplacementThrows() {
+    PCollectionTuple original =
+        PCollectionTuple.of(intsTag, ints).and(strsTag, strs).and(moreIntsTag, 
moreInts);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Missing replacement");
+    thrown.expectMessage(intsTag.toString());
+    thrown.expectMessage(ints.toString());
+    ReplacementOutputs.tagged(
+        original.expand(),
+        PCollectionTuple.of(strsTag, replacementStrs).and(moreIntsTag, 
moreReplacementInts));
+  }
+
+  @Test
+  public void taggedExtraReplacementThrows() {
+    PCollectionTuple original = PCollectionTuple.of(intsTag, 
ints).and(strsTag, strs);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Missing original output");
+    thrown.expectMessage(moreIntsTag.toString());
+    thrown.expectMessage(moreReplacementInts.toString());
+    ReplacementOutputs.tagged(
+        original.expand(),
+        PCollectionTuple.of(strsTag, replacementStrs)
+            .and(moreIntsTag, moreReplacementInts)
+            .and(intsTag, replacementInts));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/61efaa47/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
index 1d9be66..0a167f3 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.beam.sdk.runners;
 
+import com.google.auto.value.AutoValue;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -46,4 +47,18 @@ public interface PTransformOverrideFactory<
    * Returns the composite type that replacement transforms consumed from an 
equivalent expansion.
    */
   InputT getInput(List<TaggedPValue> inputs, Pipeline p);
+
+  /**
+   * A mapping between original {@link TaggedPValue} outputs and their 
replacements.
+   */
+  @AutoValue
+  abstract class ReplacementOutput {
+    public static ReplacementOutput of(TaggedPValue original, TaggedPValue 
replacement) {
+      return new 
AutoValue_PTransformOverrideFactory_ReplacementOutput(original, replacement);
+    }
+
+    public abstract TaggedPValue getOriginal();
+
+    public abstract TaggedPValue getReplacement();
+  }
 }

Reply via email to