This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cbbb1cd178a5b53200ee793f77bac73a0d49ff14 Author: dylanhz <[email protected]> AuthorDate: Fri Aug 16 14:02:40 2024 +0800 [FLINK-35962][table] Add the built-in function REGEXP_INSTR --- docs/data/sql_functions.yml | 10 +++ docs/data/sql_functions_zh.yml | 10 +++ .../docs/reference/pyflink.table/expressions.rst | 1 + flink-python/pyflink/table/expression.py | 11 ++++ .../pyflink/table/tests/test_expression.py | 1 + .../flink/table/api/internal/BaseExpressions.java | 14 ++++ .../functions/BuiltInFunctionDefinitions.java | 15 +++++ .../planner/functions/RegexpFunctionsITCase.java | 76 +++++++++++++++++++++- .../functions/scalar/RegexpInstrFunction.java | 55 ++++++++++++++++ 9 files changed, 192 insertions(+), 1 deletion(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 29b0c27ebcd..e6f768ca4c4 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -366,6 +366,16 @@ string: `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>, extractIndex <TINYINT | SMALLINT | INTEGER | BIGINT>` Returns an `ARRAY<STRING>` representation of all the matched substrings. `NULL` if any of the arguments are `NULL` or invalid. + - sql: REGEXP_INSTR(str, regex) + table: str.regexpInstr(regex) + description: | + Returns the position of the first substring in str that matches regex. + + Result indexes begin at 1, 0 if there is no match. + + `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>` + + Returns an `INTEGER` representation of the first matched substring index. `NULL` if any of the arguments are `NULL` or regex is invalid. 
- sql: TRANSLATE(expr, fromStr, toStr) table: expr.translate(fromStr, toStr) description: | diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index 76607687f17..e50040608c8 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -433,6 +433,16 @@ string: `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>, extractIndex <TINYINT | SMALLINT | INTEGER | BIGINT>` 返回一个 `ARRAY<STRING>`,表示所有匹配的子串。如果任何参数为 `NULL`或非法,则返回 `NULL`。 + - sql: REGEXP_INSTR(str, regex) + table: str.regexpInstr(regex) + description: | + 返回 str 中第一个匹配 regex 的子字符串的索引。 + + 结果索引从 1 开始,如果匹配失败则返回 0。 + + `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>` + + 返回一个 `INTEGER` 表示 str 中第一个匹配 regex 的子字符串索引。如果任何参数为 `NULL` 或 regex 非法,则返回 `NULL`。 - sql: TRANSLATE(expr, fromStr, toStr) table: expr.translate(fromStr, toStr) description: | diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst index 8698e884e1a..23796fbaca3 100644 --- a/flink-python/docs/reference/pyflink.table/expressions.rst +++ b/flink-python/docs/reference/pyflink.table/expressions.rst @@ -182,6 +182,7 @@ string functions Expression.regexp_replace Expression.regexp_extract Expression.regexp_extract_all + Expression.regexp_instr Expression.from_base64 Expression.to_base64 Expression.ascii diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 5975a79502b..6eabd528e02 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -1254,6 +1254,17 @@ class Expression(Generic[T]): else: return _ternary_op("regexpExtractAll")(self, regex, extract_index) + def regexp_instr(self, regex) -> 'Expression': + """ + Returns the position of the first substring in str that matches regex. + Result indexes begin at 1, 0 if there is no match. + null if any of the arguments are null or regex is invalid. 
+ + :param regex: A STRING expression with a matching pattern. + :return: An INTEGER representation of the first matched substring index. + """ + return _binary_op("regexpInstr")(self, regex) + @property def from_base64(self) -> 'Expression[str]': """ diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index fb94a02ec7d..3aa101b685e 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -180,6 +180,7 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase): self.assertEqual('REGEXP_EXTRACT_ALL(a, b)', str(expr1.regexp_extract_all(expr2))) self.assertEqual('REGEXP_EXTRACT_ALL(a, b, 3)', str(expr1.regexp_extract_all(expr2, 3))) self.assertEqual("regexpReplace(a, b, 'abc')", str(expr1.regexp_replace(expr2, 'abc'))) + self.assertEqual("REGEXP_INSTR(a, b)", str(expr1.regexp_instr(expr2))) # temporal functions self.assertEqual('cast(a, DATE)', str(expr1.to_date)) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index 5092937e7df..8c2f9f1a073 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -163,6 +163,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_COUNT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_EXTRACT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_EXTRACT_ALL; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_INSTR; import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_REPLACE; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPEAT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPLACE; @@ -1208,6 +1209,19 @@ public abstract class BaseExpressions<InType, OutType> { unresolvedCall(REGEXP_EXTRACT_ALL, toExpr(), objectToExpression(regex))); } + /** + * Returns the position of the first substring in {@code str} that matches {@code regex}. <br> + * Result indexes begin at 1, 0 if there is no match. <br> + * + * @param regex A STRING expression with a matching pattern. + * @return An INTEGER representation of the first matched substring index. <br> + * null if any of the arguments are null or {@code regex} is invalid. + */ + public OutType regexpInstr(InType regex) { + return toApiSpecificExpression( + unresolvedCall(REGEXP_INSTR, toExpr(), objectToExpression(regex))); + } + /** * Returns a string by quotes a string as a JSON value and wrapping it with double quote * characters. 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index e726051a4c4..19aabcc6134 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -1164,6 +1164,21 @@ public final class BuiltInFunctionDefinitions { "org.apache.flink.table.runtime.functions.scalar.RegexpExtractAllFunction") .build(); + public static final BuiltInFunctionDefinition REGEXP_INSTR = + BuiltInFunctionDefinition.newBuilder() + .name("REGEXP_INSTR") + .kind(SCALAR) + .inputTypeStrategy( + sequence( + Arrays.asList("str", "regex"), + Arrays.asList( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING)))) + .outputTypeStrategy(explicit(DataTypes.INT())) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.RegexpInstrFunction") + .build(); + public static final BuiltInFunctionDefinition JSON_QUOTE = BuiltInFunctionDefinition.newBuilder() .name("JSON_QUOTE") diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java index 3809f98637f..24ec1479a30 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java @@ -35,7 +35,8 @@ class RegexpFunctionsITCase extends BuiltInFunctionTestBase { return Stream.of( regexpCountTestCases(), regexpExtractTestCases(), - regexpExtractAllTestCases()) + regexpExtractAllTestCases(), + 
regexpInstrTestCases()) .flatMap(s -> s); } @@ -238,4 +239,77 @@ class RegexpFunctionsITCase extends BuiltInFunctionTestBase { + "REGEXP_EXTRACT_ALL(str <CHARACTER_STRING>, regex <CHARACTER_STRING>)\n" + "REGEXP_EXTRACT_ALL(str <CHARACTER_STRING>, regex <CHARACTER_STRING>, extractIndex <INTEGER_NUMERIC>)")); } + + private Stream<TestSetSpec> regexpInstrTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP_INSTR) + .onFieldsWithData(null, "abcdeabde", "100-200, 300-400") + .andDataTypes(DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()) + // null input + .testResult( + $("f0").regexpInstr($("f1")), + "REGEXP_INSTR(f0, f1)", + null, + DataTypes.INT()) + .testResult( + $("f1").regexpInstr($("f0")), + "REGEXP_INSTR(f1, f0)", + null, + DataTypes.INT()) + // invalid regexp + .testResult( + $("f1").regexpInstr("("), + "REGEXP_INSTR(f1, '(')", + null, + DataTypes.INT()) + // not found + .testResult( + $("f2").regexpInstr("[a-z]"), + "REGEXP_INSTR(f2, '[a-z]')", + 0, + DataTypes.INT()) + // border chars + .testResult( + lit("Helloworld! Hello everyone!").regexpInstr("\\bHello\\b"), + "REGEXP_INSTR('Helloworld! Hello everyone!', '\\bHello\\b')", + 13, + DataTypes.INT()) + .testResult( + lit("Helloworld! Hello everyone!").regexpInstr("\\bHello\\b"), + "REGEXP_INSTR('Helloworld! Hello everyone!', '\\bHello\\b')", + 13, + DataTypes.INT()) + // normal cases + .testResult( + lit("hello world! Hello everyone!").regexpInstr("Hello"), + "REGEXP_INSTR('hello world! 
Hello everyone!', 'Hello')", + 14, + DataTypes.INT()) + .testResult( + lit("a.b.c.d").regexpInstr("\\."), + "REGEXP_INSTR('a.b.c.d', '\\.')", + 2, + DataTypes.INT()) + .testResult( + lit("abc123xyz456").regexpInstr("\\d"), + "REGEXP_INSTR('abc123xyz456', '\\d')", + 4, + DataTypes.INT()) + .testResult( + $("f2").regexpInstr("(\\d+)-(\\d+)"), + "REGEXP_INSTR(f2, '(\\d+)-(\\d+)')", + 1, + DataTypes.INT()), + TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP_INSTR, "Validation Error") + .onFieldsWithData(1024) + .andDataTypes(DataTypes.INT()) + .testTableApiValidationError( + $("f0").regexpInstr("1024"), + "Invalid input arguments. Expected signatures are:\n" + + "REGEXP_INSTR(str <CHARACTER_STRING>, regex <CHARACTER_STRING>)") + .testSqlValidationError( + "REGEXP_INSTR(f0, '1024')", + "Invalid input arguments. Expected signatures are:\n" + + "REGEXP_INSTR(str <CHARACTER_STRING>, regex <CHARACTER_STRING>)")); + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpInstrFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpInstrFunction.java new file mode 100644 index 00000000000..2a233c042b1 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpInstrFunction.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; + +import javax.annotation.Nullable; + +import java.util.regex.Matcher; +import java.util.regex.PatternSyntaxException; + +import static org.apache.flink.table.runtime.functions.SqlFunctionUtils.REGEXP_PATTERN_CACHE; + +/** Implementation of {@link BuiltInFunctionDefinitions#REGEXP_INSTR}. */ +@Internal +public class RegexpInstrFunction extends BuiltInScalarFunction { + + public RegexpInstrFunction(SpecializedContext context) { + super(BuiltInFunctionDefinitions.REGEXP_INSTR, context); + } + + public @Nullable Integer eval(@Nullable StringData str, @Nullable StringData regex) { + if (str == null || regex == null) { + return null; + } + + Matcher matcher; + try { + matcher = REGEXP_PATTERN_CACHE.get(regex.toString()).matcher(str.toString()); + } catch (PatternSyntaxException e) { + return null; + } + + return matcher.find() ? matcher.start() + 1 : 0; + } +}
