This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cbbb1cd178a5b53200ee793f77bac73a0d49ff14
Author: dylanhz <[email protected]>
AuthorDate: Fri Aug 16 14:02:40 2024 +0800

    [FLINK-35962][table] Add the built-in function REGEXP_INSTR
---
 docs/data/sql_functions.yml                        | 10 +++
 docs/data/sql_functions_zh.yml                     | 10 +++
 .../docs/reference/pyflink.table/expressions.rst   |  1 +
 flink-python/pyflink/table/expression.py           | 11 ++++
 .../pyflink/table/tests/test_expression.py         |  1 +
 .../flink/table/api/internal/BaseExpressions.java  | 14 ++++
 .../functions/BuiltInFunctionDefinitions.java      | 15 +++++
 .../planner/functions/RegexpFunctionsITCase.java   | 76 +++++++++++++++++++++-
 .../functions/scalar/RegexpInstrFunction.java      | 55 ++++++++++++++++
 9 files changed, 192 insertions(+), 1 deletion(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 29b0c27ebcd..e6f768ca4c4 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -366,6 +366,16 @@ string:
       `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>, extractIndex <TINYINT | 
SMALLINT | INTEGER | BIGINT>`
       
       Returns an `ARRAY<STRING>` representation of all the matched substrings. 
`NULL` if any of the arguments are `NULL` or invalid.
+  - sql: REGEXP_INSTR(str, regex)
+    table: str.regexpInstr(regex)
+    description: |
+      Returns the position of the first substring in str that matches regex.
+      
+      Result indexes begin at 1, 0 if there is no match.
+      
+      `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>`
+      
+      Returns an `INTEGER` representation of the first matched substring index. `NULL` if any of the arguments are `NULL` or regex is invalid.
   - sql: TRANSLATE(expr, fromStr, toStr)
     table: expr.translate(fromStr, toStr)
     description: |
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 76607687f17..e50040608c8 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -433,6 +433,16 @@ string:
       `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>, extractIndex <TINYINT | 
SMALLINT | INTEGER | BIGINT>`
       
       返回一个 `ARRAY<STRING>`,表示所有匹配的子串。如果任何参数为 `NULL`或非法,则返回 `NULL`。
+  - sql: REGEXP_INSTR(str, regex)
+    table: str.regexpInstr(regex)
+    description: |
+      返回 str 中第一个匹配 regex 的子字符串的索引。
+      
+      结果索引从 1 开始,如果匹配失败则返回 0。
+      
+      `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>`
+      
+      返回一个 `INTEGER` 表示 str 中第一个匹配 regex 的子字符串索引。如果任何参数为 `NULL` 或 regex 非法,则返回 `NULL`。
   - sql: TRANSLATE(expr, fromStr, toStr)
     table: expr.translate(fromStr, toStr)
     description: |
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst 
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 8698e884e1a..23796fbaca3 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -182,6 +182,7 @@ string functions
     Expression.regexp_replace
     Expression.regexp_extract
     Expression.regexp_extract_all
+    Expression.regexp_instr
     Expression.from_base64
     Expression.to_base64
     Expression.ascii
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index 5975a79502b..6eabd528e02 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1254,6 +1254,17 @@ class Expression(Generic[T]):
         else:
             return _ternary_op("regexpExtractAll")(self, regex, extract_index)
 
+    def regexp_instr(self, regex) -> 'Expression':
+        """
+        Returns the position of the first substring in str that matches regex.
+        Result indexes begin at 1, 0 if there is no match.
+        null if any of the arguments are null or regex is invalid.
+
+        :param regex: A STRING expression with a matching pattern.
+        :return: An INTEGER representation of the first matched substring index.
+        """
+        return _binary_op("regexpInstr")(self, regex)
+
     @property
     def from_base64(self) -> 'Expression[str]':
         """
diff --git a/flink-python/pyflink/table/tests/test_expression.py 
b/flink-python/pyflink/table/tests/test_expression.py
index fb94a02ec7d..3aa101b685e 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -180,6 +180,7 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         self.assertEqual('REGEXP_EXTRACT_ALL(a, b)', 
str(expr1.regexp_extract_all(expr2)))
         self.assertEqual('REGEXP_EXTRACT_ALL(a, b, 3)', 
str(expr1.regexp_extract_all(expr2, 3)))
         self.assertEqual("regexpReplace(a, b, 'abc')", 
str(expr1.regexp_replace(expr2, 'abc')))
+        self.assertEqual("REGEXP_INSTR(a, b)", str(expr1.regexp_instr(expr2)))
 
         # temporal functions
         self.assertEqual('cast(a, DATE)', str(expr1.to_date))
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 5092937e7df..8c2f9f1a073 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -163,6 +163,7 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_COUNT;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_EXTRACT;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_EXTRACT_ALL;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_INSTR;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_REPLACE;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPEAT;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPLACE;
@@ -1208,6 +1209,19 @@ public abstract class BaseExpressions<InType, OutType> {
                 unresolvedCall(REGEXP_EXTRACT_ALL, toExpr(), 
objectToExpression(regex)));
     }
 
+    /**
+     * Returns the position of the first substring in {@code str} that matches {@code regex}. <br>
+     * Result indexes begin at 1, 0 if there is no match. <br>
+     *
+     * @param regex A STRING expression with a matching pattern.
+     * @return An INTEGER representation of the first matched substring index. <br>
+     *     null if any of the arguments are null or {@code regex} is invalid.
+     */
+    public OutType regexpInstr(InType regex) {
+        return toApiSpecificExpression(
+                unresolvedCall(REGEXP_INSTR, toExpr(), 
objectToExpression(regex)));
+    }
+
     /**
      * Returns a string by quotes a string as a JSON value and wrapping it 
with double quote
      * characters.
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index e726051a4c4..19aabcc6134 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1164,6 +1164,21 @@ public final class BuiltInFunctionDefinitions {
                             
"org.apache.flink.table.runtime.functions.scalar.RegexpExtractAllFunction")
                     .build();
 
+    public static final BuiltInFunctionDefinition REGEXP_INSTR =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("REGEXP_INSTR")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            sequence(
+                                    Arrays.asList("str", "regex"),
+                                    Arrays.asList(
+                                            
logical(LogicalTypeFamily.CHARACTER_STRING),
+                                            
logical(LogicalTypeFamily.CHARACTER_STRING))))
+                    .outputTypeStrategy(explicit(DataTypes.INT()))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.RegexpInstrFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition JSON_QUOTE =
             BuiltInFunctionDefinition.newBuilder()
                     .name("JSON_QUOTE")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
index 3809f98637f..24ec1479a30 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
@@ -35,7 +35,8 @@ class RegexpFunctionsITCase extends BuiltInFunctionTestBase {
         return Stream.of(
                         regexpCountTestCases(),
                         regexpExtractTestCases(),
-                        regexpExtractAllTestCases())
+                        regexpExtractAllTestCases(),
+                        regexpInstrTestCases())
                 .flatMap(s -> s);
     }
 
@@ -238,4 +239,77 @@ class RegexpFunctionsITCase extends 
BuiltInFunctionTestBase {
                                         + "REGEXP_EXTRACT_ALL(str 
<CHARACTER_STRING>, regex <CHARACTER_STRING>)\n"
                                         + "REGEXP_EXTRACT_ALL(str 
<CHARACTER_STRING>, regex <CHARACTER_STRING>, extractIndex 
<INTEGER_NUMERIC>)"));
     }
+
+    private Stream<TestSetSpec> regexpInstrTestCases() {
+        return Stream.of(
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP_INSTR)
+                        .onFieldsWithData(null, "abcdeabde", "100-200, 
300-400")
+                        .andDataTypes(DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING())
+                        // null input
+                        .testResult(
+                                $("f0").regexpInstr($("f1")),
+                                "REGEXP_INSTR(f0, f1)",
+                                null,
+                                DataTypes.INT())
+                        .testResult(
+                                $("f1").regexpInstr($("f0")),
+                                "REGEXP_INSTR(f1, f0)",
+                                null,
+                                DataTypes.INT())
+                        // invalid regexp
+                        .testResult(
+                                $("f1").regexpInstr("("),
+                                "REGEXP_INSTR(f1, '(')",
+                                null,
+                                DataTypes.INT())
+                        // not found
+                        .testResult(
+                                $("f2").regexpInstr("[a-z]"),
+                                "REGEXP_INSTR(f2, '[a-z]')",
+                                0,
+                                DataTypes.INT())
+                        // border chars
+                        .testResult(
+                                lit("Helloworld! Hello 
everyone!").regexpInstr("\\bHello\\b"),
+                                "REGEXP_INSTR('Helloworld! Hello everyone!', 
'\\bHello\\b')",
+                                13,
+                                DataTypes.INT())
+                        .testResult(
+                                lit("Helloworld!  Hello 
everyone!").regexpInstr("\\bHello\\b"),
+                                "REGEXP_INSTR('Helloworld!  Hello everyone!', 
'\\bHello\\b')",
+                                14,
+                                DataTypes.INT())
+                        // normal cases
+                        .testResult(
+                                lit("hello world! Hello 
everyone!").regexpInstr("Hello"),
+                                "REGEXP_INSTR('hello world! Hello everyone!', 
'Hello')",
+                                14,
+                                DataTypes.INT())
+                        .testResult(
+                                lit("a.b.c.d").regexpInstr("\\."),
+                                "REGEXP_INSTR('a.b.c.d', '\\.')",
+                                2,
+                                DataTypes.INT())
+                        .testResult(
+                                lit("abc123xyz456").regexpInstr("\\d"),
+                                "REGEXP_INSTR('abc123xyz456', '\\d')",
+                                4,
+                                DataTypes.INT())
+                        .testResult(
+                                $("f2").regexpInstr("(\\d+)-(\\d+)"),
+                                "REGEXP_INSTR(f2, '(\\d+)-(\\d+)')",
+                                1,
+                                DataTypes.INT()),
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP_INSTR, "Validation 
Error")
+                        .onFieldsWithData(1024)
+                        .andDataTypes(DataTypes.INT())
+                        .testTableApiValidationError(
+                                $("f0").regexpInstr("1024"),
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "REGEXP_INSTR(str 
<CHARACTER_STRING>, regex <CHARACTER_STRING>)")
+                        .testSqlValidationError(
+                                "REGEXP_INSTR(f0, '1024')",
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "REGEXP_INSTR(str 
<CHARACTER_STRING>, regex <CHARACTER_STRING>)"));
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpInstrFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpInstrFunction.java
new file mode 100644
index 00000000000..2a233c042b1
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpInstrFunction.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+
+import javax.annotation.Nullable;
+
+import java.util.regex.Matcher;
+import java.util.regex.PatternSyntaxException;
+
+import static 
org.apache.flink.table.runtime.functions.SqlFunctionUtils.REGEXP_PATTERN_CACHE;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#REGEXP_INSTR}. */
+@Internal
+public class RegexpInstrFunction extends BuiltInScalarFunction {
+
+    public RegexpInstrFunction(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.REGEXP_INSTR, context);
+    }
+
+    public @Nullable Integer eval(@Nullable StringData str, @Nullable 
StringData regex) {
+        if (str == null || regex == null) {
+            return null;
+        }
+
+        Matcher matcher;
+        try {
+            matcher = 
REGEXP_PATTERN_CACHE.get(regex.toString()).matcher(str.toString());
+        } catch (PatternSyntaxException e) {
+            return null;
+        }
+
+        return matcher.find() ? matcher.start() + 1 : 0;
+    }
+}

Reply via email to