This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7fa4ebad057bca765b661a553f4ccc2df7861a05
Author: dylanhz <dyla...@163.com>
AuthorDate: Fri Aug 16 11:37:31 2024 +0800

    [FLINK-35931][table] Add the built-in function REGEXP_EXTRACT_ALL
---
 docs/data/sql_functions.yml                        |  11 +-
 docs/data/sql_functions_zh.yml                     |  10 ++
 .../docs/reference/pyflink.table/expressions.rst   |   1 +
 flink-python/pyflink/table/expression.py           |  18 ++++
 .../pyflink/table/tests/test_expression.py         |  10 +-
 .../flink/table/api/internal/BaseExpressions.java  |  28 ++++++
 .../functions/BuiltInFunctionDefinitions.java      |  23 +++++
 .../planner/functions/RegexpFunctionsITCase.java   | 111 ++++++++++++++++++++-
 .../table/runtime/functions/SqlFunctionUtils.java  |   2 +-
 .../functions/scalar/RegexpExtractAllFunction.java |  75 ++++++++++++++
 10 files changed, 282 insertions(+), 7 deletions(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 9d6d8b2acef..70823fc582c 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -358,7 +358,16 @@ string:
       not exceed the number of the defined groups.
 
       E.g. REGEXP_EXTRACT('foothebar', 'foo(.*?)(bar)', 2)" returns "bar".
-
+  - sql: REGEXP_EXTRACT_ALL(str, regex[, extractIndex])
+    table: str.regexpExtractAll(regex[, extractIndex])
+    description: |
+      Extracts all the substrings in str that match the regex expression and 
correspond to the regex group extractIndex.
+      
+      regex may contain multiple groups. extractIndex indicates which regex group to extract; group indices start from 1, which is also the default if extractIndex is not specified. 0 means matching the entire regular expression.
+      
+      `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>, extractIndex <TINYINT | 
SMALLINT | INTEGER | BIGINT>`
+      
+      Returns an `ARRAY<STRING>` of all the matched substrings, or `NULL` if any of the arguments are `NULL` or invalid.
   - sql: INITCAP(string)
     table: STRING.initCap()
     description: Returns a new form of STRING with the first character of each 
word converted to uppercase and the rest characters to lowercase. Here a word 
means a sequences of alphanumeric characters.
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index e26c3b44267..f2629ca1096 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -425,6 +425,16 @@ string:
       将字符串 STRING1 按照 STRING2 正则表达式的规则拆分,返回指定 INTEGER1 处位置的字符串。正则表达式匹配组索引从 1 
开始,
       0 表示匹配整个正则表达式。此外,正则表达式匹配组索引不应超过定义的组数。
       例如 `REGEXP_EXTRACT('foothebar', 'foo(.*?)(bar)', 2)` 返回 `"bar"`。
+  - sql: REGEXP_EXTRACT_ALL(str, regex[, extractIndex])
+    table: str.regexpExtractAll(regex[, extractIndex])
+    description: |
+      提取字符串 str 中与正则表达式 regex 匹配且与 extractIndex 组对应的所有子串。
+      
+      regex 可以包含多个组。extractIndex 用于指示要提取哪个正则组,索引从 1 开始,也作为未指定时的默认值。0 
表示匹配整个正则表达式。
+      
+      `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>, extractIndex <TINYINT | 
SMALLINT | INTEGER | BIGINT>`
+      
+      返回一个 `ARRAY<STRING>`,表示所有匹配的子串。如果任何参数为 `NULL` 或非法,则返回 `NULL`。
   - sql: INITCAP(string)
     table: STRING.initCap()
     description: |
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst 
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 050feb15b30..60a5631e854 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -180,6 +180,7 @@ string functions
     Expression.regexp
     Expression.regexp_replace
     Expression.regexp_extract
+    Expression.regexp_extract_all
     Expression.from_base64
     Expression.to_base64
     Expression.ascii
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index d69bcd60277..1e924c508e1 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1225,6 +1225,24 @@ class Expression(Generic[T]):
         else:
             return _ternary_op("regexpExtract")(self, regex, extract_index)
 
+    def regexp_extract_all(self, regex, extract_index=None) -> 'Expression':
+        """
+        Extracts all the substrings in str that match the regex expression and correspond to the
+        regex group extract_index.
+        regex may contain multiple groups. extract_index indicates which regex group to extract;
+        group indices start from 1, which is also the default if extract_index is not specified.
+        0 means matching the entire regular expression.
+        Returns null if any of the arguments are null or invalid.
+
+        :param regex: A STRING expression with a matching pattern.
+        :param extract_index: An optional INTEGER expression with default 1.
+        :return: An ARRAY<STRING> of all the matched substrings.
+        """
+        if extract_index is None:
+            return _binary_op("regexpExtractAll")(self, regex)
+        else:
+            return _ternary_op("regexpExtractAll")(self, regex, extract_index)
+
     @property
     def from_base64(self) -> 'Expression[str]':
         """
diff --git a/flink-python/pyflink/table/tests/test_expression.py 
b/flink-python/pyflink/table/tests/test_expression.py
index 0449a9a14a8..3af9c27732d 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -146,9 +146,6 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         self.assertEqual('lpad(a, 4, b)', str(expr1.lpad(4, expr2)))
         self.assertEqual('rpad(a, 4, b)', str(expr1.rpad(4, expr2)))
         self.assertEqual('overlay(a, b, 6, 2)', str(expr1.overlay(expr2, 6, 
2)))
-        self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2)))
-        self.assertEqual("regexpReplace(a, b, 'abc')", 
str(expr1.regexp_replace(expr2, 'abc')))
-        self.assertEqual('regexpExtract(a, b, 3)', 
str(expr1.regexp_extract(expr2, 3)))
         self.assertEqual('fromBase64(a)', str(expr1.from_base64))
         self.assertEqual('toBase64(a)', str(expr1.to_base64))
         self.assertEqual('ascii(a)', str(expr1.ascii))
@@ -176,6 +173,13 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         self.assertEqual('ELT(3, a, b, c)', str(lit(3).elt(expr1, expr2, 
expr3)))
         self.assertEqual("PRINTF('%d %s', a, b)", str(lit("%d 
%s").printf(expr1, expr2)))
 
+        # regexp functions
+        self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2)))
+        self.assertEqual('regexpExtract(a, b, 3)', 
str(expr1.regexp_extract(expr2, 3)))
+        self.assertEqual('REGEXP_EXTRACT_ALL(a, b)', 
str(expr1.regexp_extract_all(expr2)))
+        self.assertEqual('REGEXP_EXTRACT_ALL(a, b, 3)', 
str(expr1.regexp_extract_all(expr2, 3)))
+        self.assertEqual("regexpReplace(a, b, 'abc')", 
str(expr1.regexp_replace(expr2, 'abc')))
+
         # temporal functions
         self.assertEqual('cast(a, DATE)', str(expr1.to_date))
         self.assertEqual('cast(a, TIME(0))', str(expr1.to_time))
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 39c84ad3923..7ea2848e348 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -161,6 +161,7 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.PROCTI
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.RADIANS;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_EXTRACT;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_EXTRACT_ALL;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_REPLACE;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPEAT;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPLACE;
@@ -1166,6 +1167,33 @@ public abstract class BaseExpressions<InType, OutType> {
                 unresolvedCall(REGEXP_EXTRACT, toExpr(), 
objectToExpression(regex)));
     }
 
+    /**
+     * Extracts all the substrings in {@code str} that match the {@code regex} expression and
+     * correspond to the regex group {@code extractIndex}. <br>
+     * {@code regex} may contain multiple groups. {@code extractIndex} indicates which regex group
+     * to extract; group indices start from 1, which is also the default if it is not specified.
+     * 0 means matching the entire regular expression.
+     *
+     * @param regex A STRING expression with a matching pattern.
+     * @param extractIndex An optional INTEGER expression with default 1.
+     * @return An ARRAY&lt;STRING&gt; of all the matched substrings. <br>
+     *     null if any of the arguments are null or invalid.
+     */
+    public OutType regexpExtractAll(InType regex, InType extractIndex) {
+        return toApiSpecificExpression(
+                unresolvedCall(
+                        REGEXP_EXTRACT_ALL,
+                        toExpr(),
+                        objectToExpression(regex),
+                        objectToExpression(extractIndex)));
+    }
+
+    /** Extracts all substrings of str that match regex, using regex group 1 (the default). */
+    public OutType regexpExtractAll(InType regex) {
+        return toApiSpecificExpression(
+                unresolvedCall(REGEXP_EXTRACT_ALL, toExpr(), 
objectToExpression(regex)));
+    }
+
     /**
      * Returns a string by quotes a string as a JSON value and wrapping it 
with double quote
      * characters.
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index ad6f90ea5e7..a005aa938fb 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1127,6 +1127,28 @@ public final class BuiltInFunctionDefinitions {
                     
.outputTypeStrategy(explicit(DataTypes.STRING().nullable()))
                     .build();
 
+    public static final BuiltInFunctionDefinition REGEXP_EXTRACT_ALL =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("REGEXP_EXTRACT_ALL")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            or(
+                                    sequence(
+                                            Arrays.asList("str", "regex"),
+                                            Arrays.asList(
+                                                    
logical(LogicalTypeFamily.CHARACTER_STRING),
+                                                    
logical(LogicalTypeFamily.CHARACTER_STRING))),
+                                    sequence(
+                                            Arrays.asList("str", "regex", 
"extractIndex"),
+                                            Arrays.asList(
+                                                    
logical(LogicalTypeFamily.CHARACTER_STRING),
+                                                    
logical(LogicalTypeFamily.CHARACTER_STRING),
+                                                    
logical(LogicalTypeFamily.INTEGER_NUMERIC)))))
+                    
.outputTypeStrategy(explicit(DataTypes.ARRAY(DataTypes.STRING())))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.RegexpExtractAllFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition JSON_QUOTE =
             BuiltInFunctionDefinition.newBuilder()
                     .name("JSON_QUOTE")
@@ -1145,6 +1167,7 @@ public final class BuiltInFunctionDefinitions {
                     .runtimeClass(
                             
"org.apache.flink.table.runtime.functions.scalar.JsonUnquoteFunction")
                     .build();
+
     public static final BuiltInFunctionDefinition FROM_BASE64 =
             BuiltInFunctionDefinition.newBuilder()
                     .name("fromBase64")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
index ab66e6e4030..3c1efb5c0c2 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
@@ -26,12 +26,12 @@ import java.util.stream.Stream;
 import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.api.Expressions.call;
 
-/** Test regex functions correct behaviour. */
+/** Test Regexp functions correct behaviour. */
 class RegexpFunctionsITCase extends BuiltInFunctionTestBase {
 
     @Override
     Stream<TestSetSpec> getTestSetSpecs() {
-        return Stream.of(regexpExtractTestCases()).flatMap(s -> s);
+        return Stream.of(regexpExtractTestCases(), 
regexpExtractAllTestCases()).flatMap(s -> s);
     }
 
     private Stream<TestSetSpec> regexpExtractTestCases() {
@@ -50,4 +50,111 @@ class RegexpFunctionsITCase extends BuiltInFunctionTestBase 
{
                                 "ABC",
                                 DataTypes.STRING().nullable()));
     }
+
+    private Stream<TestSetSpec> regexpExtractAllTestCases() {
+        return Stream.of(
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP_EXTRACT_ALL)
+                        .onFieldsWithData(null, "abcdeabde", "100-200, 
300-400")
+                        .andDataTypes(DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING())
+                        // null input
+                        .testResult(
+                                $("f0").regexpExtractAll($("f1")),
+                                "REGEXP_EXTRACT_ALL(f0, f1)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testResult(
+                                $("f1").regexpExtractAll($("f0")),
+                                "REGEXP_EXTRACT_ALL(f1, f0)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testResult(
+                                $("f1").regexpExtractAll($("f1"), null),
+                                "REGEXP_EXTRACT_ALL(f1, f1, NULL)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        // invalid regexp
+                        .testResult(
+                                $("f1").regexpExtractAll("("),
+                                "REGEXP_EXTRACT_ALL(f1, '(')",
+                                null,
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        // invalid extractIndex
+                        .testResult(
+                                $("f1").regexpExtractAll("(abcdeabde)", -1),
+                                "REGEXP_EXTRACT_ALL(f1, '(abcdeabde)', -1)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testResult(
+                                $("f1").regexpExtractAll("abcdeabde"),
+                                "REGEXP_EXTRACT_ALL(f1, 'abcdeabde')",
+                                null,
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testResult(
+                                $("f1").regexpExtractAll("(abcdeabde)", 2),
+                                "REGEXP_EXTRACT_ALL(f1, '(abcdeabde)', 2)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        // not found
+                        .testResult(
+                                $("f2").regexpExtractAll("[a-z]", 0),
+                                "REGEXP_EXTRACT_ALL(f2, '[a-z]', 0)",
+                                new String[] {},
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        // optional rule
+                        .testResult(
+                                
$("f1").regexpExtractAll("(abcdeabde)|([a-z]*)", 2),
+                                "REGEXP_EXTRACT_ALL(f1, 
'(abcdeabde)|([a-z]*)', 2)",
+                                new String[] {null, ""},
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testResult(
+                                $("f1").regexpExtractAll("ab((c)|(.?))de", 2),
+                                "REGEXP_EXTRACT_ALL(f1, 'ab((c)|(.?))de', 2)",
+                                new String[] {"c", null},
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        // normal cases
+                        .testResult(
+                                $("f1").regexpExtractAll("(ab)([a-z]+)(e)", 2),
+                                "REGEXP_EXTRACT_ALL(f1, '(ab)([a-z]+)(e)', 2)",
+                                new String[] {"cdeabd"},
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testResult(
+                                $("f1").regexpExtractAll("", 0),
+                                "REGEXP_EXTRACT_ALL(f1, '', 0)",
+                                new String[] {"", "", "", "", "", "", "", "", 
"", ""},
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testResult(
+                                $("f2").regexpExtractAll("(\\d+)-(\\d+)", 0),
+                                "REGEXP_EXTRACT_ALL(f2, '(\\d+)-(\\d+)', 0)",
+                                new String[] {"100-200", "300-400"},
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testResult(
+                                $("f2").regexpExtractAll("(\\d+)-(\\d+)", 1),
+                                "REGEXP_EXTRACT_ALL(f2, '(\\d+)-(\\d+)', 1)",
+                                new String[] {"100", "300"},
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testResult(
+                                $("f2").regexpExtractAll("(\\d+)-(\\d+)", 2),
+                                "REGEXP_EXTRACT_ALL(f2, '(\\d+)-(\\d+)', 2)",
+                                new String[] {"200", "400"},
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testResult(
+                                $("f2").regexpExtractAll("(\\d+).*", 1),
+                                "REGEXP_EXTRACT_ALL(f2, '(\\d+).*', 1)",
+                                new String[] {"100"},
+                                DataTypes.ARRAY(DataTypes.STRING())),
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.REGEXP_EXTRACT_ALL, 
"Validation Error")
+                        .onFieldsWithData(1024)
+                        .andDataTypes(DataTypes.INT())
+                        .testTableApiValidationError(
+                                $("f0").regexpExtractAll("1024"),
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "REGEXP_EXTRACT_ALL(str 
<CHARACTER_STRING>, regex <CHARACTER_STRING>)\n"
+                                        + "REGEXP_EXTRACT_ALL(str 
<CHARACTER_STRING>, regex <CHARACTER_STRING>, extractIndex <INTEGER_NUMERIC>)")
+                        .testSqlValidationError(
+                                "REGEXP_EXTRACT_ALL(f0, '1024')",
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "REGEXP_EXTRACT_ALL(str 
<CHARACTER_STRING>, regex <CHARACTER_STRING>)\n"
+                                        + "REGEXP_EXTRACT_ALL(str 
<CHARACTER_STRING>, regex <CHARACTER_STRING>, extractIndex 
<INTEGER_NUMERIC>)"));
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
index 2d6a3124e63..d51dd9f1cb9 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
@@ -64,7 +64,7 @@ public class SqlFunctionUtils {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SqlFunctionUtils.class);
 
-    private static final ThreadLocalCache<String, Pattern> 
REGEXP_PATTERN_CACHE =
+    public static final ThreadLocalCache<String, Pattern> REGEXP_PATTERN_CACHE 
=
             new ThreadLocalCache<String, Pattern>() {
                 @Override
                 public Pattern getNewInstance(String regex) {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpExtractAllFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpExtractAllFunction.java
new file mode 100644
index 00000000000..df4ef721d1a
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpExtractAllFunction.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.PatternSyntaxException;
+
+import static 
org.apache.flink.table.runtime.functions.SqlFunctionUtils.REGEXP_PATTERN_CACHE;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#REGEXP_EXTRACT_ALL}. */
+@Internal
+public class RegexpExtractAllFunction extends BuiltInScalarFunction {
+
+    public RegexpExtractAllFunction(SpecializedFunction.SpecializedContext 
context) {
+        super(BuiltInFunctionDefinitions.REGEXP_EXTRACT_ALL, context);
+    }
+
+    public @Nullable ArrayData eval(@Nullable StringData str, @Nullable 
StringData regex) {
+        return eval(str, regex, 1);
+    }
+
+    public @Nullable ArrayData eval(
+            @Nullable StringData str, @Nullable StringData regex, @Nullable 
Number extractIndex) {
+        if (str == null || regex == null || extractIndex == null) {
+            return null;
+        }
+
+        Matcher matcher;
+        try {
+            matcher = 
REGEXP_PATTERN_CACHE.get(regex.toString()).matcher(str.toString());
+        } catch (PatternSyntaxException e) {
+            return null;
+        }
+
+        long groupIndex = extractIndex.longValue();
+        if (groupIndex < 0 || matcher.groupCount() < groupIndex) {
+            return null;
+        }
+
+        List<StringData> list = new ArrayList<>();
+        while (matcher.find()) {
+            list.add(BinaryStringData.fromString(matcher.group((int) 
groupIndex)));
+        }
+
+        return new GenericArrayData(list.toArray());
+    }
+}

Reply via email to