Repository: beam
Updated Branches:
  refs/heads/master 36ed3f6c5 -> 236a3cd96


Add PTransformMatchers#emptyFlatten

This matches a Flatten that consumes no inputs. It can be replaced
(approximately) with Create.empty(), as it will never produce output.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/de7b1a1e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/de7b1a1e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/de7b1a1e

Branch: refs/heads/master
Commit: de7b1a1e461a38b292d57f9ce68b28a050ebb4ad
Parents: 36ed3f6
Author: Thomas Groh <tg...@google.com>
Authored: Thu Feb 16 09:05:41 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Feb 17 14:21:09 2017 -0800

----------------------------------------------------------------------
 .../core/construction/PTransformMatchers.java   | 16 +++++
 .../construction/PTransformMatchersTest.java    | 66 ++++++++++++++++++++
 2 files changed, 82 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/de7b1a1e/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 2823df8..579ab7c 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -22,11 +22,13 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import 
org.apache.beam.sdk.transforms.reflect.DoFnSignature.ProcessElementMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.values.PCollection;
 
 /**
  * A {@link PTransformMatcher} that matches {@link PTransform PTransforms} 
based on the class of the
@@ -158,4 +160,18 @@ public class PTransformMatchers {
       }
     };
   }
+
+  /**
+   * Returns a {@link PTransformMatcher} for a {@link Flatten.FlattenPCollectionList} application
+   * whose {@code getInputs()} is empty, so it consumes no {@link PCollection PCollections}.
+   */
+  public static PTransformMatcher emptyFlatten() {
+    return new PTransformMatcher() {
+      @Override
+      public boolean matches(AppliedPTransform<?, ?, ?> application) {
+        return (application.getTransform() instanceof 
Flatten.FlattenPCollectionList)
+            && application.getInputs().isEmpty();
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/de7b1a1e/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 8218762..1260289 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -25,11 +25,13 @@ import static org.junit.Assert.assertThat;
 
 import com.google.common.base.MoreObjects;
 import java.io.Serializable;
+import java.util.Collections;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -46,6 +48,8 @@ import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.hamcrest.Matchers;
@@ -270,4 +274,66 @@ public class PTransformMatchersTest implements 
Serializable {
     
assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication),
 is(false));
     
assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication),
 is(false));
   }
+
+  @Test
+  public void emptyFlattenWithEmptyFlatten() {
+    AppliedPTransform application =
+        AppliedPTransform
+            .<PCollectionList<Object>, PCollection<Object>, 
Flatten.FlattenPCollectionList<Object>>
+                of(
+                    "EmptyFlatten",
+                    Collections.<TaggedPValue>emptyList(),
+                    Collections.singletonList(
+                        TaggedPValue.of(
+                            new TupleTag<Object>(),
+                            PCollection.createPrimitiveOutputInternal(
+                                p, WindowingStrategy.globalDefault(), 
IsBounded.BOUNDED))),
+                    Flatten.pCollections(),
+                    p);
+
+    assertThat(PTransformMatchers.emptyFlatten().matches(application), 
is(true));
+  }
+
+  @Test
+  public void emptyFlattenWithNonEmptyFlatten() {
+    AppliedPTransform application =
+        AppliedPTransform
+            .<PCollectionList<Object>, PCollection<Object>, 
Flatten.FlattenPCollectionList<Object>>
+                of(
+                    "Flatten",
+                    Collections.singletonList(
+                        TaggedPValue.of(
+                            new TupleTag<Object>(),
+                            PCollection.createPrimitiveOutputInternal(
+                                p, WindowingStrategy.globalDefault(), 
IsBounded.BOUNDED))),
+                    Collections.singletonList(
+                        TaggedPValue.of(
+                            new TupleTag<Object>(),
+                            PCollection.createPrimitiveOutputInternal(
+                                p, WindowingStrategy.globalDefault(), 
IsBounded.BOUNDED))),
+                    Flatten.pCollections(),
+                    p);
+
+    assertThat(PTransformMatchers.emptyFlatten().matches(application), 
is(false));
+  }
+
+  @Test
+  public void emptyFlattenWithNonFlatten() {
+    AppliedPTransform application =
+        AppliedPTransform
+            .<PCollection<Iterable<Object>>, PCollection<Object>, 
Flatten.FlattenIterables<Object>>
+                of(
+                    "EmptyFlatten",
+                    Collections.<TaggedPValue>emptyList(),
+                    Collections.singletonList(
+                        TaggedPValue.of(
+                            new TupleTag<Object>(),
+                            PCollection.createPrimitiveOutputInternal(
+                                p, WindowingStrategy.globalDefault(), 
IsBounded.BOUNDED))),
+                    Flatten.iterables() /* This isn't actually possible to 
construct,
+                                 * but for the sake of example */,
+                    p);
+
+    assertThat(PTransformMatchers.emptyFlatten().matches(application), 
is(false));
+  }
 }

Reply via email to