Added Regex Transform and test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bc28799d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bc28799d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bc28799d Branch: refs/heads/apex-runner Commit: bc28799d575341d4ab359c971a10514f518249a0 Parents: 70255d2 Author: Jesse Anderson <je...@smokinghand.com> Authored: Mon Jun 20 16:44:43 2016 -0700 Committer: Jesse Anderson <je...@smokinghand.com> Committed: Mon Nov 7 09:39:29 2016 -0800 ---------------------------------------------------------------------- .../beam/sdk/transforms/RegexTransform.java | 505 +++++++++++++++++++ .../beam/sdk/transforms/RegexTransformTest.java | 262 ++++++++++ 2 files changed, 767 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc28799d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java new file mode 100644 index 0000000..bd7848a --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RegexTransform.java @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * {@code PTransform}s to use Regular Expressions to process elements in a + * {@link PCollection}. + * + * <p> + * {@link RegexTransform#matches(String, int)} can be used to see if an entire line matches + * a Regex. {@link RegexTransform#matchesKV(String, int, int)} can be used to see if an entire + * line matches a Regex and output certain groups as a {@link KV}. + * </p> + * <p> + * {@link RegexTransform#find(String, int)} can be used to see if a portion of a line + * matches a Regex. {@link RegexTransform#findKV(String, int, int)} can be used to see if a + * portion of a line matches a Regex and output certain groups as a {@link KV}. + * </p> + * <p> + * {@link RegexTransform#replaceAll(String, String)} and + * {@link RegexTransform#replaceFirst(String, String)} replace matching portions of each + * line (all of them or only the first, respectively), and + * {@link RegexTransform#split(String, boolean)} splits each line into separate items. + * </p> + * <p> + * For the matches and find transforms, lines that do not match the Regex will not be output. + * </p> + */ +public class RegexTransform { + private RegexTransform() { + // do not instantiate + } + + /** + * Returns a {@link RegexTransform.Matches} {@link PTransform} that checks if + * the entire line matches the Regex. Returns the entire line (group 0) as a + * {@link PCollection}. + * @param regex + * The regular expression to run + */ + public static Matches matches(String regex) { + return matches(regex, 0); + } + + /** + * Returns a {@link RegexTransform.Matches} {@link PTransform} that checks if + * the entire line matches the Regex. Returns the group as a + * {@link PCollection}.
+ * @param regex + * The regular expression to run + * @param group + * The Regex group to return as a PCollection + */ + public static Matches matches(String regex, int group) { + return new Matches(regex, group); + } + + /** + * Returns a {@link RegexTransform.MatchesKV} {@link PTransform} that checks + * if the entire line matches the Regex. Returns the specified groups as the + * key and value as a {@link PCollection}. + * @param regex + * The regular expression to run + * @param keyGroup + * The Regex group to use as the key + * @param valueGroup + * The Regex group to use as the value + */ + public static MatchesKV matchesKV(String regex, int keyGroup, + int valueGroup) { + return new MatchesKV(regex, keyGroup, valueGroup); + } + + /** + * Returns a {@link RegexTransform.Find} {@link PTransform} that checks if a + * portion of the line matches the Regex. Returns the first matching portion of + * the line (group 0), not the entire line, as a {@link PCollection}. + * @param regex + * The regular expression to run + */ + public static Find find(String regex) { + return find(regex, 0); + } + + /** + * Returns a {@link RegexTransform.Find} {@link PTransform} that checks if a + * portion of the line matches the Regex. Returns the group of the first match as a + * {@link PCollection}. + * @param regex + * The regular expression to run + * @param group + * The Regex group to return as a PCollection + */ + public static Find find(String regex, int group) { + return new Find(regex, group); + } + + /** + * Returns a {@link RegexTransform.FindKV} {@link PTransform} that checks if a + * portion of the line matches the Regex. Returns the specified groups of the first + * match as the key and value as a {@link PCollection}.
+ * @param regex + * The regular expression to run + * @param keyGroup + * The Regex group to use as the key + * @param valueGroup + * The Regex group to use as the value + */ + public static FindKV findKV(String regex, int keyGroup, int valueGroup) { + return new FindKV(regex, keyGroup, valueGroup); + } + + /** + * Returns a {@link RegexTransform.ReplaceAll} {@link PTransform} that checks if a + * portion of the line matches the Regex and replaces all matches with the replacement + * String. Returns every line, replaced or unchanged, as a {@link PCollection}. + * @param regex + * The regular expression to run + * @param replacement + * The string to be substituted for each match; group references such as + * {@code $1} and backslash escapes are interpreted as in + * {@link Matcher#replaceAll(String)} + */ + public static ReplaceAll replaceAll(String regex, String replacement) { + return new ReplaceAll(regex, replacement); + } + + /** + * Returns a {@link RegexTransform.ReplaceFirst} {@link PTransform} that checks if a + * portion of the line matches the Regex and replaces the first match with the replacement + * String. Returns every line, replaced or unchanged, as a {@link PCollection}. + * @param regex + * The regular expression to run + * @param replacement + * The string to be substituted for the first match; group references such as + * {@code $1} and backslash escapes are interpreted as in + * {@link Matcher#replaceFirst(String)} + */ + public static ReplaceFirst replaceFirst(String regex, String replacement) { + return new ReplaceFirst(regex, replacement); + } + + /** + * Returns a {@link RegexTransform.Split} {@link PTransform} that splits a string + * on the regular expression and then outputs each item. It will not output empty + * items. Returns the items as a {@link PCollection}. + * @param regex + * The regular expression to run + */ + public static Split split(String regex) { + return split(regex, false); + } + + /** + * Returns a {@link RegexTransform.Split} {@link PTransform} that splits a string + * on the regular expression and then outputs each item. Returns the items as a + * {@link PCollection}. + * @param regex + * The regular expression to run + * @param outputEmpty + * Whether empty items should be output:
{@code true} to output them and {@code false} to drop them. Trailing empty items are + * never output, because {@link Pattern#split(CharSequence)} discards them. + */ + public static Split split(String regex, boolean outputEmpty) { + return new Split(regex, outputEmpty); + } + + /** + * {@code RegexTransform.Matches<String>} takes a {@code PCollection<String>} + * and returns a {@code PCollection<String>} representing the value + * extracted from the Regex group of each input element that matches. + * + * <p> + * This transform runs a Regex on the entire input line. If the entire line + * does not match the Regex, the line will not be output. If it does match the + * entire line, the group in the Regex will be used. The output will be the + * Regex group. + * + * <p> + * Example of use: + * <pre> + * {@code + * PCollection<String> words = ...; + * PCollection<String> values = + * words.apply(RegexTransform.matches("myregex (mygroup)", 1)); + * } + * </pre> + */ + public static class Matches + extends PTransform<PCollection<String>, PCollection<String>> { + Pattern pattern; + int group; + + public Matches(String regex, int group) { + this.pattern = Pattern.compile(regex); + this.group = group; + } + + public PCollection<String> apply(PCollection<String> in) { + return in + .apply(ParDo.of(new DoFn<String, String>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + + if (m.matches()) { + c.output(m.group(group)); + } + } + })); + } + } + + /** + * {@code RegexTransform.MatchesKV<KV<String, String>>} takes a + * {@code PCollection<String>} and returns a + * {@code PCollection<KV<String, String>>} representing the key and value + * extracted from the Regex groups of each input element that matches. + * + * <p> + * This transform runs a Regex on the entire input line. If the entire line + * does not match the Regex, the line will not be output.
If it does match the + * entire line, the groups in the Regex will be used. The key will be the + * key's group and the value will be the value's group. + * + * <p> + * Example of use: + * <pre> + * {@code + * PCollection<String> words = ...; + * PCollection<KV<String, String>> keysAndValues = + * words.apply(RegexTransform.matchesKV("myregex (mykeygroup) (myvaluegroup)", 1, 2)); + * } + * </pre> + */ + public static class MatchesKV + extends PTransform<PCollection<String>, PCollection<KV<String, String>>> { + Pattern pattern; + int keyGroup, valueGroup; + + public MatchesKV(String regex, int keyGroup, int valueGroup) { + this.pattern = Pattern.compile(regex); + this.keyGroup = keyGroup; + this.valueGroup = valueGroup; + } + + public PCollection<KV<String, String>> apply(PCollection<String> in) { + return in.apply(ParDo + .of(new DoFn<String, KV<String, String>>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + + // matches() rather than find(): only lines that match the Regex in + // their entirety are output, as this class's contract states. + if (m.matches()) { + c.output(KV.of(m.group(keyGroup), m.group(valueGroup))); + } + } + })); + } + } + + /** + * {@code RegexTransform.Find<String>} takes a {@code PCollection<String>} and + * returns a {@code PCollection<String>} representing the value extracted + * from the Regex group of the first match in each input element. + * + * <p> + * This transform runs a Regex on the entire input line. If no portion of the + * line matches the Regex, the line will not be output. If it does + * match a portion of the line, the group in the Regex will be used. The + * output will be the Regex group.
+ * + * <p> + * Example of use: + * <pre> + * {@code + * PCollection<String> words = ...; + * PCollection<String> values = + * words.apply(RegexTransform.find("myregex (mygroup)", 1)); + * } + * </pre> + */ + public static class Find + extends PTransform<PCollection<String>, PCollection<String>> { + Pattern pattern; + int group; + + public Find(String regex, int group) { + this.pattern = Pattern.compile(regex); + this.group = group; + } + + public PCollection<String> apply(PCollection<String> in) { + return in.apply(ParDo.of(new DoFn<String, String>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + + if (m.find()) { + c.output(m.group(group)); + } + } + })); + } + } + + /** + * {@code RegexTransform.FindKV<KV<String, String>>} takes a + * {@code PCollection<String>} and returns a + * {@code PCollection<KV<String, String>>} representing the key and value + * extracted from the Regex groups of the first match in each input element. + * + * <p> + * This transform runs a Regex on the entire input line. If no portion of the + * line matches the Regex, the line will not be output. If it does + * match a portion of the line, the groups in the Regex will be used. The key + * will be the key's group and the value will be the value's group.
+ * + * <p> + * Example of use: + * <pre> + * {@code + * PCollection<String> words = ...; + * PCollection<KV<String, String>> keysAndValues = + * words.apply(RegexTransform.findKV("myregex (mykeygroup) (myvaluegroup)", 1, 2)); + * } + * </pre> + */ + public static class FindKV + extends PTransform<PCollection<String>, PCollection<KV<String, String>>> { + Pattern pattern; + int keyGroup, valueGroup; + + public FindKV(String regex, int keyGroup, int valueGroup) { + this.pattern = Pattern.compile(regex); + this.keyGroup = keyGroup; + this.valueGroup = valueGroup; + } + + public PCollection<KV<String, String>> apply(PCollection<String> in) { + return in.apply( + ParDo.of(new DoFn<String, KV<String, String>>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + + if (m.find()) { + c.output(KV.of(m.group(keyGroup), m.group(valueGroup))); + } + } + })); + } + } + + /** + * {@code RegexTransform.ReplaceAll<String>} takes a {@code PCollection<String>} and + * returns a {@code PCollection<String>} with all Strings that matched the + * Regex being replaced with the replacement string. + * + * <p> + * This transform runs a Regex on the entire input line. If no portion of the + * line matches the Regex, the line will be output without changes. If it does + * match a portion of the line, all portions matching the Regex will be replaced + * with the replacement String.
+ * + * <p> + * Example of use: + * <pre> + * {@code + * PCollection<String> words = ...; + * PCollection<String> values = + * words.apply(RegexTransform.replaceAll("myregex", "myreplacement")); + * } + * </pre> + */ + public static class ReplaceAll + extends PTransform<PCollection<String>, PCollection<String>> { + Pattern pattern; + String replacement; + + public ReplaceAll(String regex, String replacement) { + this.pattern = Pattern.compile(regex); + this.replacement = replacement; + } + + public PCollection<String> apply(PCollection<String> in) { + return in.apply(ParDo.of(new DoFn<String, String>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + c.output(m.replaceAll(replacement)); + } + })); + } + } + + /** + * {@code RegexTransform.ReplaceFirst<String>} takes a {@code PCollection<String>} and + * returns a {@code PCollection<String>} with the first portion of each String that matched + * the Regex being replaced with the replacement string. + * + * <p> + * This transform runs a Regex on the entire input line. If no portion of the + * line matches the Regex, the line will be output without changes. If it does + * match a portion of the line, the first portion matching the Regex will be replaced + * with the replacement String.
+ * + * <p> + * Example of use: + * <pre> + * {@code + * PCollection<String> words = ...; + * PCollection<String> values = + * words.apply(RegexTransform.replaceFirst("myregex", "myreplacement")); + * } + * </pre> + */ + public static class ReplaceFirst + extends PTransform<PCollection<String>, PCollection<String>> { + Pattern pattern; + String replacement; + + public ReplaceFirst(String regex, String replacement) { + this.pattern = Pattern.compile(regex); + this.replacement = replacement; + } + + public PCollection<String> apply(PCollection<String> in) { + return in.apply(ParDo.of(new DoFn<String, String>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Matcher m = pattern.matcher((String) c.element()); + c.output(m.replaceFirst(replacement)); + } + })); + } + } + + /** + * {@code RegexTransform.Split<String>} takes a {@code PCollection<String>} and + * returns a {@code PCollection<String>} with the input string split into + * individual items in a list. Each item is then output as a separate string. + * + * <p> + * This transform uses the Regex to split the entire input line. The split + * gives back an array of items. Each item is output as a separate item in the + * {@code PCollection<String>}. + * </p> + * + * <p> + * Depending on the Regex, a split can be an empty or + * "" string. You can pass in a parameter if you want empty strings or not.
+ * Trailing empty strings are never output, because + * {@link Pattern#split(CharSequence)} discards them. + * </p> + * + * <p> + * Example of use: + * <pre> + * {@code + * PCollection<String> words = ...; + * PCollection<String> values = + * words.apply(RegexTransform.split("\\W+")); + * } + * </pre> + */ + public static class Split + extends PTransform<PCollection<String>, PCollection<String>> { + Pattern pattern; + boolean outputEmpty; + + public Split(String regex, boolean outputEmpty) { + this.pattern = Pattern.compile(regex); + this.outputEmpty = outputEmpty; + } + + public PCollection<String> apply(PCollection<String> in) { + return in.apply(ParDo.of(new DoFn<String, String>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + String[] items = pattern.split(c.element()); + + for (String item : items) { + if (outputEmpty || !item.isEmpty()) { + c.output(item); + } + } + } + })); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc28799d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java new file mode 100644 index 0000000..63d02d7 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTransformTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.io.Serializable; + +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link RegexTransform}. + */ +@RunWith(JUnit4.class) +public class RegexTransformTest implements Serializable { + @Test + @Category(NeedsRunner.class) + public void testFind() { + TestPipeline p = TestPipeline.create(); + + PCollection<String> output = p + .apply(Create.of("aj", "xj", "yj", "zj")) + .apply(RegexTransform.find("[xyz]")); + + PAssert.that(output).containsInAnyOrder("x", "y", "z"); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testFindGroup() { + TestPipeline p = TestPipeline.create(); + + PCollection<String> output = p + .apply(Create.of("aj", "xj", "yj", "zj")) + .apply(RegexTransform.find("([xyz])", 1)); + + PAssert.that(output).containsInAnyOrder("x", "y", "z"); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testFindNone() { + TestPipeline p = TestPipeline.create(); + + PCollection<String> output = p + .apply(Create.of("a", "b", "c", "d")) + .apply(RegexTransform.find("[xyz]")); + + PAssert.that(output).empty(); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testKVFind() { + TestPipeline p =
TestPipeline.create(); + + PCollection<KV<String, String>> output = p + .apply(Create.of("a b c")) + .apply(RegexTransform.findKV("a (b) (c)", 1, 2)); + + PAssert.that(output).containsInAnyOrder(KV.of("b", "c")); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testKVFindNone() { + TestPipeline p = TestPipeline.create(); + + PCollection<KV<String, String>> output = p + .apply(Create.of("x y z")) + .apply(RegexTransform.findKV("a (b) (c)", 1, 2)); + + PAssert.that(output).empty(); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testMatches() { + TestPipeline p = TestPipeline.create(); + + PCollection<String> output = p + .apply(Create.of("a", "x", "y", "z")) + .apply(RegexTransform.matches("[xyz]")); + + PAssert.that(output).containsInAnyOrder("x", "y", "z"); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testMatchesNone() { + TestPipeline p = TestPipeline.create(); + + PCollection<String> output = p + .apply(Create.of("a", "b", "c", "d")) + .apply(RegexTransform.matches("[xyz]")); + + PAssert.that(output).empty(); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testMatchesGroup() { + TestPipeline p = TestPipeline.create(); + + PCollection<String> output = p + .apply(Create.of("a", "x xxx", "x yyy", "x zzz")) + .apply(RegexTransform.matches("x ([xyz]*)", 1)); + + PAssert.that(output).containsInAnyOrder("xxx", "yyy", "zzz"); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testKVMatches() { + TestPipeline p = TestPipeline.create(); + + PCollection<KV<String, String>> output = p + .apply(Create.of("a b c")) + .apply(RegexTransform.matchesKV("a (b) (c)", 1, 2)); + + PAssert.that(output).containsInAnyOrder(KV.of("b", "c")); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testKVMatchesNone() { + TestPipeline p = TestPipeline.create(); + + PCollection<KV<String, String>> output = p + .apply(Create.of("x y z")) +
.apply(RegexTransform.matchesKV("a (b) (c)", 1, 2)); + PAssert.that(output).empty(); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testReplaceAll() { + TestPipeline p = TestPipeline.create(); + + PCollection<String> output = p + .apply(Create.of("xj", "yj", "zj")) + .apply(RegexTransform.replaceAll("[xyz]", "new")); + + PAssert.that(output).containsInAnyOrder("newj", "newj", "newj"); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testReplaceAllMixed() { + TestPipeline p = TestPipeline.create(); + + PCollection<String> output = p + .apply(Create.of("abc", "xj", "yj", "zj", "def")) + .apply(RegexTransform.replaceAll("[xyz]", "new")); + + PAssert.that(output).containsInAnyOrder("abc", "newj", "newj", "newj", "def"); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testReplaceFirst() { + TestPipeline p = TestPipeline.create(); + + PCollection<String> output = p + .apply(Create.of("xjx", "yjy", "zjz")) + .apply(RegexTransform.replaceFirst("[xyz]", "new")); + + PAssert.that(output).containsInAnyOrder("newjx", "newjy", "newjz"); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testReplaceFirstMixed() { + TestPipeline p = TestPipeline.create(); + + PCollection<String> output = p + .apply(Create.of("abc", "xjx", "yjy", "zjz", "def")) + .apply(RegexTransform.replaceFirst("[xyz]", "new")); + + PAssert.that(output).containsInAnyOrder("abc", "newjx", "newjy", "newjz", "def"); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testSplits() { + TestPipeline p = TestPipeline.create(); + + PCollection<String> output = p + .apply(Create.of("The quick brown fox jumps over the lazy dog")) + .apply(RegexTransform.split("\\W+")); + + PAssert.that(output).containsInAnyOrder("The", "quick", "brown", + "fox", "jumps", "over", "the", "lazy", "dog"); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testSplitsWithEmpty() { + TestPipeline p = TestPipeline.create(); +
+ PCollection<String> output = p + .apply(Create.of("The quick brown fox jumps over the lazy dog")) + .apply(RegexTransform.split("\\s", true)); + + PAssert.that(output).containsInAnyOrder("The", "", "quick", "brown", "", "", + "fox", "jumps", "over", "", "", "", "the", "lazy", "dog"); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testSplitsWithoutEmpty() { + TestPipeline p = TestPipeline.create(); + + PCollection<String> output = p + .apply(Create.of("The quick brown fox jumps over the lazy dog")) + .apply(RegexTransform.split("\\s", false)); + + PAssert.that(output).containsInAnyOrder("The", "quick", "brown", + "fox", "jumps", "over", "the", "lazy", "dog"); + p.run(); + } +}