Add Runner API oriented PTransformMatchers for DirectRunner overrides

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ca7b9c28
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ca7b9c28
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ca7b9c28

Branch: refs/heads/gearpump-runner
Commit: ca7b9c288151d318898ab000b91d26fcf62046ca
Parents: b53e6f0
Author: Kenneth Knowles <k...@google.com>
Authored: Thu May 25 06:29:09 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Thu Jun 8 11:36:28 2017 -0700

----------------------------------------------------------------------
 .../core/construction/PTransformMatchers.java   | 94 +++++++++++++++++++-
 .../construction/PTransformTranslation.java     |  7 +-
 .../construction/PTransformMatchersTest.java    | 32 +++++++
 3 files changed, 128 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ca7b9c28/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index bfe24a0..c339891 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.core.construction;
 
 import com.google.common.base.MoreObjects;
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -50,6 +51,34 @@ public class PTransformMatchers {
   private PTransformMatchers() {}
 
   /**
+   * Returns a {@link PTransformMatcher} that matches a {@link PTransform} if 
the URN of the
+   * {@link PTransform} is equal to the URN provided to this matcher.
+   */
+  public static PTransformMatcher urnEqualTo(String urn) {
+    return new EqualUrnPTransformMatcher(urn);
+  }
+
+  private static class EqualUrnPTransformMatcher implements PTransformMatcher {
+    private final String urn;
+
+    private EqualUrnPTransformMatcher(String urn) {
+      this.urn = urn;
+    }
+
+    @Override
+    public boolean matches(AppliedPTransform<?, ?, ?> application) {
+      return 
urn.equals(PTransformTranslation.urnForTransformOrNull(application.getTransform()));
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("urn", urn)
+          .toString();
+    }
+  }
+
+  /**
    * Returns a {@link PTransformMatcher} that matches a {@link PTransform} if 
the class of the
    * {@link PTransform} is equal to the {@link Class} provided ot this matcher.
    */
@@ -151,6 +180,68 @@ public class PTransformMatchers {
   }
 
   /**
+   * A {@link PTransformMatcher} that matches a {@link ParDo} by URN if it has 
a splittable {@link
+   * DoFn}.
+   */
+  public static PTransformMatcher splittableParDo() {
+    return new PTransformMatcher() {
+      @Override
+      public boolean matches(AppliedPTransform<?, ?, ?> application) {
+        if (PTransformTranslation.PAR_DO_TRANSFORM_URN.equals(
+            
PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
+
+          try {
+            return ParDoTranslation.isSplittable(application);
+          } catch (IOException e) {
+            throw new RuntimeException(
+                String.format(
+                    "Transform with URN %s could not be translated",
+                    PTransformTranslation.PAR_DO_TRANSFORM_URN),
+                e);
+          }
+        }
+        return false;
+      }
+
+      @Override
+      public String toString() {
+        return 
MoreObjects.toStringHelper("SplittableParDoMultiMatcher").toString();
+      }
+    };
+  }
+
+  /**
+   * A {@link PTransformMatcher} that matches a {@link ParDo} transform by URN
+   * and whether it contains state or timers as specified by {@link 
ParDoTranslation}.
+   */
+  public static PTransformMatcher stateOrTimerParDo() {
+    return new PTransformMatcher() {
+      @Override
+      public boolean matches(AppliedPTransform<?, ?, ?> application) {
+        if (PTransformTranslation.PAR_DO_TRANSFORM_URN.equals(
+            
PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
+
+          try {
+            return ParDoTranslation.usesStateOrTimers(application);
+          } catch (IOException e) {
+            throw new RuntimeException(
+                String.format(
+                    "Transform with URN %s could not be translated",
+                    PTransformTranslation.PAR_DO_TRANSFORM_URN),
+                e);
+          }
+        }
+        return false;
+      }
+
+      @Override
+      public String toString() {
+        return 
MoreObjects.toStringHelper("StateOrTimerParDoMatcher").toString();
+      }
+    };
+  }
+
+  /**
    * A {@link PTransformMatcher} that matches a {@link ParDo.MultiOutput} 
containing a {@link DoFn}
    * that uses state or timers, as specified by {@link 
DoFnSignature#usesState()} and
    * {@link DoFnSignature#usesTimers()}.
@@ -268,7 +359,8 @@ public class PTransformMatchers {
     return new PTransformMatcher() {
       @Override
       public boolean matches(AppliedPTransform<?, ?, ?> application) {
-        if (application.getTransform() instanceof WriteFiles) {
+        if (PTransformTranslation.WRITE_FILES_TRANSFORM_URN.equals(
+            
PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
           WriteFiles write = (WriteFiles) application.getTransform();
           return write.getSharding() == null && write.getNumShards() == null;
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/ca7b9c28/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 32ecf43..bae7b05 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -179,13 +179,12 @@ public class PTransformTranslation {
    * Returns the URN for the transform if it is known, otherwise throws.
    */
   public static String urnForTransform(PTransform<?, ?> transform) {
-    TransformPayloadTranslator translator = 
KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass());
-    if (translator == null) {
+    String urn = urnForTransformOrNull(transform);
+    if (urn == null) {
       throw new IllegalStateException(
           String.format("No translator known for %s", 
transform.getClass().getName()));
     }
-
-    return translator.getUrn(transform);
+    return urn;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/ca7b9c28/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 2497598..6459849 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -27,6 +27,8 @@ import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
 import java.util.Collections;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.DefaultFilenamePolicy;
@@ -95,9 +97,14 @@ public class PTransformMatchersTest implements Serializable {
     PCollection<KV<String, Integer>> input =
         PCollection.createPrimitiveOutputInternal(
             p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+    input.setName("dummy input");
+    input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+
     PCollection<Integer> output =
         PCollection.createPrimitiveOutputInternal(
             p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+    output.setName("dummy output");
+    output.setCoder(VarIntCoder.of());
 
     return AppliedPTransform.of("pardo", input.expand(), output.expand(), 
pardo, p);
   }
@@ -272,6 +279,18 @@ public class PTransformMatchersTest implements 
Serializable {
   }
 
   @Test
+  public void parDoSplittable() {
+    AppliedPTransform<?, ?, ?> parDoApplication =
+        getAppliedTransform(
+            ParDo.of(splittableDoFn).withOutputTags(new TupleTag<Integer>(), 
TupleTagList.empty()));
+    assertThat(PTransformMatchers.splittableParDo().matches(parDoApplication), 
is(true));
+
+    
assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication),
 is(false));
+    
assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication),
 is(false));
+    
assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication),
 is(false));
+  }
+
+  @Test
   public void parDoMultiWithState() {
     AppliedPTransform<?, ?, ?> parDoApplication =
         getAppliedTransform(
@@ -284,6 +303,19 @@ public class PTransformMatchersTest implements 
Serializable {
   }
 
   @Test
+  public void parDoWithState() {
+    AppliedPTransform<?, ?, ?> statefulApplication =
+        getAppliedTransform(
+            ParDo.of(doFnWithState).withOutputTags(new TupleTag<Integer>(), 
TupleTagList.empty()));
+    
assertThat(PTransformMatchers.stateOrTimerParDo().matches(statefulApplication), 
is(true));
+
+    AppliedPTransform<?, ?, ?> splittableApplication =
+        getAppliedTransform(
+            ParDo.of(splittableDoFn).withOutputTags(new TupleTag<Integer>(), 
TupleTagList.empty()));
+    
assertThat(PTransformMatchers.stateOrTimerParDo().matches(splittableApplication),
 is(false));
+  }
+
+  @Test
   public void parDoMultiWithTimers() {
     AppliedPTransform<?, ?, ?> parDoApplication =
         getAppliedTransform(

Reply via email to