This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 40936ba BEAM-7018: Added regex transform on Python SDK.
new 0a2ddc0 Merge pull request #8859 from mszb/BEAM-7018
40936ba is described below
commit 40936ba49df80e9518090dbf92346d11e7f89b9e
Author: Shoaib <[email protected]>
AuthorDate: Mon Aug 5 20:09:47 2019 +0500
BEAM-7018: Added regex transform on Python SDK.
---
sdks/python/apache_beam/transforms/util.py | 222 +++++++++++++++++++
sdks/python/apache_beam/transforms/util_test.py | 280 ++++++++++++++++++++++++
2 files changed, 502 insertions(+)
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index edd2e72..c2866d6 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -24,9 +24,11 @@ from __future__ import division
import collections
import contextlib
import random
+import re
import time
import typing
import warnings
+from builtins import filter
from builtins import object
from builtins import range
from builtins import zip
@@ -69,6 +71,7 @@ __all__ = [
'Distinct',
'Keys',
'KvSwap',
+ 'Regex',
'Reify',
'RemoveDuplicates',
'Reshuffle',
@@ -865,3 +868,222 @@ class Reify(object):
def expand(self, pcoll):
return pcoll | ParDo(self.add_window_info)
+
+
+class Regex(object):
+  """
+  PTransform to use Regular Expression to process the elements in a
+  PCollection.
+
+  Every public method is a static factory wrapped with ``ptransform_fn``. The
+  ``regex`` argument of each method may be a pattern string or an already
+  compiled (re.compile) pattern.
+  """
+
+  # Sentinel for the `group` argument of `find_all`: selects the
+  # (entire match, first group) tuple output instead of a single group.
+  ALL = "__regex_all_groups"
+
+  @staticmethod
+  def _regex_compile(regex):
+    """Return a compiled pattern for ``regex``.
+
+    Text and bytes values are passed through ``re.compile``; any other value
+    is returned unchanged and is expected to already be a compiled pattern.
+    """
+    # type(u'') is `unicode` on Python 2 and `str` on Python 3, so text
+    # patterns are compiled on both major versions instead of failing later
+    # with an AttributeError on `.match` / `.search`.
+    if isinstance(regex, (str, bytes, type(u''))):
+      regex = re.compile(regex)
+    return regex
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(str)
+  @ptransform_fn
+  def matches(pcoll, regex, group=0):
+    """
+    Emits the requested group (group 0 by default) for every element that the
+    regular expression matches starting at the beginning of the string
+    (``re.match`` semantics). Elements that do not match produce no output.
+    To match the entire string, add "$" sign at the end of regex expression.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+      group: (optional) name/number of the group, it can be integer or a string
+        value. Defaults to 0, meaning the entire matched string will be
+        returned.
+    """
+    regex = Regex._regex_compile(regex)
+
+    def _process(element):
+      m = regex.match(element)
+      if m:
+        yield m.group(group)
+    return pcoll | FlatMap(_process)
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(typing.List[str])
+  @ptransform_fn
+  def all_matches(pcoll, regex):
+    """
+    Emits, for every element that the regular expression matches starting at
+    the beginning of the string, a list holding the entire match followed by
+    each capturing group of the pattern. Elements that do not match produce
+    no output. A group that did not participate in the match is reported as
+    None.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+    """
+    regex = Regex._regex_compile(regex)
+    # Use the pattern's group count rather than `m.lastindex`: lastindex is
+    # None when the pattern has no capturing group (None + 1 raises
+    # TypeError) and it under-counts when trailing groups do not participate.
+    group_count = regex.groups + 1
+
+    def _process(element):
+      m = regex.match(element)
+      if m:
+        yield [m.group(ix) for ix in range(group_count)]
+
+    return pcoll | FlatMap(_process)
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(typing.Tuple[str, str])
+  @ptransform_fn
+  def matches_kv(pcoll, regex, keyGroup, valueGroup=0):
+    """
+    Emits a (key, value) pair for every element that the regular expression
+    matches starting at the beginning of the string, deriving the key & value
+    from the specified groups of the regular expression. Elements that do not
+    match produce no output.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+      keyGroup: The Regex group to use as the key. Can be int or str.
+      valueGroup: (optional) Regex group to use the value. Can be int or str.
+        The default value "0" returns entire matched string.
+    """
+    regex = Regex._regex_compile(regex)
+
+    def _process(element):
+      match = regex.match(element)
+      if match:
+        yield (match.group(keyGroup), match.group(valueGroup))
+    return pcoll | FlatMap(_process)
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(str)
+  @ptransform_fn
+  def find(pcoll, regex, group=0):
+    """
+    Emits the requested group of the first match found anywhere in the
+    element (``re.search`` semantics). Returns the entire match (group 0) by
+    default. Elements without a match produce no output.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+      group: (optional) name of the group, it can be integer or a string value.
+    """
+    regex = Regex._regex_compile(regex)
+
+    def _process(element):
+      r = regex.search(element)
+      if r:
+        yield r.group(group)
+    return pcoll | FlatMap(_process)
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(typing.Union[typing.List[str],
+                                            typing.Tuple[str, str]])
+  @ptransform_fn
+  def find_all(pcoll, regex, group=0, outputEmpty=True):
+    """
+    Emits exactly one list per input element containing the requested group
+    of every non-overlapping match found in the element; the list is empty
+    when nothing matches. By default group 0 is collected and empty items are
+    kept.
+
+    Passing the `Regex.ALL` flag as `group` emits a list of
+    (entire match, first group) tuples instead; the pattern must then define
+    at least one capturing group. In that mode `outputEmpty` is evaluated
+    against the first group.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+      group: (optional) name of the group, it can be integer or a string value.
+      outputEmpty: (optional) Should empty be output. True to output empties
+        and false if not.
+    """
+    regex = Regex._regex_compile(regex)
+
+    def _process(element):
+      matches = regex.finditer(element)
+      if group == Regex.ALL:
+        yield [(m.group(), m.groups()[0]) for m in matches
+               if outputEmpty or m.groups()[0]]
+      else:
+        yield [m.group(group) for m in matches
+               if outputEmpty or m.group(group)]
+    return pcoll | FlatMap(_process)
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(typing.Tuple[str, str])
+  @ptransform_fn
+  def find_kv(pcoll, regex, keyGroup, valueGroup=0):
+    """
+    Emits one (key, value) pair for every non-overlapping match found
+    anywhere in the element, using the specified groups as the key and value.
+    Elements without a match produce no output.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+      keyGroup: The Regex group to use as the key. Can be int or str.
+      valueGroup: (optional) Regex group to use the value. Can be int or str.
+        The default value "0" returns entire matched string.
+    """
+    regex = Regex._regex_compile(regex)
+
+    def _process(element):
+      # finditer returns an iterator, which is always truthy, so no emptiness
+      # check is needed: an element without matches simply yields nothing.
+      for match in regex.finditer(element):
+        yield (match.group(keyGroup), match.group(valueGroup))
+
+    return pcoll | FlatMap(_process)
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(str)
+  @ptransform_fn
+  def replace_all(pcoll, regex, replacement):
+    """
+    Emits every element with all matches of the regex replaced by the
+    replacement string. Elements without a match are emitted unchanged.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+      replacement: the string to be substituted for each match.
+    """
+    regex = Regex._regex_compile(regex)
+    return pcoll | Map(lambda elem: regex.sub(replacement, elem))
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(str)
+  @ptransform_fn
+  def replace_first(pcoll, regex, replacement):
+    """
+    Emits every element with only the first match of the regex replaced by
+    the replacement string. Elements without a match are emitted unchanged.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+      replacement: the string to be substituted for the first match.
+    """
+    regex = Regex._regex_compile(regex)
+    # count=1 limits the substitution to the first match.
+    return pcoll | Map(lambda elem: regex.sub(replacement, elem, 1))
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(typing.List[str])
+  @ptransform_fn
+  def split(pcoll, regex, outputEmpty=False):
+    """
+    Emits, for every element, the list of strings obtained by splitting the
+    element on the regular expression. Empty items are dropped by default.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+      outputEmpty: (optional) Should empty be output. True to output empties
+        and false if not.
+    """
+    regex = Regex._regex_compile(regex)
+    outputEmpty = bool(outputEmpty)
+
+    def _process(element):
+      r = regex.split(element)
+      if r and not outputEmpty:
+        # filter(None, ...) drops the falsy (empty) pieces.
+        r = list(filter(None, r))
+      yield r
+
+    return pcoll | FlatMap(_process)
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index cf620f4..af2fc8c 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -24,6 +24,7 @@ import itertools
import logging
import math
import random
+import re
import time
import unittest
from builtins import object
@@ -618,6 +619,285 @@ class ReifyTest(unittest.TestCase):
assert_that(reified_pc, equal_to(expected), reify_windows=True)
+class RegexTest(unittest.TestCase):
+  """Pipeline-level tests for the util.Regex transforms.
+
+  Each family of tests covers a pattern given as a string and as a
+  pre-compiled pattern, numbered and named groups, and inputs that do not
+  match at all.
+  """
+
+  def test_find(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["aj", "xj", "yj", "zj"])
+                | util.Regex.find("[xyz]"))
+      assert_that(result, equal_to(["x", "y", "z"]))
+
+  def test_find_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("[xyz]")
+      result = (p | beam.Create(["aj", "xj", "yj", "zj"])
+                | util.Regex.find(rc))
+      assert_that(result, equal_to(["x", "y", "z"]))
+
+  def test_find_group(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["aj", "xj", "yj", "zj"])
+                | util.Regex.find("([xyz])j", group=1))
+      assert_that(result, equal_to(["x", "y", "z"]))
+
+  def test_find_empty(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a", "b", "c", "d"])
+                | util.Regex.find("[xyz]"))
+      assert_that(result, equal_to([]))
+
+  def test_find_group_name(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["aj", "xj", "yj", "zj"])
+                | util.Regex.find("(?P<namedgroup>[xyz])j",
+                                  group="namedgroup"))
+      assert_that(result, equal_to(["x", "y", "z"]))
+
+  def test_find_group_name_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("(?P<namedgroup>[xyz])j")
+      result = (p | beam.Create(["aj", "xj", "yj", "zj"]) | util.Regex.find(
+          rc, group="namedgroup"))
+      assert_that(result, equal_to(["x", "y", "z"]))
+
+  def test_find_all_groups(self):
+    data = ["abb ax abbb", "abc qwerty abcabcd xyz"]
+    with TestPipeline() as p:
+      pcol = (p | beam.Create(data))
+
+      assert_that(pcol | 'with default values' >> util.Regex.find_all('a(b*)'),
+                  equal_to([['abb', 'a', 'abbb'], ['ab', 'ab', 'ab']]),
+                  label='CheckWithDefaultValues')
+
+      assert_that(pcol | 'group 1' >> util.Regex.find_all('a(b*)', 1),
+                  equal_to([['b', 'b', 'b'], ['bb', '', 'bbb']]),
+                  label='CheckWithGroup1')
+
+      assert_that(pcol | 'group 1 non empty' >> util.Regex.find_all(
+          'a(b*)', 1, outputEmpty=False),
+                  equal_to([['b', 'b', 'b'], ['bb', 'bbb']]),
+                  label='CheckGroup1NonEmpty')
+
+      assert_that(pcol | 'named group' >> util.Regex.find_all(
+          'a(?P<namedgroup>b*)', 'namedgroup'),
+                  equal_to([['b', 'b', 'b'], ['bb', '', 'bbb']]),
+                  label='CheckNamedGroup')
+
+      assert_that(pcol | 'all groups' >> util.Regex.find_all(
+          'a(?P<namedgroup>b*)', util.Regex.ALL),
+                  equal_to([[('ab', 'b'), ('ab', 'b'), ('ab', 'b')],
+                            [('abb', 'bb'), ('a', ''), ('abbb', 'bbb')]]),
+                  label='CheckAllGroups')
+
+      assert_that(pcol | 'all non empty groups' >> util.Regex.find_all(
+          'a(b*)', util.Regex.ALL, outputEmpty=False),
+                  equal_to([[('ab', 'b'), ('ab', 'b'), ('ab', 'b')],
+                            [('abb', 'bb'), ('abbb', 'bbb')]]),
+                  label='CheckAllNonEmptyGroups')
+
+  def test_find_kv(self):
+    with TestPipeline() as p:
+      pcol = (p | beam.Create(['a b c d']))
+      assert_that(pcol | 'key 1' >> util.Regex.find_kv(
+          'a (b) (c)', 1,), equal_to([('b', 'a b c')]), label='CheckKey1')
+
+      assert_that(pcol | 'key 1 group 1' >> util.Regex.find_kv(
+          'a (b) (c)', 1, 2), equal_to([('b', 'c')]), label='CheckKey1Group1')
+
+  def test_find_kv_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("a (b) (c)")
+      result = (p | beam.Create(["a b c"]) | util.Regex.find_kv(rc, 1, 2))
+      assert_that(result, equal_to([("b", "c")]))
+
+  def test_find_kv_none(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["x y z"])
+                | util.Regex.find_kv("a (b) (c)", 1, 2))
+      assert_that(result, equal_to([]))
+
+  def test_match(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a", "x", "y", "z"])
+                | util.Regex.matches("[xyz]"))
+      assert_that(result, equal_to(["x", "y", "z"]))
+
+    # Matching is anchored at the start of the element only: "ax" is
+    # rejected, while "yby" and "zzc" match on their first character.
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a", "ax", "yby", "zzc"])
+                | util.Regex.matches("[xyz]"))
+      assert_that(result, equal_to(["y", "z"]))
+
+  def test_match_entire_line(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a", "x", "y", "ay", "zz"])
+                | util.Regex.matches("[xyz]$"))
+      assert_that(result, equal_to(["x", "y"]))
+
+  def test_match_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("[xyz]")
+      result = (p | beam.Create(["a", "x", "y", "z"]) | util.Regex.matches(rc))
+      assert_that(result, equal_to(["x", "y", "z"]))
+
+  def test_match_none(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a", "b", "c", "d"])
+                | util.Regex.matches("[xyz]"))
+      assert_that(result, equal_to([]))
+
+  def test_match_group(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a", "x xxx", "x yyy", "x zzz"])
+                | util.Regex.matches("x ([xyz]*)", 1))
+      assert_that(result, equal_to(["xxx", "yyy", "zzz"]))
+
+  def test_match_group_name(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a", "x xxx", "x yyy", "x zzz"])
+                | util.Regex.matches(
+                    "x (?P<namedgroup>[xyz]*)", 'namedgroup'))
+      assert_that(result, equal_to(["xxx", "yyy", "zzz"]))
+
+  def test_match_group_name_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("x (?P<namedgroup>[xyz]*)")
+      result = (p | beam.Create(["a", "x xxx", "x yyy", "x zzz"])
+                | util.Regex.matches(rc, 'namedgroup'))
+      assert_that(result, equal_to(["xxx", "yyy", "zzz"]))
+
+  def test_match_group_empty(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a", "b", "c", "d"])
+                | util.Regex.matches(
+                    "x (?P<namedgroup>[xyz]*)", 'namedgroup'))
+      assert_that(result, equal_to([]))
+
+  def test_all_matched(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a x", "x x", "y y", "z z"])
+                | util.Regex.all_matches("([xyz]) ([xyz])"))
+      expected_result = [["x x", "x", "x"], ["y y", "y", "y"],
+                         ["z z", "z", "z"]]
+      assert_that(result, equal_to(expected_result))
+
+  def test_all_matched_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("([xyz]) ([xyz])")
+      result = (p | beam.Create(["a x", "x x", "y y", "z z"])
+                | util.Regex.all_matches(rc))
+      expected_result = [["x x", "x", "x"], ["y y", "y", "y"],
+                         ["z z", "z", "z"]]
+      assert_that(result, equal_to(expected_result))
+
+  def test_match_group_kv(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a b c"])
+                | util.Regex.matches_kv("a (b) (c)", 1, 2))
+      assert_that(result, equal_to([("b", "c")]))
+
+  def test_match_group_kv_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("a (b) (c)")
+      pcol = (p | beam.Create(["a b c"]))
+      assert_that(pcol | 'key 1' >> util.Regex.matches_kv(
+          rc, 1), equal_to([("b", "a b c")]), label="CheckKey1")
+
+      assert_that(pcol | 'key 1 group 2' >> util.Regex.matches_kv(
+          rc, 1, 2), equal_to([("b", "c")]), label="CheckKey1Group2")
+
+  def test_match_group_kv_none(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["x y z"])
+                | util.Regex.matches_kv("a (b) (c)", 1, 2))
+      assert_that(result, equal_to([]))
+
+  def test_match_kv_group_names(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a b c"]) | util.Regex.matches_kv(
+          "a (?P<keyname>b) (?P<valuename>c)", 'keyname', 'valuename'))
+      assert_that(result, equal_to([("b", "c")]))
+
+  def test_match_kv_group_names_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("a (?P<keyname>b) (?P<valuename>c)")
+      result = (p | beam.Create(["a b c"]) | util.Regex.matches_kv(
+          rc, 'keyname', 'valuename'))
+      assert_that(result, equal_to([("b", "c")]))
+
+  def test_match_kv_group_name_none(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["x y z"]) | util.Regex.matches_kv(
+          "a (?P<keyname>b) (?P<valuename>c)", 'keyname', 'valuename'))
+      assert_that(result, equal_to([]))
+
+  def test_replace_all(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["xj", "yj", "zj"]) | util.Regex.replace_all(
+          "[xyz]", "new"))
+      assert_that(result, equal_to(["newj", "newj", "newj"]))
+
+  def test_replace_all_mixed(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["abc", "xj", "yj", "zj", "def"])
+                | util.Regex.replace_all("[xyz]", 'new'))
+      assert_that(result, equal_to(["abc", "newj", "newj", "newj", "def"]))
+
+  def test_replace_all_mixed_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("[xyz]")
+      result = (p | beam.Create(["abc", "xj", "yj", "zj", "def"])
+                | util.Regex.replace_all(rc, 'new'))
+      assert_that(result, equal_to(["abc", "newj", "newj", "newj", "def"]))
+
+  def test_replace_first(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["xjx", "yjy", "zjz"])
+                | util.Regex.replace_first("[xyz]", 'new'))
+      assert_that(result, equal_to(["newjx", "newjy", "newjz"]))
+
+  def test_replace_first_mixed(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["abc", "xjx", "yjy", "zjz", "def"])
+                | util.Regex.replace_first("[xyz]", 'new'))
+      assert_that(result, equal_to(["abc", "newjx", "newjy", "newjz", "def"]))
+
+  def test_replace_first_mixed_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("[xyz]")
+      result = (p | beam.Create(["abc", "xjx", "yjy", "zjz", "def"])
+                | util.Regex.replace_first(rc, 'new'))
+      assert_that(result, equal_to(["abc", "newjx", "newjy", "newjz", "def"]))
+
+  def test_split(self):
+    with TestPipeline() as p:
+      data = ["The quick brown fox jumps over the lazy dog"]
+      result = (p | beam.Create(data) | util.Regex.split("\\W+"))
+      expected_result = [["The", "quick", "brown", "fox", "jumps", "over",
+                          "the", "lazy", "dog"]]
+      assert_that(result, equal_to(expected_result))
+
+  def test_split_pattern(self):
+    with TestPipeline() as p:
+      data = ["The quick brown fox jumps over the lazy dog"]
+      rc = re.compile("\\W+")
+      result = (p | beam.Create(data) | util.Regex.split(rc))
+      expected_result = [["The", "quick", "brown", "fox", "jumps", "over",
+                          "the", "lazy", "dog"]]
+      assert_that(result, equal_to(expected_result))
+
+  def test_split_with_empty(self):
+    with TestPipeline() as p:
+      # Runs of 2, 3 and 4 spaces: splitting on a single whitespace character
+      # yields 1, 2 and 3 empty strings respectively at those positions.
+      data = ["The  quick   brown fox jumps over    the lazy dog"]
+      result = (p | beam.Create(data) | util.Regex.split("\\s", True))
+      expected_result = [['The', '', 'quick', '', '', 'brown', 'fox', 'jumps',
+                          'over', '', '', '', 'the', 'lazy', 'dog']]
+      assert_that(result, equal_to(expected_result))
+
+  def test_split_without_empty(self):
+    with TestPipeline() as p:
+      # Same multi-space input as test_split_with_empty so that the empty
+      # pieces really exist and must be filtered out.
+      data = ["The  quick   brown fox jumps over    the lazy dog"]
+      result = (p | beam.Create(data) | util.Regex.split("\\s", False))
+      expected_result = [["The", "quick", "brown", "fox", "jumps", "over",
+                          "the", "lazy", "dog"]]
+      assert_that(result, equal_to(expected_result))
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()