Add ParDoWithType PTransform Matcher. This matcher matches a ParDo where the contained DoFn matches a provided type.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e75fbe70 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e75fbe70 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e75fbe70 Branch: refs/heads/master Commit: e75fbe70989ecfac41bb05fecc1dcd602d2a0b5c Parents: de7b1a1 Author: Thomas Groh <tg...@google.com> Authored: Thu Feb 16 09:12:24 2017 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Fri Feb 17 14:21:10 2017 -0800 ---------------------------------------------------------------------- .../core/construction/PTransformMatchers.java | 21 ++++++++++ .../construction/PTransformMatchersTest.java | 43 ++++++++++++++++++++ 2 files changed, 64 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e75fbe70/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java index 579ab7c..7b05ed1 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java @@ -162,6 +162,27 @@ public class PTransformMatchers { } /** + * A {@link PTransformMatcher} which matches a {@link ParDo.Bound} or {@link ParDo.BoundMulti} + * where the {@link DoFn} is of the provided type. + */ + public static PTransformMatcher parDoWithFnType(final Class<? 
extends DoFn> fnType) { + return new PTransformMatcher() { + @Override + public boolean matches(AppliedPTransform<?, ?, ?> application) { + DoFn<?, ?> fn; + if (application.getTransform() instanceof ParDo.Bound) { + fn = ((ParDo.Bound) application.getTransform()).getFn(); + } else if (application.getTransform() instanceof ParDo.BoundMulti) { + fn = ((ParDo.BoundMulti) application.getTransform()).getFn(); + } else { + return false; + } + return fnType.equals(fn.getClass()); + } + }; + } + + /** * A {@link PTransformMatcher} which matches a {@link Flatten.FlattenPCollectionList} which * consumes no input {@link PCollection PCollections}. */ http://git-wip-us.apache.org/repos/asf/beam/blob/e75fbe70/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index 1260289..439a475 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -27,9 +27,11 @@ import com.google.common.base.MoreObjects; import java.io.Serializable; import java.util.Collections; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.runners.PTransformMatcher; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.PTransform; @@ -276,6 +278,47 @@ public 
class PTransformMatchersTest implements Serializable { } @Test + public void parDoWithFnTypeWithMatchingType() { + DoFn<Object, Object> fn = new DoFn<Object, Object>() { + @ProcessElement + public void process(ProcessContext ctxt) { + } + }; + AppliedPTransform<?, ?, ?> parDoSingle = getAppliedTransform(ParDo.of(fn)); + AppliedPTransform<?, ?, ?> parDoMulti = + getAppliedTransform( + ParDo.of(fn).withOutputTags(new TupleTag<Object>(), TupleTagList.empty())); + + PTransformMatcher matcher = PTransformMatchers.parDoWithFnType(fn.getClass()); + assertThat(matcher.matches(parDoSingle), is(true)); + assertThat(matcher.matches(parDoMulti), is(true)); + } + + @Test + public void parDoWithFnTypeWithNoMatch() { + DoFn<Object, Object> fn = new DoFn<Object, Object>() { + @ProcessElement + public void process(ProcessContext ctxt) { + } + }; + AppliedPTransform<?, ?, ?> parDoSingle = getAppliedTransform(ParDo.of(fn)); + AppliedPTransform<?, ?, ?> parDoMulti = + getAppliedTransform( + ParDo.of(fn).withOutputTags(new TupleTag<Object>(), TupleTagList.empty())); + + PTransformMatcher matcher = PTransformMatchers.parDoWithFnType(doFnWithState.getClass()); + assertThat(matcher.matches(parDoSingle), is(false)); + assertThat(matcher.matches(parDoMulti), is(false)); + } + + @Test + public void parDoWithFnTypeNotParDo() { + AppliedPTransform<?, ?, ?> notParDo = getAppliedTransform(Create.empty(VoidCoder.of())); + PTransformMatcher matcher = PTransformMatchers.parDoWithFnType(doFnWithState.getClass()); + assertThat(matcher.matches(notParDo), is(false)); + } + + @Test public void emptyFlattenWithEmptyFlatten() { AppliedPTransform application = AppliedPTransform