Explicitly pass Pipeline in AppliedPTransform
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/74d977a7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/74d977a7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/74d977a7 Branch: refs/heads/master Commit: 74d977a7d6e3b50b50c5e20f34ab17af5bd5dd91 Parents: 9ec22f1 Author: Thomas Groh <tg...@google.com> Authored: Tue Feb 7 09:58:16 2017 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Thu Feb 9 13:21:53 2017 -0800 ---------------------------------------------------------------------- .../runners/core/PTransformMatchersTest.java | 20 +++++++++++++----- .../runners/direct/CommittedResultTest.java | 17 +++++++++------ .../direct/WindowEvaluatorFactoryTest.java | 7 ++++++- .../dataflow/DataflowPipelineJobTest.java | 9 +++++++- .../beam/sdk/runners/TransformHierarchy.java | 7 ++++++- .../beam/sdk/transforms/AppliedPTransform.java | 22 +++++++++++--------- 6 files changed, 58 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/74d977a7/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java index c286a37..fe0c449 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.beam.runners.core.runnerapi; +package org.apache.beam.runners.core; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -24,7 +24,6 @@ import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; import java.io.Serializable; -import org.apache.beam.runners.core.PTransformMatchers; import org.apache.beam.sdk.runners.PTransformMatcher; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -60,7 +59,11 @@ public class PTransformMatchersTest implements Serializable { }); PCollection<Integer> output = input.apply(pardo); - AppliedPTransform<?, ?, ?> application = AppliedPTransform.of("DoStuff", input, output, pardo); + AppliedPTransform<?, ?, ?> application = + AppliedPTransform + .<PCollection<Integer>, PCollection<Integer>, + PTransform<? super PCollection<Integer>, PCollection<Integer>>> + of("DoStuff", input.expand(), output.expand(), pardo, p); assertThat(matcher.matches(application), is(true)); } @@ -82,7 +85,10 @@ public class PTransformMatchersTest implements Serializable { PCollection<Integer> output = input.apply(subclass); AppliedPTransform<?, ?, ?> application = - AppliedPTransform.of("DoStuff", input, output, subclass); + AppliedPTransform + .<PCollection<Integer>, PCollection<Integer>, + PTransform<PCollection<Integer>, PCollection<Integer>>> + of("DoStuff", input.expand(), output.expand(), subclass, p); assertThat(matcher.matches(application), is(false)); } @@ -94,7 +100,11 @@ public class PTransformMatchersTest implements Serializable { Window.Bound<Integer> window = Window.into(new GlobalWindows()); PCollection<Integer> output = input.apply(window); - AppliedPTransform<?, ?, ?> application = AppliedPTransform.of("DoStuff", input, output, window); + AppliedPTransform<?, ?, ?> application = + AppliedPTransform + .<PCollection<Integer>, PCollection<Integer>, + PTransform<PCollection<Integer>, PCollection<Integer>>> + of("DoStuff", input.expand(), output.expand(), window, p); assertThat(matcher.matches(application), is(false)); } http://git-wip-us.apache.org/repos/asf/beam/blob/74d977a7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java index 736f554..68d6eba 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java @@ -54,12 +54,17 @@ public class CommittedResultTest implements Serializable { private transient PCollection<Integer> created = p.apply(Create.of(1, 2)); private transient AppliedPTransform<?, ?, ?> transform = - AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform<PBegin, PDone>() { - @Override - public PDone expand(PBegin begin) { - throw new IllegalArgumentException("Should never be applied"); - } - }); + AppliedPTransform.<PBegin, PDone, PTransform<PBegin, PDone>>of( + "foo", + p.begin().expand(), + PDone.in(p).expand(), + new PTransform<PBegin, PDone>() { + @Override + public PDone expand(PBegin begin) { + throw new IllegalArgumentException("Should never be applied"); + } + }, + p); private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create(); @Test http://git-wip-us.apache.org/repos/asf/beam/blob/74d977a7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java index 9d0c68d..aa841ed 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -311,7 +312,11 @@ public class WindowEvaluatorFactoryTest { throws Exception { TransformEvaluator<Long> evaluator = factory.forApplication( - AppliedPTransform.of("Window", input, windowed, windowTransform), inputBundle); + AppliedPTransform + .<PCollection<Long>, PCollection<Long>, + PTransform<PCollection<Long>, PCollection<Long>>> + of("Window", input.expand(), windowed.expand(), windowTransform, p), + inputBundle); evaluator.processElement(valueInGlobalWindow); evaluator.processElement(valueInGlobalAndTwoIntervalWindows); http://git-wip-us.apache.org/repos/asf/beam/blob/74d977a7/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 36bf129..2690e71 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -52,6 +52,7 @@ import com.google.common.collect.ImmutableSetMultimap; import java.io.IOException; import java.math.BigDecimal; import java.net.SocketTimeoutException; +import java.util.Collections; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; @@ -71,6 +72,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.TaggedPValue; import org.joda.time.Duration; import org.junit.Before; import org.junit.Rule; @@ -671,7 +673,12 @@ public class DataflowPipelineJobTest { String fullName, PTransform<PInput, POutput> transform, Pipeline p) { PInput input = mock(PInput.class); when(input.getPipeline()).thenReturn(p); - return AppliedPTransform.of(fullName, input, mock(POutput.class), transform); + return AppliedPTransform.of( + fullName, + Collections.<TaggedPValue>emptyList(), + Collections.<TaggedPValue>emptyList(), + transform, + p); } http://git-wip-us.apache.org/repos/asf/beam/blob/74d977a7/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index dc8f823..a4c28b8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -323,7 +323,12 @@ public class TransformHierarchy { * Returns the {@link AppliedPTransform} representing this {@link Node}. */ public AppliedPTransform<?, ?, ?> toAppliedPTransform() { - return AppliedPTransform.of(getFullName(), input, output, (PTransform) getTransform()); + return AppliedPTransform.of( + getFullName(), + input.expand(), + output.expand(), + (PTransform) getTransform(), + input.getPipeline()); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/74d977a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java index a6d8859..4de81ac 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java @@ -30,23 +30,25 @@ import org.apache.beam.sdk.values.TaggedPValue; * * <p>For internal use. * - * @param <InputT> transform input type - * @param <OutputT> transform output type + * @param <InputT> transform input type + * @param <OutputT> transform output type * @param <TransformT> transform type */ @AutoValue -public abstract class AppliedPTransform - <InputT extends PInput, OutputT extends POutput, - TransformT extends PTransform<? super InputT, OutputT>> { +public abstract class AppliedPTransform< + InputT extends PInput, OutputT extends POutput, + TransformT extends PTransform<? super InputT, OutputT>> { - public static < - InputT extends PInput, - OutputT extends POutput, + public static <InputT extends PInput, OutputT extends POutput, TransformT extends PTransform<? super InputT, OutputT>> AppliedPTransform<InputT, OutputT, TransformT> of( - String fullName, InputT input, OutputT output, TransformT transform) { + String fullName, + List<TaggedPValue> input, + List<TaggedPValue> output, + TransformT transform, + Pipeline p) { return new AutoValue_AppliedPTransform<InputT, OutputT, TransformT>( - fullName, input.expand(), output.expand(), transform, input.getPipeline()); + fullName, input, output, transform, p); } public abstract String getFullName();