Repository: beam Updated Branches: refs/heads/master cf1925fba -> bdcd26c33
Add PTransformMatcher This interface is used to identify PTransforms to replace during Pipeline Surgery. Add ClassPTransformMatcher as the initial simple matcher. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/31833dc6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/31833dc6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/31833dc6 Branch: refs/heads/master Commit: 31833dc6ff64bfdabc88abf4065760977f53abdd Parents: cf1925f Author: Thomas Groh <tg...@google.com> Authored: Tue Feb 7 09:35:25 2017 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Wed Feb 8 10:28:43 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/core/PTransformMatchers.java | 59 +++++++++++ .../runners/core/PTransformMatchersTest.java | 101 +++++++++++++++++++ .../beam/sdk/runners/PTransformMatcher.java | 32 ++++++ 3 files changed, 192 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/31833dc6/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java new file mode 100644 index 0000000..362e8dc --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.runners.PTransformMatcher; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; + +/** + * Static factory methods for {@link PTransformMatcher PTransformMatchers} that match + * {@link PTransform PTransforms} based on the class of the transform. + * + * <p>Once {@link PTransform PTransforms} have URNs, this will be removed and replaced with a + * UrnPTransformMatcher. + */ +@Experimental(Kind.CORE_RUNNERS_ONLY) +public class PTransformMatchers { + /** Private constructor; this class exposes only the static factory method below. */ + private PTransformMatchers() {} + + /** + * Returns a {@link PTransformMatcher} that matches a {@link PTransform} if the class of the + * {@link PTransform} is equal to the {@link Class} provided to this matcher. + * + * @param clazz the class the transform's own class is compared with; an instance of a subclass of + * {@code clazz} is not matched + * @return a matcher that accepts an application only when its transform's class equals + * {@code clazz} + */ + public static PTransformMatcher classEqualTo(Class<? extends PTransform> clazz) { + return new EqualClassPTransformMatcher(clazz); + } + + /** + * A {@link PTransformMatcher} that compares the class of the applied transform with a fixed + * {@link Class} via {@code equals}. + */ + private static class EqualClassPTransformMatcher implements PTransformMatcher { + private final Class<? extends PTransform> clazz; + + private EqualClassPTransformMatcher(Class<? 
extends PTransform> clazz) { + this.clazz = clazz; + } + + @Override + public boolean matches(AppliedPTransform<?, ?, ?> application) { + return application.getTransform().getClass().equals(clazz); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/31833dc6/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java new file mode 100644 index 0000000..c286a37 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.core.runnerapi; + +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +import java.io.Serializable; +import org.apache.beam.runners.core.PTransformMatchers; +import org.apache.beam.sdk.runners.PTransformMatcher; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PCollection; +import org.hamcrest.Matchers; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link PTransformMatchers}. + * + * <p>NOTE(review): the package declared above is {@code org.apache.beam.runners.core.runnerapi}, + * while this file is added under {@code org/apache/beam/runners/core}; confirm which is intended. 
+ */ +@RunWith(JUnit4.class) +public class PTransformMatchersTest implements Serializable { + /** + * Pipeline used to build the inputs and outputs of the applications under test. Abandoned-node + * enforcement is switched off, presumably because no test here runs the pipeline (TODO confirm). + */ + @Rule + public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + + /** + * A matcher created for {@code ParDo.Bound} is expected to match an application whose transform + * was created by {@code ParDo.of}. + */ + @Test + public void classEqualToMatchesSameClass() { + PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.Bound.class); + PCollection<Integer> input = p.apply(Create.of(1)); + ParDo.Bound<Integer, Integer> pardo = ParDo.of(new DoFn<Integer, Integer>() { + @ProcessElement + public void doStuff(ProcessContext ctxt) { + } + }); + PCollection<Integer> output = input.apply(pardo); + + AppliedPTransform<?, ?, ?> application = AppliedPTransform.of("DoStuff", input, output, pardo); + + assertThat(matcher.matches(application), is(true)); + } + + /** + * Despite the method name, this checks that a subclass is NOT matched: the transform is an + * anonymous subclass of {@code MyPTransform} and the final assertion expects {@code false}. + * NOTE(review): consider renaming the test to reflect that. + */ + @Test + public void classEqualToMatchesSubClass() { + class MyPTransform extends PTransform<PCollection<Integer>, PCollection<Integer>> { + @Override + public PCollection<Integer> expand(PCollection<Integer> input) { + return input; + } + } + PTransformMatcher matcher = PTransformMatchers.classEqualTo(MyPTransform.class); + PCollection<Integer> input = p.apply(Create.of(1)); + MyPTransform subclass = new MyPTransform() {}; + + /* Sanity checks: the instance's class differs from MyPTransform, yet it is a MyPTransform. */ + assertThat(subclass.getClass(), not(Matchers.<Class<?>>equalTo(MyPTransform.class))); + assertThat(subclass, instanceOf(MyPTransform.class)); + PCollection<Integer> output = input.apply(subclass); + + AppliedPTransform<?, ?, ?> application = + AppliedPTransform.of("DoStuff", input, output, subclass); + + assertThat(matcher.matches(application), is(false)); + } + + /** + * A matcher created for {@code ParDo.Bound} is expected to reject an application of a + * {@code Window.Bound} transform. + */ + @Test + public void classEqualToDoesNotMatchUnrelatedClass() { + PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.Bound.class); + PCollection<Integer> input = p.apply(Create.of(1)); + Window.Bound<Integer> window = Window.into(new GlobalWindows()); + PCollection<Integer> output = input.apply(window); + + AppliedPTransform<?, ?, ?> application = AppliedPTransform.of("DoStuff", input, output, window); + + 
assertThat(matcher.matches(application), is(false)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/31833dc6/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java new file mode 100644 index 0000000..30dca6d --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.beam.sdk.runners; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; + +/** + * Matches applications of {@link PTransform PTransforms}. + * + * <p>Per the commit message that introduces it, this interface is used to identify + * {@link PTransform PTransforms} to replace during pipeline surgery. + */ +@Experimental(Kind.CORE_RUNNERS_ONLY) +public interface PTransformMatcher { + /** + * Decides whether the given application of a {@link PTransform} is matched by this matcher. + * + * @param application the applied transform to test + * @return {@code true} if {@code application} matches, {@code false} otherwise + */ + boolean matches(AppliedPTransform<?, ?, ?> application); +}