Repository: beam
Updated Branches:
  refs/heads/master 36ed3f6c5 -> 236a3cd96
Add PTransformMatchers#emptyFlatten

This matches a Flatten that consumes no inputs. It can be replaced
(approximately) with Create.empty(), as it will never produce output.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/de7b1a1e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/de7b1a1e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/de7b1a1e

Branch: refs/heads/master
Commit: de7b1a1e461a38b292d57f9ce68b28a050ebb4ad
Parents: 36ed3f6
Author: Thomas Groh <tg...@google.com>
Authored: Thu Feb 16 09:05:41 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Feb 17 14:21:09 2017 -0800

----------------------------------------------------------------------
 .../core/construction/PTransformMatchers.java   | 16 +++++
 .../construction/PTransformMatchersTest.java    | 66 ++++++++++++++++++++
 2 files changed, 82 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/de7b1a1e/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 2823df8..579ab7c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -22,11 +22,13 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
+import
org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.ProcessElementMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.values.PCollection;

 /**
  * A {@link PTransformMatcher} that matches {@link PTransform PTransforms} based on the class of the
@@ -158,4 +160,18 @@ public class PTransformMatchers {
       }
     };
   }
+
+  /**
+   * A {@link PTransformMatcher} which matches a {@link Flatten.FlattenPCollectionList} which
+   * consumes no input {@link PCollection PCollections}.
+   */
+  public static PTransformMatcher emptyFlatten() {
+    return new PTransformMatcher() {
+      @Override
+      public boolean matches(AppliedPTransform<?, ?, ?> application) {
+        return (application.getTransform() instanceof Flatten.FlattenPCollectionList)
+            && application.getInputs().isEmpty();
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/de7b1a1e/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 8218762..1260289 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -25,11 +25,13 @@ import static org.junit.Assert.assertThat;

 import com.google.common.base.MoreObjects;
 import java.io.Serializable;
+import java.util.Collections;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import
org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -46,6 +48,8 @@ import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.hamcrest.Matchers;
@@ -270,4 +274,66 @@ public class PTransformMatchersTest implements Serializable {
     assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
     assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
   }
+
+  @Test
+  public void emptyFlattenWithEmptyFlatten() {
+    AppliedPTransform application =
+        AppliedPTransform
+            .<PCollectionList<Object>, PCollection<Object>, Flatten.FlattenPCollectionList<Object>>
+                of(
+                    "EmptyFlatten",
+                    Collections.<TaggedPValue>emptyList(),
+                    Collections.singletonList(
+                        TaggedPValue.of(
+                            new TupleTag<Object>(),
+                            PCollection.createPrimitiveOutputInternal(
+                                p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
+                    Flatten.pCollections(),
+                    p);
+
+    assertThat(PTransformMatchers.emptyFlatten().matches(application), is(true));
+  }
+
+  @Test
+  public void emptyFlattenWithNonEmptyFlatten() {
+    AppliedPTransform application =
+        AppliedPTransform
+            .<PCollectionList<Object>, PCollection<Object>, Flatten.FlattenPCollectionList<Object>>
+                of(
+                    "Flatten",
+                    Collections.singletonList(
+                        TaggedPValue.of(
+                            new TupleTag<Object>(),
+
PCollection.createPrimitiveOutputInternal(
+                                p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
+                    Collections.singletonList(
+                        TaggedPValue.of(
+                            new TupleTag<Object>(),
+                            PCollection.createPrimitiveOutputInternal(
+                                p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
+                    Flatten.pCollections(),
+                    p);
+
+    assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
+  }
+
+  @Test
+  public void emptyFlattenWithNonFlatten() {
+    AppliedPTransform application =
+        AppliedPTransform
+            .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.FlattenIterables<Object>>
+                of(
+                    "EmptyFlatten",
+                    Collections.<TaggedPValue>emptyList(),
+                    Collections.singletonList(
+                        TaggedPValue.of(
+                            new TupleTag<Object>(),
+                            PCollection.createPrimitiveOutputInternal(
+                                p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
+                    Flatten.iterables() /* This isn't actually possible to construct,
+                                         * but for the sake of example */,
+                    p);
+
+    assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false));
+  }
 }