Add ParDoWithType PTransform Matcher

This matcher matches a ParDo where the contained DoFn matches a provided
type.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e75fbe70
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e75fbe70
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e75fbe70

Branch: refs/heads/master
Commit: e75fbe70989ecfac41bb05fecc1dcd602d2a0b5c
Parents: de7b1a1
Author: Thomas Groh <tg...@google.com>
Authored: Thu Feb 16 09:12:24 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Feb 17 14:21:10 2017 -0800

----------------------------------------------------------------------
 .../core/construction/PTransformMatchers.java   | 21 ++++++++++
 .../construction/PTransformMatchersTest.java    | 43 ++++++++++++++++++++
 2 files changed, 64 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e75fbe70/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 579ab7c..7b05ed1 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -162,6 +162,27 @@ public class PTransformMatchers {
   }
 
   /**
+   * A {@link PTransformMatcher} which matches a {@link ParDo.Bound} or {@link 
ParDo.BoundMulti}
+   * where the {@link DoFn} is of the provided type.
+   */
+  public static PTransformMatcher parDoWithFnType(final Class<? extends DoFn> 
fnType) {
+    return new PTransformMatcher() {
+      @Override
+      public boolean matches(AppliedPTransform<?, ?, ?> application) {
+        DoFn<?, ?> fn;
+        if (application.getTransform() instanceof ParDo.Bound) {
+          fn = ((ParDo.Bound) application.getTransform()).getFn();
+        } else if (application.getTransform() instanceof ParDo.BoundMulti) {
+          fn = ((ParDo.BoundMulti) application.getTransform()).getFn();
+        } else {
+          return false;
+        }
+        return fnType.equals(fn.getClass());
+      }
+    };
+  }
+
+  /**
    * A {@link PTransformMatcher} which matches a {@link 
Flatten.FlattenPCollectionList} which
    * consumes no input {@link PCollection PCollections}.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/e75fbe70/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 1260289..439a475 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -27,9 +27,11 @@ import com.google.common.base.MoreObjects;
 import java.io.Serializable;
 import java.util.Collections;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -276,6 +278,47 @@ public class PTransformMatchersTest implements 
Serializable {
   }
 
   @Test
+  public void parDoWithFnTypeWithMatchingType() {
+    DoFn<Object, Object> fn = new DoFn<Object, Object>() {
+      @ProcessElement
+      public void process(ProcessContext ctxt) {
+      }
+    };
+    AppliedPTransform<?, ?, ?> parDoSingle = getAppliedTransform(ParDo.of(fn));
+    AppliedPTransform<?, ?, ?> parDoMulti =
+        getAppliedTransform(
+            ParDo.of(fn).withOutputTags(new TupleTag<Object>(), 
TupleTagList.empty()));
+
+    PTransformMatcher matcher = 
PTransformMatchers.parDoWithFnType(fn.getClass());
+    assertThat(matcher.matches(parDoSingle), is(true));
+    assertThat(matcher.matches(parDoMulti), is(true));
+  }
+
+  @Test
+  public void parDoWithFnTypeWithNoMatch() {
+    DoFn<Object, Object> fn = new DoFn<Object, Object>() {
+      @ProcessElement
+      public void process(ProcessContext ctxt) {
+      }
+    };
+    AppliedPTransform<?, ?, ?> parDoSingle = getAppliedTransform(ParDo.of(fn));
+    AppliedPTransform<?, ?, ?> parDoMulti =
+        getAppliedTransform(
+            ParDo.of(fn).withOutputTags(new TupleTag<Object>(), 
TupleTagList.empty()));
+
+    PTransformMatcher matcher = 
PTransformMatchers.parDoWithFnType(doFnWithState.getClass());
+    assertThat(matcher.matches(parDoSingle), is(false));
+    assertThat(matcher.matches(parDoMulti), is(false));
+  }
+
+  @Test
+  public void parDoWithFnTypeNotParDo() {
+    AppliedPTransform<?, ?, ?> notParDo = 
getAppliedTransform(Create.empty(VoidCoder.of()));
+    PTransformMatcher matcher = 
PTransformMatchers.parDoWithFnType(doFnWithState.getClass());
+    assertThat(matcher.matches(notParDo), is(false));
+  }
+
+  @Test
   public void emptyFlattenWithEmptyFlatten() {
     AppliedPTransform application =
         AppliedPTransform

Reply via email to