Repository: beam
Updated Branches:
  refs/heads/master cf1925fba -> bdcd26c33


Add PTransformMatcher

This interface is used to identify PTransforms to replace during
Pipeline Surgery.

Add ClassPTransformMatcher as the initial simple matcher.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/31833dc6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/31833dc6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/31833dc6

Branch: refs/heads/master
Commit: 31833dc6ff64bfdabc88abf4065760977f53abdd
Parents: cf1925f
Author: Thomas Groh <tg...@google.com>
Authored: Tue Feb 7 09:35:25 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Feb 8 10:28:43 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/core/PTransformMatchers.java   |  59 +++++++++++
 .../runners/core/PTransformMatchersTest.java    | 101 +++++++++++++++++++
 .../beam/sdk/runners/PTransformMatcher.java     |  32 ++++++
 3 files changed, 192 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/31833dc6/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java
new file mode 100644
index 0000000..362e8dc
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * A {@link PTransformMatcher} that matches {@link PTransform PTransforms} 
based on the class of the
+ * transform.
+ *
+ * <p>Once {@link PTransform PTransforms} have URNs, this will be removed and 
replaced with a
+ * UrnPTransformMatcher.
+ */
+@Experimental(Kind.CORE_RUNNERS_ONLY)
+public class PTransformMatchers {
+  private PTransformMatchers() {}
+
+  /**
+   * Returns a {@link PTransformMatcher} that matches a {@link PTransform} if 
the class of the
+   * {@link PTransform} is equal to the {@link Class} provided ot this matcher.
+   * @param clazz
+   * @return
+   */
+  public static PTransformMatcher classEqualTo(Class<? extends PTransform> 
clazz) {
+    return new EqualClassPTransformMatcher(clazz);
+  }
+
+  private static class EqualClassPTransformMatcher implements 
PTransformMatcher {
+    private final Class<? extends PTransform> clazz;
+
+    private EqualClassPTransformMatcher(Class<? extends PTransform> clazz) {
+      this.clazz = clazz;
+    }
+
+    @Override
+    public boolean matches(AppliedPTransform<?, ?, ?> application) {
+      return application.getTransform().getClass().equals(clazz);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/31833dc6/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java
new file mode 100644
index 0000000..c286a37
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.runnerapi;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import java.io.Serializable;
+import org.apache.beam.runners.core.PTransformMatchers;
+import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link PTransformMatcher}.
+ */
+@RunWith(JUnit4.class)
+public class PTransformMatchersTest implements Serializable {
+  @Rule
+  public transient TestPipeline p = 
TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  @Test
+  public void classEqualToMatchesSameClass() {
+    PTransformMatcher matcher = 
PTransformMatchers.classEqualTo(ParDo.Bound.class);
+    PCollection<Integer> input = p.apply(Create.of(1));
+    ParDo.Bound<Integer, Integer> pardo = ParDo.of(new DoFn<Integer, 
Integer>() {
+      @ProcessElement
+      public void doStuff(ProcessContext ctxt) {
+      }
+    });
+    PCollection<Integer> output = input.apply(pardo);
+
+    AppliedPTransform<?, ?, ?> application = AppliedPTransform.of("DoStuff", 
input, output, pardo);
+
+    assertThat(matcher.matches(application), is(true));
+  }
+
+  @Test
+  public void classEqualToMatchesSubClass() {
+    class MyPTransform extends PTransform<PCollection<Integer>, 
PCollection<Integer>> {
+      @Override
+      public PCollection<Integer> expand(PCollection<Integer> input) {
+        return input;
+      }
+    }
+    PTransformMatcher matcher = 
PTransformMatchers.classEqualTo(MyPTransform.class);
+    PCollection<Integer> input = p.apply(Create.of(1));
+    MyPTransform subclass = new MyPTransform() {};
+
+    assertThat(subclass.getClass(), 
not(Matchers.<Class<?>>equalTo(MyPTransform.class)));
+    assertThat(subclass, instanceOf(MyPTransform.class));
+    PCollection<Integer> output = input.apply(subclass);
+
+    AppliedPTransform<?, ?, ?> application =
+        AppliedPTransform.of("DoStuff", input, output, subclass);
+
+    assertThat(matcher.matches(application), is(false));
+  }
+
+  @Test
+  public void classEqualToDoesNotMatchUnrelatedClass() {
+    PTransformMatcher matcher = 
PTransformMatchers.classEqualTo(ParDo.Bound.class);
+    PCollection<Integer> input = p.apply(Create.of(1));
+    Window.Bound<Integer> window = Window.into(new GlobalWindows());
+    PCollection<Integer> output = input.apply(window);
+
+    AppliedPTransform<?, ?, ?> application = AppliedPTransform.of("DoStuff", 
input, output, window);
+
+    assertThat(matcher.matches(application), is(false));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/31833dc6/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java
new file mode 100644
index 0000000..30dca6d
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.beam.sdk.runners;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Matches applications of {@link PTransform PTransforms}.
+ */
+@Experimental(Kind.CORE_RUNNERS_ONLY)
+public interface PTransformMatcher {
+  boolean matches(AppliedPTransform<?, ?, ?> application);
+}

Reply via email to