Explicitly pass Pipeline in AppliedPTransform

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/74d977a7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/74d977a7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/74d977a7

Branch: refs/heads/master
Commit: 74d977a7d6e3b50b50c5e20f34ab17af5bd5dd91
Parents: 9ec22f1
Author: Thomas Groh <tg...@google.com>
Authored: Tue Feb 7 09:58:16 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Feb 9 13:21:53 2017 -0800

----------------------------------------------------------------------
 .../runners/core/PTransformMatchersTest.java    | 20 +++++++++++++-----
 .../runners/direct/CommittedResultTest.java     | 17 +++++++++------
 .../direct/WindowEvaluatorFactoryTest.java      |  7 ++++++-
 .../dataflow/DataflowPipelineJobTest.java       |  9 +++++++-
 .../beam/sdk/runners/TransformHierarchy.java    |  7 ++++++-
 .../beam/sdk/transforms/AppliedPTransform.java  | 22 +++++++++++---------
 6 files changed, 58 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/74d977a7/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java
index c286a37..fe0c449 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.beam.runners.core.runnerapi;
+package org.apache.beam.runners.core;
 
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -24,7 +24,6 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
-import org.apache.beam.runners.core.PTransformMatchers;
 import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -60,7 +59,11 @@ public class PTransformMatchersTest implements Serializable {
     });
     PCollection<Integer> output = input.apply(pardo);
 
-    AppliedPTransform<?, ?, ?> application = AppliedPTransform.of("DoStuff", 
input, output, pardo);
+    AppliedPTransform<?, ?, ?> application =
+        AppliedPTransform
+            .<PCollection<Integer>, PCollection<Integer>,
+                PTransform<? super PCollection<Integer>, PCollection<Integer>>>
+                of("DoStuff", input.expand(), output.expand(), pardo, p);
 
     assertThat(matcher.matches(application), is(true));
   }
@@ -82,7 +85,10 @@ public class PTransformMatchersTest implements Serializable {
     PCollection<Integer> output = input.apply(subclass);
 
     AppliedPTransform<?, ?, ?> application =
-        AppliedPTransform.of("DoStuff", input, output, subclass);
+        AppliedPTransform
+            .<PCollection<Integer>, PCollection<Integer>,
+                PTransform<PCollection<Integer>, PCollection<Integer>>>
+                of("DoStuff", input.expand(), output.expand(), subclass, p);
 
     assertThat(matcher.matches(application), is(false));
   }
@@ -94,7 +100,11 @@ public class PTransformMatchersTest implements Serializable 
{
     Window.Bound<Integer> window = Window.into(new GlobalWindows());
     PCollection<Integer> output = input.apply(window);
 
-    AppliedPTransform<?, ?, ?> application = AppliedPTransform.of("DoStuff", 
input, output, window);
+    AppliedPTransform<?, ?, ?> application =
+        AppliedPTransform
+            .<PCollection<Integer>, PCollection<Integer>,
+                PTransform<PCollection<Integer>, PCollection<Integer>>>
+                of("DoStuff", input.expand(), output.expand(), window, p);
 
     assertThat(matcher.matches(application), is(false));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/74d977a7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index 736f554..68d6eba 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -54,12 +54,17 @@ public class CommittedResultTest implements Serializable {
 
   private transient PCollection<Integer> created = p.apply(Create.of(1, 2));
   private transient AppliedPTransform<?, ?, ?> transform =
-      AppliedPTransform.of("foo", p.begin(), PDone.in(p), new 
PTransform<PBegin, PDone>() {
-        @Override
-        public PDone expand(PBegin begin) {
-          throw new IllegalArgumentException("Should never be applied");
-        }
-      });
+      AppliedPTransform.<PBegin, PDone, PTransform<PBegin, PDone>>of(
+          "foo",
+          p.begin().expand(),
+          PDone.in(p).expand(),
+          new PTransform<PBegin, PDone>() {
+            @Override
+            public PDone expand(PBegin begin) {
+              throw new IllegalArgumentException("Should never be applied");
+            }
+          },
+          p);
   private transient BundleFactory bundleFactory = 
ImmutableListBundleFactory.create();
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/74d977a7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index 9d0c68d..aa841ed 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -311,7 +312,11 @@ public class WindowEvaluatorFactoryTest {
       throws Exception {
     TransformEvaluator<Long> evaluator =
         factory.forApplication(
-            AppliedPTransform.of("Window", input, windowed, windowTransform), 
inputBundle);
+            AppliedPTransform
+                .<PCollection<Long>, PCollection<Long>,
+                    PTransform<PCollection<Long>, PCollection<Long>>>
+                    of("Window", input.expand(), windowed.expand(), 
windowTransform, p),
+            inputBundle);
 
     evaluator.processElement(valueInGlobalWindow);
     evaluator.processElement(valueInGlobalAndTwoIntervalWindows);

http://git-wip-us.apache.org/repos/asf/beam/blob/74d977a7/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 36bf129..2690e71 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -52,6 +52,7 @@ import com.google.common.collect.ImmutableSetMultimap;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.net.SocketTimeoutException;
+import java.util.Collections;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
@@ -71,6 +72,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.joda.time.Duration;
 import org.junit.Before;
 import org.junit.Rule;
@@ -671,7 +673,12 @@ public class DataflowPipelineJobTest {
       String fullName, PTransform<PInput, POutput> transform, Pipeline p) {
     PInput input = mock(PInput.class);
     when(input.getPipeline()).thenReturn(p);
-    return AppliedPTransform.of(fullName, input, mock(POutput.class), 
transform);
+    return AppliedPTransform.of(
+        fullName,
+        Collections.<TaggedPValue>emptyList(),
+        Collections.<TaggedPValue>emptyList(),
+        transform,
+        p);
   }
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/74d977a7/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index dc8f823..a4c28b8 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -323,7 +323,12 @@ public class TransformHierarchy {
      * Returns the {@link AppliedPTransform} representing this {@link Node}.
      */
     public AppliedPTransform<?, ?, ?> toAppliedPTransform() {
-      return AppliedPTransform.of(getFullName(), input, output, (PTransform) 
getTransform());
+      return AppliedPTransform.of(
+          getFullName(),
+          input.expand(),
+          output.expand(),
+          (PTransform) getTransform(),
+          input.getPipeline());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/74d977a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
index a6d8859..4de81ac 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
@@ -30,23 +30,25 @@ import org.apache.beam.sdk.values.TaggedPValue;
  *
  * <p>For internal use.
  *
- * @param <InputT> transform input type
- * @param <OutputT> transform output type
+ * @param <InputT>     transform input type
+ * @param <OutputT>    transform output type
  * @param <TransformT> transform type
  */
 @AutoValue
-public abstract class AppliedPTransform
-    <InputT extends PInput, OutputT extends POutput,
-     TransformT extends PTransform<? super InputT, OutputT>> {
+public abstract class AppliedPTransform<
+    InputT extends PInput, OutputT extends POutput,
+    TransformT extends PTransform<? super InputT, OutputT>> {
 
-  public static <
-          InputT extends PInput,
-          OutputT extends POutput,
+  public static <InputT extends PInput, OutputT extends POutput,
           TransformT extends PTransform<? super InputT, OutputT>>
       AppliedPTransform<InputT, OutputT, TransformT> of(
-          String fullName, InputT input, OutputT output, TransformT transform) 
{
+          String fullName,
+          List<TaggedPValue> input,
+          List<TaggedPValue> output,
+          TransformT transform,
+          Pipeline p) {
     return new AutoValue_AppliedPTransform<InputT, OutputT, TransformT>(
-        fullName, input.expand(), output.expand(), transform, 
input.getPipeline());
+        fullName, input, output, transform, p);
   }
 
   public abstract String getFullName();

Reply via email to