Add Runner API oriented PTransformMatchers for DirectRunner overrides
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ca7b9c28 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ca7b9c28 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ca7b9c28 Branch: refs/heads/gearpump-runner Commit: ca7b9c288151d318898ab000b91d26fcf62046ca Parents: b53e6f0 Author: Kenneth Knowles <k...@google.com> Authored: Thu May 25 06:29:09 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Jun 8 11:36:28 2017 -0700 ---------------------------------------------------------------------- .../core/construction/PTransformMatchers.java | 94 +++++++++++++++++++- .../construction/PTransformTranslation.java | 7 +- .../construction/PTransformMatchersTest.java | 32 +++++++ 3 files changed, 128 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ca7b9c28/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java index bfe24a0..c339891 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.core.construction; import com.google.common.base.MoreObjects; +import java.io.IOException; import java.util.HashSet; import java.util.Set; import org.apache.beam.sdk.annotations.Experimental; @@ -50,6 +51,34 @@ public class PTransformMatchers { private PTransformMatchers() {} /** + * Returns a {@link 
PTransformMatcher} that matches a {@link PTransform} if the URN of the + * {@link PTransform} is equal to the URN provided to this matcher. + */ + public static PTransformMatcher urnEqualTo(String urn) { + return new EqualUrnPTransformMatcher(urn); + } + + private static class EqualUrnPTransformMatcher implements PTransformMatcher { + private final String urn; + + private EqualUrnPTransformMatcher(String urn) { + this.urn = urn; + } + + @Override + public boolean matches(AppliedPTransform<?, ?, ?> application) { + return urn.equals(PTransformTranslation.urnForTransformOrNull(application.getTransform())); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("urn", urn) + .toString(); + } + } + + /** * Returns a {@link PTransformMatcher} that matches a {@link PTransform} if the class of the * {@link PTransform} is equal to the {@link Class} provided to this matcher. */ @@ -151,6 +180,68 @@ public class PTransformMatchers { } /** + * A {@link PTransformMatcher} that matches a {@link ParDo} by URN if it has a splittable {@link + * DoFn}. + */ + public static PTransformMatcher splittableParDo() { + return new PTransformMatcher() { + @Override + public boolean matches(AppliedPTransform<?, ?, ?> application) { + if (PTransformTranslation.PAR_DO_TRANSFORM_URN.equals( + PTransformTranslation.urnForTransformOrNull(application.getTransform()))) { + + try { + return ParDoTranslation.isSplittable(application); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Transform with URN %s could not be translated", + PTransformTranslation.PAR_DO_TRANSFORM_URN), + e); + } + } + return false; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper("SplittableParDoMultiMatcher").toString(); + } + }; + } + + /** + * A {@link PTransformMatcher} that matches a {@link ParDo} transform by URN + * and whether it contains state or timers as specified by {@link ParDoTranslation}. 
+ */ + public static PTransformMatcher stateOrTimerParDo() { + return new PTransformMatcher() { + @Override + public boolean matches(AppliedPTransform<?, ?, ?> application) { + if (PTransformTranslation.PAR_DO_TRANSFORM_URN.equals( + PTransformTranslation.urnForTransformOrNull(application.getTransform()))) { + + try { + return ParDoTranslation.usesStateOrTimers(application); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Transform with URN %s could not be translated", + PTransformTranslation.PAR_DO_TRANSFORM_URN), + e); + } + } + return false; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper("StateOrTimerParDoMatcher").toString(); + } + }; + } + + /** * A {@link PTransformMatcher} that matches a {@link ParDo.MultiOutput} containing a {@link DoFn} * that uses state or timers, as specified by {@link DoFnSignature#usesState()} and * {@link DoFnSignature#usesTimers()}. @@ -268,7 +359,8 @@ public class PTransformMatchers { return new PTransformMatcher() { @Override public boolean matches(AppliedPTransform<?, ?, ?> application) { - if (application.getTransform() instanceof WriteFiles) { + if (PTransformTranslation.WRITE_FILES_TRANSFORM_URN.equals( + PTransformTranslation.urnForTransformOrNull(application.getTransform()))) { WriteFiles write = (WriteFiles) application.getTransform(); return write.getSharding() == null && write.getNumShards() == null; } http://git-wip-us.apache.org/repos/asf/beam/blob/ca7b9c28/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index 32ecf43..bae7b05 100644 --- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -179,13 +179,12 @@ public class PTransformTranslation { * Returns the URN for the transform if it is known, otherwise throws. */ public static String urnForTransform(PTransform<?, ?> transform) { - TransformPayloadTranslator translator = KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()); - if (translator == null) { + String urn = urnForTransformOrNull(transform); + if (urn == null) { throw new IllegalStateException( String.format("No translator known for %s", transform.getClass().getName())); } - - return translator.getUrn(transform); + return urn; } /** http://git-wip-us.apache.org/repos/asf/beam/blob/ca7b9c28/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index 2497598..6459849 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -27,6 +27,8 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableMap; import java.io.Serializable; import java.util.Collections; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.DefaultFilenamePolicy; @@ -95,9 +97,14 @@ public 
class PTransformMatchersTest implements Serializable { PCollection<KV<String, Integer>> input = PCollection.createPrimitiveOutputInternal( p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); + input.setName("dummy input"); + input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())); + PCollection<Integer> output = PCollection.createPrimitiveOutputInternal( p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); + output.setName("dummy output"); + output.setCoder(VarIntCoder.of()); return AppliedPTransform.of("pardo", input.expand(), output.expand(), pardo, p); } @@ -272,6 +279,18 @@ public class PTransformMatchersTest implements Serializable { } @Test + public void parDoSplittable() { + AppliedPTransform<?, ?, ?> parDoApplication = + getAppliedTransform( + ParDo.of(splittableDoFn).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty())); + assertThat(PTransformMatchers.splittableParDo().matches(parDoApplication), is(true)); + + assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false)); + assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false)); + } + + @Test public void parDoMultiWithState() { AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform( @@ -284,6 +303,19 @@ public class PTransformMatchersTest implements Serializable { } @Test + public void parDoWithState() { + AppliedPTransform<?, ?, ?> statefulApplication = + getAppliedTransform( + ParDo.of(doFnWithState).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty())); + assertThat(PTransformMatchers.stateOrTimerParDo().matches(statefulApplication), is(true)); + + AppliedPTransform<?, ?, ?> splittableApplication = + getAppliedTransform( + ParDo.of(splittableDoFn).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty())); + 
assertThat(PTransformMatchers.stateOrTimerParDo().matches(splittableApplication), is(false)); + } + + @Test public void parDoMultiWithTimers() { AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform(