This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7fa4ebad057bca765b661a553f4ccc2df7861a05 Author: dylanhz <dyla...@163.com> AuthorDate: Fri Aug 16 11:37:31 2024 +0800 [FLINK-35931][table] Add the built-in function REGEXP_EXTRACT_ALL --- docs/data/sql_functions.yml | 11 +- docs/data/sql_functions_zh.yml | 10 ++ .../docs/reference/pyflink.table/expressions.rst | 1 + flink-python/pyflink/table/expression.py | 18 ++++ .../pyflink/table/tests/test_expression.py | 10 +- .../flink/table/api/internal/BaseExpressions.java | 28 ++++++ .../functions/BuiltInFunctionDefinitions.java | 23 +++++ .../planner/functions/RegexpFunctionsITCase.java | 111 ++++++++++++++++++++- .../table/runtime/functions/SqlFunctionUtils.java | 2 +- .../functions/scalar/RegexpExtractAllFunction.java | 75 ++++++++++++++ 10 files changed, 282 insertions(+), 7 deletions(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 9d6d8b2acef..70823fc582c 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -358,7 +358,16 @@ string: not exceed the number of the defined groups. E.g. REGEXP_EXTRACT('foothebar', 'foo(.*?)(bar)', 2)" returns "bar". - + - sql: REGEXP_EXTRACT_ALL(str, regex[, extractIndex]) + table: str.regexpExtractAll(regex[, extractIndex]) + description: | + Extracts all the substrings in str that match the regex expression and correspond to the regex group extractIndex. + + regex may contain multiple groups. extractIndex indicates which regex group to extract and starts from 1, also the default value if not specified. 0 means matching the entire regular expression. + + `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>, extractIndex <TINYINT | SMALLINT | INTEGER | BIGINT>` + + Returns an `ARRAY<STRING>` representation of all the matched substrings. `NULL` if any of the arguments are `NULL` or invalid. - sql: INITCAP(string) table: STRING.initCap() description: Returns a new form of STRING with the first character of each word converted to uppercase and the rest characters to lowercase. 
Here a word means a sequences of alphanumeric characters. diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index e26c3b44267..f2629ca1096 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -425,6 +425,16 @@ string: 将字符串 STRING1 按照 STRING2 正则表达式的规则拆分,返回指定 INTEGER1 处位置的字符串。正则表达式匹配组索引从 1 开始, 0 表示匹配整个正则表达式。此外,正则表达式匹配组索引不应超过定义的组数。 例如 `REGEXP_EXTRACT('foothebar', 'foo(.*?)(bar)', 2)` 返回 `"bar"`。 + - sql: REGEXP_EXTRACT_ALL(str, regex[, extractIndex]) + table: str.regexpExtractAll(regex[, extractIndex]) + description: | + 提取字符串 str 中与正则表达式 regex 匹配且与 extractIndex 组对应的所有子串。 + + regex 可以包含多个组。extractIndex 用于指示要提取哪个正则组,索引从 1 开始,也作为未指定时的默认值。0 表示匹配整个正则表达式。 + + `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>, extractIndex <TINYINT | SMALLINT | INTEGER | BIGINT>` + + 返回一个 `ARRAY<STRING>`,表示所有匹配的子串。如果任何参数为 `NULL`或非法,则返回 `NULL`。 - sql: INITCAP(string) table: STRING.initCap() description: | diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst index 050feb15b30..60a5631e854 100644 --- a/flink-python/docs/reference/pyflink.table/expressions.rst +++ b/flink-python/docs/reference/pyflink.table/expressions.rst @@ -180,6 +180,7 @@ string functions Expression.regexp Expression.regexp_replace Expression.regexp_extract + Expression.regexp_extract_all Expression.from_base64 Expression.to_base64 Expression.ascii diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index d69bcd60277..1e924c508e1 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -1225,6 +1225,24 @@ class Expression(Generic[T]): else: return _ternary_op("regexpExtract")(self, regex, extract_index) + def regexp_extract_all(self, regex, extract_index=None) -> 'Expression': + """ + Extracts all the substrings in str that match the regex expression and correspond to the + regex group extract_index. 
+ regex may contain multiple groups. extract_index indicates which regex group to extract and + starts from 1, which is also the default if not specified. 0 means matching the + entire regular expression. + Returns null if any of the arguments are null or invalid. + + :param regex: A STRING expression with a matching pattern. + :param extract_index: An optional INTEGER expression with default 1. + :return: An ARRAY<STRING> of all the matched substrings. + """ + if extract_index is None: + return _binary_op("regexpExtractAll")(self, regex) + else: + return _ternary_op("regexpExtractAll")(self, regex, extract_index) + @property def from_base64(self) -> 'Expression[str]': """ diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index 0449a9a14a8..3af9c27732d 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -146,9 +146,6 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase): self.assertEqual('lpad(a, 4, b)', str(expr1.lpad(4, expr2))) self.assertEqual('rpad(a, 4, b)', str(expr1.rpad(4, expr2))) self.assertEqual('overlay(a, b, 6, 2)', str(expr1.overlay(expr2, 6, 2))) - self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2))) - self.assertEqual("regexpReplace(a, b, 'abc')", str(expr1.regexp_replace(expr2, 'abc'))) - self.assertEqual('regexpExtract(a, b, 3)', str(expr1.regexp_extract(expr2, 3))) self.assertEqual('fromBase64(a)', str(expr1.from_base64)) self.assertEqual('toBase64(a)', str(expr1.to_base64)) self.assertEqual('ascii(a)', str(expr1.ascii)) @@ -176,6 +173,13 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase): self.assertEqual('ELT(3, a, b, c)', str(lit(3).elt(expr1, expr2, expr3))) self.assertEqual("PRINTF('%d %s', a, b)", str(lit("%d %s").printf(expr1, expr2))) + # regexp functions + self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2))) + self.assertEqual('regexpExtract(a, b, 3)', str(expr1.regexp_extract(expr2, 3))) +
self.assertEqual('REGEXP_EXTRACT_ALL(a, b)', str(expr1.regexp_extract_all(expr2))) + self.assertEqual('REGEXP_EXTRACT_ALL(a, b, 3)', str(expr1.regexp_extract_all(expr2, 3))) + self.assertEqual("regexpReplace(a, b, 'abc')", str(expr1.regexp_replace(expr2, 'abc'))) + # temporal functions self.assertEqual('cast(a, DATE)', str(expr1.to_date)) self.assertEqual('cast(a, TIME(0))', str(expr1.to_time)) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index 39c84ad3923..7ea2848e348 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -161,6 +161,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.PROCTI import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.RADIANS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_EXTRACT; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_EXTRACT_ALL; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_REPLACE; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPEAT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPLACE; @@ -1166,6 +1167,33 @@ public abstract class BaseExpressions<InType, OutType> { unresolvedCall(REGEXP_EXTRACT, toExpr(), objectToExpression(regex))); } + /** + * Extracts all the substrings in {@code str} that match the {@code regex} expression and + * correspond to the regex group {@code extractIndex}. <br> + * {@code regex} may contain multiple groups. 
{@code extractIndex} indicates which regex group + * to extract and starts from 1, which is also the default if not specified. 0 means matching the + * entire regular expression. + * + * @param regex A STRING expression with a matching pattern. + * @param extractIndex An optional INTEGER expression with default 1. + * @return An ARRAY<STRING> of all the matched substrings. <br> + * null if any of the arguments are null or invalid. + */ + public OutType regexpExtractAll(InType regex, InType extractIndex) { + return toApiSpecificExpression( + unresolvedCall( + REGEXP_EXTRACT_ALL, + toExpr(), + objectToExpression(regex), + objectToExpression(extractIndex))); + } + + /** Extracts all the substrings in str that match the regex expression and correspond to regex group 1. */ + public OutType regexpExtractAll(InType regex) { + return toApiSpecificExpression( + unresolvedCall(REGEXP_EXTRACT_ALL, toExpr(), objectToExpression(regex))); + } + /** * Returns a string by quotes a string as a JSON value and wrapping it with double quote * characters.
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index ad6f90ea5e7..a005aa938fb 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -1127,6 +1127,28 @@ public final class BuiltInFunctionDefinitions { .outputTypeStrategy(explicit(DataTypes.STRING().nullable())) .build(); + public static final BuiltInFunctionDefinition REGEXP_EXTRACT_ALL = + BuiltInFunctionDefinition.newBuilder() + .name("REGEXP_EXTRACT_ALL") + .kind(SCALAR) + .inputTypeStrategy( + or( + sequence( + Arrays.asList("str", "regex"), + Arrays.asList( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING))), + sequence( + Arrays.asList("str", "regex", "extractIndex"), + Arrays.asList( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.INTEGER_NUMERIC))))) + .outputTypeStrategy(explicit(DataTypes.ARRAY(DataTypes.STRING()))) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.RegexpExtractAllFunction") + .build(); + public static final BuiltInFunctionDefinition JSON_QUOTE = BuiltInFunctionDefinition.newBuilder() .name("JSON_QUOTE") @@ -1145,6 +1167,7 @@ public final class BuiltInFunctionDefinitions { .runtimeClass( "org.apache.flink.table.runtime.functions.scalar.JsonUnquoteFunction") .build(); + public static final BuiltInFunctionDefinition FROM_BASE64 = BuiltInFunctionDefinition.newBuilder() .name("fromBase64") diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java index ab66e6e4030..3c1efb5c0c2 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java @@ -26,12 +26,12 @@ import java.util.stream.Stream; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; -/** Test regex functions correct behaviour. */ +/** Test Regexp functions correct behaviour. */ class RegexpFunctionsITCase extends BuiltInFunctionTestBase { @Override Stream<TestSetSpec> getTestSetSpecs() { - return Stream.of(regexpExtractTestCases()).flatMap(s -> s); + return Stream.of(regexpExtractTestCases(), regexpExtractAllTestCases()).flatMap(s -> s); } private Stream<TestSetSpec> regexpExtractTestCases() { @@ -50,4 +50,111 @@ class RegexpFunctionsITCase extends BuiltInFunctionTestBase { "ABC", DataTypes.STRING().nullable())); } + + private Stream<TestSetSpec> regexpExtractAllTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP_EXTRACT_ALL) + .onFieldsWithData(null, "abcdeabde", "100-200, 300-400") + .andDataTypes(DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()) + // null input + .testResult( + $("f0").regexpExtractAll($("f1")), + "REGEXP_EXTRACT_ALL(f0, f1)", + null, + DataTypes.ARRAY(DataTypes.STRING())) + .testResult( + $("f1").regexpExtractAll($("f0")), + "REGEXP_EXTRACT_ALL(f1, f0)", + null, + DataTypes.ARRAY(DataTypes.STRING())) + .testResult( + $("f1").regexpExtractAll($("f1"), null), + "REGEXP_EXTRACT_ALL(f1, f1, NULL)", + null, + DataTypes.ARRAY(DataTypes.STRING())) + // invalid regexp + .testResult( + $("f1").regexpExtractAll("("), + "REGEXP_EXTRACT_ALL(f1, '(')", + null, + DataTypes.ARRAY(DataTypes.STRING())) + // invalid extractIndex + 
.testResult( + $("f1").regexpExtractAll("(abcdeabde)", -1), + "REGEXP_EXTRACT_ALL(f1, '(abcdeabde)', -1)", + null, + DataTypes.ARRAY(DataTypes.STRING())) + .testResult( + $("f1").regexpExtractAll("abcdeabde"), + "REGEXP_EXTRACT_ALL(f1, 'abcdeabde')", + null, + DataTypes.ARRAY(DataTypes.STRING())) + .testResult( + $("f1").regexpExtractAll("(abcdeabde)", 2), + "REGEXP_EXTRACT_ALL(f1, '(abcdeabde)', 2)", + null, + DataTypes.ARRAY(DataTypes.STRING())) + // not found + .testResult( + $("f2").regexpExtractAll("[a-z]", 0), + "REGEXP_EXTRACT_ALL(f2, '[a-z]', 0)", + new String[] {}, + DataTypes.ARRAY(DataTypes.STRING())) + // optional rule + .testResult( + $("f1").regexpExtractAll("(abcdeabde)|([a-z]*)", 2), + "REGEXP_EXTRACT_ALL(f1, '(abcdeabde)|([a-z]*)', 2)", + new String[] {null, ""}, + DataTypes.ARRAY(DataTypes.STRING())) + .testResult( + $("f1").regexpExtractAll("ab((c)|(.?))de", 2), + "REGEXP_EXTRACT_ALL(f1, 'ab((c)|(.?))de', 2)", + new String[] {"c", null}, + DataTypes.ARRAY(DataTypes.STRING())) + // normal cases + .testResult( + $("f1").regexpExtractAll("(ab)([a-z]+)(e)", 2), + "REGEXP_EXTRACT_ALL(f1, '(ab)([a-z]+)(e)', 2)", + new String[] {"cdeabd"}, + DataTypes.ARRAY(DataTypes.STRING())) + .testResult( + $("f1").regexpExtractAll("", 0), + "REGEXP_EXTRACT_ALL(f1, '', 0)", + new String[] {"", "", "", "", "", "", "", "", "", ""}, + DataTypes.ARRAY(DataTypes.STRING())) + .testResult( + $("f2").regexpExtractAll("(\\d+)-(\\d+)", 0), + "REGEXP_EXTRACT_ALL(f2, '(\\d+)-(\\d+)', 0)", + new String[] {"100-200", "300-400"}, + DataTypes.ARRAY(DataTypes.STRING())) + .testResult( + $("f2").regexpExtractAll("(\\d+)-(\\d+)", 1), + "REGEXP_EXTRACT_ALL(f2, '(\\d+)-(\\d+)', 1)", + new String[] {"100", "300"}, + DataTypes.ARRAY(DataTypes.STRING())) + .testResult( + $("f2").regexpExtractAll("(\\d+)-(\\d+)", 2), + "REGEXP_EXTRACT_ALL(f2, '(\\d+)-(\\d+)', 2)", + new String[] {"200", "400"}, + DataTypes.ARRAY(DataTypes.STRING())) + .testResult( + $("f2").regexpExtractAll("(\\d+).*", 1), 
+ "REGEXP_EXTRACT_ALL(f2, '(\\d+).*', 1)", + new String[] {"100"}, + DataTypes.ARRAY(DataTypes.STRING())), + TestSetSpec.forFunction( + BuiltInFunctionDefinitions.REGEXP_EXTRACT_ALL, "Validation Error") + .onFieldsWithData(1024) + .andDataTypes(DataTypes.INT()) + .testTableApiValidationError( + $("f0").regexpExtractAll("1024"), + "Invalid input arguments. Expected signatures are:\n" + + "REGEXP_EXTRACT_ALL(str <CHARACTER_STRING>, regex <CHARACTER_STRING>)\n" + + "REGEXP_EXTRACT_ALL(str <CHARACTER_STRING>, regex <CHARACTER_STRING>, extractIndex <INTEGER_NUMERIC>)") + .testSqlValidationError( + "REGEXP_EXTRACT_ALL(f0, '1024')", + "Invalid input arguments. Expected signatures are:\n" + + "REGEXP_EXTRACT_ALL(str <CHARACTER_STRING>, regex <CHARACTER_STRING>)\n" + + "REGEXP_EXTRACT_ALL(str <CHARACTER_STRING>, regex <CHARACTER_STRING>, extractIndex <INTEGER_NUMERIC>)")); + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java index 2d6a3124e63..d51dd9f1cb9 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java @@ -64,7 +64,7 @@ public class SqlFunctionUtils { private static final Logger LOG = LoggerFactory.getLogger(SqlFunctionUtils.class); - private static final ThreadLocalCache<String, Pattern> REGEXP_PATTERN_CACHE = + public static final ThreadLocalCache<String, Pattern> REGEXP_PATTERN_CACHE = new ThreadLocalCache<String, Pattern>() { @Override public Pattern getNewInstance(String regex) { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpExtractAllFunction.java 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpExtractAllFunction.java new file mode 100644 index 00000000000..df4ef721d1a --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpExtractAllFunction.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.PatternSyntaxException; + +import static org.apache.flink.table.runtime.functions.SqlFunctionUtils.REGEXP_PATTERN_CACHE; + +/** Implementation of {@link BuiltInFunctionDefinitions#REGEXP_EXTRACT_ALL}. 
*/ +@Internal +public class RegexpExtractAllFunction extends BuiltInScalarFunction { + + public RegexpExtractAllFunction(SpecializedFunction.SpecializedContext context) { + super(BuiltInFunctionDefinitions.REGEXP_EXTRACT_ALL, context); + } + + public @Nullable ArrayData eval(@Nullable StringData str, @Nullable StringData regex) { + return eval(str, regex, 1); + } + + public @Nullable ArrayData eval( + @Nullable StringData str, @Nullable StringData regex, @Nullable Number extractIndex) { + if (str == null || regex == null || extractIndex == null) { + return null; + } + + Matcher matcher; + try { + matcher = REGEXP_PATTERN_CACHE.get(regex.toString()).matcher(str.toString()); + } catch (PatternSyntaxException e) { + return null; + } + + long groupIndex = extractIndex.longValue(); + if (groupIndex < 0 || matcher.groupCount() < groupIndex) { + return null; + } + + List<StringData> list = new ArrayList<>(); + while (matcher.find()) { + list.add(BinaryStringData.fromString(matcher.group((int) groupIndex))); + } + + return new GenericArrayData(list.toArray()); + } +}