This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4c14c763ca0f9c71d642953228b364dce3eb3bf4
Author: dylanhz <[email protected]>
AuthorDate: Tue Aug 6 10:33:01 2024 +0800

    [FLINK-35964][table] Add the built-in function STARTSWITH
---
 docs/data/sql_functions.yml                        |  12 ++
 docs/data/sql_functions_zh.yml                     |  12 ++
 .../docs/reference/pyflink.table/expressions.rst   |   1 +
 flink-python/pyflink/table/expression.py           |  10 ++
 .../pyflink/table/tests/test_expression.py         |   1 +
 .../flink/table/api/internal/BaseExpressions.java  |  14 ++
 .../functions/BuiltInFunctionDefinitions.java      |  21 +++
 .../planner/functions/StringFunctionsITCase.java   | 143 ++++++++++++++++++++-
 .../functions/scalar/StartsWithFunction.java       |  69 ++++++++++
 9 files changed, 282 insertions(+), 1 deletion(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 43c37f58281..355f8c532d2 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -329,6 +329,18 @@ string:
       STRING1.overlay(STRING2, INT1)
       STRING1.overlay(STRING2, INT1, INT2)
     description: Returns a string that replaces INT2 (STRING2's length by 
default) characters of STRING1 with STRING2 from position INT1. E.g., 
'xxxxxtest'.overlay('xxxx', 6) returns "xxxxxxxxx"; 'xxxxxtest'.overlay('xxxx', 
6, 2) returns "xxxxxxxxxst".
+  - sql: STARTSWITH(expr, startExpr)
+    table: expr.startsWith(startExpr)
+    description: |
+      Returns whether expr starts with startExpr. If startExpr is empty, the 
result is true.
+      
+      expr and startExpr should have same type. 
+      
+      `expr <CHAR | VARCHAR>, startExpr <CHAR | VARCHAR>`
+      
+      `expr <BINARY | VARBINARY>, startExpr <BINARY | VARBINARY>`
+      
+      Returns a `BOOLEAN`. `NULL` if any of the arguments are `NULL`.
   - sql: SUBSTRING(string FROM integer1 [ FOR integer2 ])
     table: |
       STRING.substring(INT1)
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index e604c2ca7c9..2f199613c56 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -398,6 +398,18 @@ string:
       将字符串 STRING1 从位置 INT1 开始替换 INT2(默认为 STRING2 的长度)个来自字符串 STRING2 的字符并返回。
       例如 `'xxxxxtest'.overlay('xxxx', 6)` 返回 `"xxxxxxxxx"`;
       `'xxxxxtest'.overlay('xxxx', 6, 2)` 返回 `"xxxxxxxxxst"`。
+  - sql: STARTSWITH(expr, startExpr)
+    table: expr.startsWith(startExpr)
+    description: |
+      判断 expr 是否以 startExpr 开头。如果 startExpr 为空,则结果为 true。
+      
+      expr 和 startExpr 应具有相同的类型。
+      
+      `expr <CHAR | VARCHAR>, startExpr <CHAR | VARCHAR>`
+      
+      `expr <BINARY | VARBINARY>, startExpr <BINARY | VARBINARY>`
+      
+      返回一个 `BOOLEAN`。如果任意参数为 `NULL`,则返回 `NULL`。
   - sql: SUBSTRING(string FROM integer1 [ FOR integer2 ])
     table: |
       STRING.substring(INT1)
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst 
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 3f494f18438..6f498eb1936 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -160,6 +160,7 @@ string functions
 .. autosummary::
     :toctree: api/
 
+    Expression.starts_with
     Expression.substring
     Expression.substr
     Expression.trim_leading
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index 1e125de501c..474c60a3041 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1028,6 +1028,16 @@ class Expression(Generic[T]):
 
     # ---------------------------- string functions 
----------------------------------
 
+    def starts_with(self, start_expr) -> 'Expression':
+        """
+        Returns whether expr starts with start_expr. If start_expr is empty, 
the result is true.
+        expr and start_expr should have same type.
+
+        :param start_expr: A STRING or BINARY expression.
+        :return: A BOOLEAN.
+        """
+        return _binary_op("startsWith")(self, start_expr)
+
     def substring(self,
                   begin_index: Union[int, 'Expression[int]'],
                   length: Union[int, 'Expression[int]'] = None) -> 
'Expression[str]':
diff --git a/flink-python/pyflink/table/tests/test_expression.py 
b/flink-python/pyflink/table/tests/test_expression.py
index 2337d33c62e..f4e60be1812 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -173,6 +173,7 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         self.assertEqual("ELT(1, a)", str(lit(1).elt(expr1)))
         self.assertEqual('ELT(3, a, b, c)', str(lit(3).elt(expr1, expr2, 
expr3)))
         self.assertEqual("PRINTF('%d %s', a, b)", str(lit("%d 
%s").printf(expr1, expr2)))
+        self.assertEqual("STARTSWITH(a, b)", str(expr1.starts_with(expr2)))
 
         # regexp functions
         self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2)))
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 87cd03d5fb7..4df1657a496 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -187,6 +187,7 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.SINH;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.SPLIT;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.SPLIT_INDEX;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SQRT;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.STARTS_WITH;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.STDDEV_POP;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.STDDEV_SAMP;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.STR_TO_MAP;
@@ -850,6 +851,19 @@ public abstract class BaseExpressions<InType, OutType> {
 
     // String operations
 
+    /**
+     * Returns whether {@code expr} starts with {@code startExpr}. If {@code 
startExpr} is empty,
+     * the result is true. <br>
+     * {@code expr} and {@code startExpr} should have same type.
+     *
+     * @param startExpr A STRING or BINARY expression.
+     * @return A BOOLEAN.
+     */
+    public OutType startsWith(InType startExpr) {
+        return toApiSpecificExpression(
+                unresolvedCall(STARTS_WITH, toExpr(), 
objectToExpression(startExpr)));
+    }
+
     /**
      * Creates a substring of the given string at given index for a given 
length.
      *
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 6cf050900b1..6970e22e5a3 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -931,6 +931,27 @@ public final class BuiltInFunctionDefinitions {
                     
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN())))
                     .build();
 
+    public static final BuiltInFunctionDefinition STARTS_WITH =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("STARTSWITH")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            or(
+                                    sequence(
+                                            Arrays.asList("expr", "startExpr"),
+                                            Arrays.asList(
+                                                    
logical(LogicalTypeFamily.CHARACTER_STRING),
+                                                    
logical(LogicalTypeFamily.CHARACTER_STRING))),
+                                    sequence(
+                                            Arrays.asList("expr", "startExpr"),
+                                            Arrays.asList(
+                                                    
logical(LogicalTypeFamily.BINARY_STRING),
+                                                    
logical(LogicalTypeFamily.BINARY_STRING)))))
+                    
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN())))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.StartsWithFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition SUBSTRING =
             BuiltInFunctionDefinition.newBuilder()
                     .name("substring")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
index 08a390b0897..0f6ec6e8360 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
@@ -38,7 +38,12 @@ class StringFunctionsITCase extends BuiltInFunctionTestBase {
 
     @Override
     Stream<TestSetSpec> getTestSetSpecs() {
-        return Stream.of(bTrimTestCases(), eltTestCases(), printfTestCases(), 
translateTestCases())
+        return Stream.of(
+                        bTrimTestCases(),
+                        eltTestCases(),
+                        printfTestCases(),
+                        startsWithTestCases(),
+                        translateTestCases())
                 .flatMap(s -> s);
     }
 
@@ -310,6 +315,142 @@ class StringFunctionsITCase extends 
BuiltInFunctionTestBase {
                                         + "PRINTF(format <CHARACTER_STRING>, 
obj <ANY>...)"));
     }
 
+    private Stream<TestSetSpec> startsWithTestCases() {
+        return Stream.of(
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.STARTS_WITH, "StringData")
+                        .onFieldsWithData(null, "www.apache.org", "", "in中文", 
"\uD83D\uDE00")
+                        .andDataTypes(
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING())
+                        // null input
+                        .testResult(
+                                $("f0").startsWith("abc"),
+                                "STARTSWITH(f0, 'abc')",
+                                null,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f1").startsWith($("f0")),
+                                "STARTSWITH(f1, f0)",
+                                null,
+                                DataTypes.BOOLEAN())
+                        // empty input
+                        .testResult(
+                                $("f2").startsWith("abc"),
+                                "STARTSWITH(f2, 'abc')",
+                                Boolean.FALSE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f1").startsWith($("f2")),
+                                "STARTSWITH(f1, f2)",
+                                Boolean.TRUE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                lit("").startsWith(""),
+                                "STARTSWITH('', '')",
+                                Boolean.TRUE,
+                                DataTypes.BOOLEAN().notNull())
+                        // normal cases
+                        .testResult(
+                                $("f1").startsWith("ww"),
+                                "STARTSWITH(f1, 'ww')",
+                                Boolean.TRUE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f1").startsWith("."),
+                                "STARTSWITH(f1, '.')",
+                                Boolean.FALSE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f3").startsWith("in中"),
+                                "STARTSWITH(f3, 'in中')",
+                                Boolean.TRUE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f3").startsWith("中"),
+                                "STARTSWITH(f3, '中')",
+                                Boolean.FALSE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f4").startsWith($("f4")),
+                                "STARTSWITH(f4, f4)",
+                                Boolean.TRUE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f4").startsWith("\uD83D"),
+                                "STARTSWITH(f4, '\uD83D')",
+                                Boolean.FALSE,
+                                DataTypes.BOOLEAN()),
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.STARTS_WITH, "byte[]")
+                        .onFieldsWithData(
+                                null,
+                                new byte[] {1, 2, 3},
+                                new byte[0],
+                                new byte[0],
+                                new byte[] {1, 2},
+                                new byte[] {3})
+                        .andDataTypes(
+                                DataTypes.BYTES(),
+                                DataTypes.BYTES(),
+                                DataTypes.BYTES(),
+                                DataTypes.BYTES().notNull(),
+                                DataTypes.BYTES(),
+                                DataTypes.BYTES())
+                        // null input
+                        .testResult(
+                                $("f0").startsWith($("f1")),
+                                "STARTSWITH(f0, f1)",
+                                null,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f1").startsWith($("f0")),
+                                "STARTSWITH(f1, f0)",
+                                null,
+                                DataTypes.BOOLEAN())
+                        // empty input
+                        .testResult(
+                                $("f2").startsWith($("f1")),
+                                "STARTSWITH(f2, f1)",
+                                Boolean.FALSE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f1").startsWith($("f2")),
+                                "STARTSWITH(f1, f2)",
+                                Boolean.TRUE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f3").startsWith($("f3")),
+                                "STARTSWITH(f3, f3)",
+                                Boolean.TRUE,
+                                DataTypes.BOOLEAN().notNull())
+                        // normal cases
+                        .testResult(
+                                $("f1").startsWith($("f4")),
+                                "STARTSWITH(f1, f4)",
+                                Boolean.TRUE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f1").startsWith($("f5")),
+                                "STARTSWITH(f1, f5)",
+                                Boolean.FALSE,
+                                DataTypes.BOOLEAN()),
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.STARTS_WITH, "Validation 
Error")
+                        .onFieldsWithData("12345", "123".getBytes())
+                        .andDataTypes(DataTypes.STRING(), DataTypes.BYTES())
+                        .testTableApiValidationError(
+                                $("f0").startsWith($("f1")),
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "STARTSWITH(expr <CHARACTER_STRING>, 
startExpr <CHARACTER_STRING>)\n"
+                                        + "STARTSWITH(expr <BINARY_STRING>, 
startExpr <BINARY_STRING>)")
+                        .testSqlValidationError(
+                                "STARTSWITH(f0, f1)",
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "STARTSWITH(expr <CHARACTER_STRING>, 
startExpr <CHARACTER_STRING>)\n"
+                                        + "STARTSWITH(expr <BINARY_STRING>, 
startExpr <BINARY_STRING>)"));
+    }
+
     private Stream<TestSetSpec> translateTestCases() {
         return Stream.of(
                 TestSetSpec.forFunction(BuiltInFunctionDefinitions.TRANSLATE)
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/StartsWithFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/StartsWithFunction.java
new file mode 100644
index 00000000000..93880de51ea
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/StartsWithFunction.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#STARTS_WITH}. */
+@Internal
+public class StartsWithFunction extends BuiltInScalarFunction {
+
+    public StartsWithFunction(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.STARTS_WITH, context);
+    }
+
+    public @Nullable Boolean eval(@Nullable StringData expr, @Nullable 
StringData startExpr) {
+        if (expr == null || startExpr == null) {
+            return null;
+        }
+        if (BinaryStringDataUtil.isEmpty((BinaryStringData) startExpr)) {
+            return true;
+        }
+        return ((BinaryStringData) expr).startsWith((BinaryStringData) 
startExpr);
+    }
+
+    public @Nullable Boolean eval(@Nullable byte[] expr, @Nullable byte[] 
startExpr) {
+        if (expr == null || startExpr == null) {
+            return null;
+        }
+        if (startExpr.length == 0) {
+            return true;
+        }
+        return matchAtStart(expr, startExpr);
+    }
+
+    private static boolean matchAtStart(byte[] source, byte[] target) {
+        if (source.length < target.length) {
+            return false;
+        }
+        for (int i = 0; i < target.length; i++) {
+            if (source[i] != target[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+}

Reply via email to