This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4c14c763ca0f9c71d642953228b364dce3eb3bf4 Author: dylanhz <[email protected]> AuthorDate: Tue Aug 6 10:33:01 2024 +0800 [FLINK-35964][table] Add the built-in function STARTSWITH --- docs/data/sql_functions.yml | 12 ++ docs/data/sql_functions_zh.yml | 12 ++ .../docs/reference/pyflink.table/expressions.rst | 1 + flink-python/pyflink/table/expression.py | 10 ++ .../pyflink/table/tests/test_expression.py | 1 + .../flink/table/api/internal/BaseExpressions.java | 14 ++ .../functions/BuiltInFunctionDefinitions.java | 21 +++ .../planner/functions/StringFunctionsITCase.java | 143 ++++++++++++++++++++- .../functions/scalar/StartsWithFunction.java | 69 ++++++++++ 9 files changed, 282 insertions(+), 1 deletion(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 43c37f58281..355f8c532d2 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -329,6 +329,18 @@ string: STRING1.overlay(STRING2, INT1) STRING1.overlay(STRING2, INT1, INT2) description: Returns a string that replaces INT2 (STRING2's length by default) characters of STRING1 with STRING2 from position INT1. E.g., 'xxxxxtest'.overlay('xxxx', 6) returns "xxxxxxxxx"; 'xxxxxtest'.overlay('xxxx', 6, 2) returns "xxxxxxxxxst". + - sql: STARTSWITH(expr, startExpr) + table: expr.startsWith(startExpr) + description: | + Returns whether expr starts with startExpr. If startExpr is empty, the result is true. + + expr and startExpr should have same type. + + `expr <CHAR | VARCHAR>, startExpr <CHAR | VARCHAR>` + + `expr <BINARY | VARBINARY>, startExpr <BINARY | VARBINARY>` + + Returns a `BOOLEAN`. `NULL` if any of the arguments are `NULL`. - sql: SUBSTRING(string FROM integer1 [ FOR integer2 ]) table: | STRING.substring(INT1) diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index e604c2ca7c9..2f199613c56 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -398,6 +398,18 @@ string: 将字符串 STRING1 从位置 INT1 开始替换 INT2(默认为 STRING2 的长度)个来自字符串 STRING2 的字符并返回。 例如 `'xxxxxtest'.overlay('xxxx', 6)` 返回 `"xxxxxxxxx"`; `'xxxxxtest'.overlay('xxxx', 6, 2)` 返回 `"xxxxxxxxxst"`。 + - sql: STARTSWITH(expr, startExpr) + table: expr.startsWith(startExpr) + description: | + 判断 expr 是否以 startExpr 开头。如果 startExpr 为空,则结果为 true。 + + expr 和 startExpr 应具有相同的类型。 + + `expr <CHAR | VARCHAR>, startExpr <CHAR | VARCHAR>` + + `expr <BINARY | VARBINARY>, startExpr <BINARY | VARBINARY>` + + 返回一个 `BOOLEAN`。如果任意参数为 `NULL`,则返回 `NULL`。 - sql: SUBSTRING(string FROM integer1 [ FOR integer2 ]) table: | STRING.substring(INT1) diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst index 3f494f18438..6f498eb1936 100644 --- a/flink-python/docs/reference/pyflink.table/expressions.rst +++ b/flink-python/docs/reference/pyflink.table/expressions.rst @@ -160,6 +160,7 @@ string functions .. autosummary:: :toctree: api/ + Expression.starts_with Expression.substring Expression.substr Expression.trim_leading diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 1e125de501c..474c60a3041 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -1028,6 +1028,16 @@ class Expression(Generic[T]): # ---------------------------- string functions ---------------------------------- + def starts_with(self, start_expr) -> 'Expression': + """ + Returns whether expr starts with start_expr. If start_expr is empty, the result is true. + expr and start_expr should have same type. + + :param start_expr: A STRING or BINARY expression. + :return: A BOOLEAN. + """ + return _binary_op("startsWith")(self, start_expr) + def substring(self, begin_index: Union[int, 'Expression[int]'], length: Union[int, 'Expression[int]'] = None) -> 'Expression[str]': diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index 2337d33c62e..f4e60be1812 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -173,6 +173,7 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase): self.assertEqual("ELT(1, a)", str(lit(1).elt(expr1))) self.assertEqual('ELT(3, a, b, c)', str(lit(3).elt(expr1, expr2, expr3))) self.assertEqual("PRINTF('%d %s', a, b)", str(lit("%d %s").printf(expr1, expr2))) + self.assertEqual("STARTSWITH(a, b)", str(expr1.starts_with(expr2))) # regexp functions self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2))) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index 87cd03d5fb7..4df1657a496 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -187,6 +187,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SINH; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SPLIT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SPLIT_INDEX; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SQRT; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.STARTS_WITH; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.STDDEV_POP; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.STDDEV_SAMP; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.STR_TO_MAP; @@ -850,6 +851,19 @@ public abstract class BaseExpressions<InType, OutType> { // String operations + /** + * Returns whether {@code expr} starts with {@code startExpr}. If {@code startExpr} is empty, + * the result is true. <br> + * {@code expr} and {@code startExpr} should have same type. + * + * @param startExpr A STRING or BINARY expression. + * @return A BOOLEAN. + */ + public OutType startsWith(InType startExpr) { + return toApiSpecificExpression( + unresolvedCall(STARTS_WITH, toExpr(), objectToExpression(startExpr))); + } + /** * Creates a substring of the given string at given index for a given length. * diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 6cf050900b1..6970e22e5a3 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -931,6 +931,27 @@ public final class BuiltInFunctionDefinitions { .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN()))) .build(); + public static final BuiltInFunctionDefinition STARTS_WITH = + BuiltInFunctionDefinition.newBuilder() + .name("STARTSWITH") + .kind(SCALAR) + .inputTypeStrategy( + or( + sequence( + Arrays.asList("expr", "startExpr"), + Arrays.asList( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING))), + sequence( + Arrays.asList("expr", "startExpr"), + Arrays.asList( + logical(LogicalTypeFamily.BINARY_STRING), + logical(LogicalTypeFamily.BINARY_STRING))))) + .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN()))) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.StartsWithFunction") + .build(); + public static final BuiltInFunctionDefinition SUBSTRING = BuiltInFunctionDefinition.newBuilder() .name("substring") diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java index 08a390b0897..0f6ec6e8360 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java @@ -38,7 +38,12 @@ class StringFunctionsITCase extends BuiltInFunctionTestBase { @Override Stream<TestSetSpec> getTestSetSpecs() { - return Stream.of(bTrimTestCases(), eltTestCases(), printfTestCases(), translateTestCases()) + return Stream.of( + bTrimTestCases(), + eltTestCases(), + printfTestCases(), + startsWithTestCases(), + translateTestCases()) .flatMap(s -> s); } @@ -310,6 +315,142 @@ class StringFunctionsITCase extends BuiltInFunctionTestBase { + "PRINTF(format <CHARACTER_STRING>, obj <ANY>...)")); } + private Stream<TestSetSpec> startsWithTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.STARTS_WITH, "StringData") + .onFieldsWithData(null, "www.apache.org", "", "in中文", "\uD83D\uDE00") + .andDataTypes( + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING()) + // null input + .testResult( + $("f0").startsWith("abc"), + "STARTSWITH(f0, 'abc')", + null, + DataTypes.BOOLEAN()) + .testResult( + $("f1").startsWith($("f0")), + "STARTSWITH(f1, f0)", + null, + DataTypes.BOOLEAN()) + // empty input + .testResult( + $("f2").startsWith("abc"), + "STARTSWITH(f2, 'abc')", + Boolean.FALSE, + DataTypes.BOOLEAN()) + .testResult( + $("f1").startsWith($("f2")), + "STARTSWITH(f1, f2)", + Boolean.TRUE, + DataTypes.BOOLEAN()) + .testResult( + lit("").startsWith(""), + "STARTSWITH('', '')", + Boolean.TRUE, + DataTypes.BOOLEAN().notNull()) + // normal cases + .testResult( + $("f1").startsWith("ww"), + "STARTSWITH(f1, 'ww')", + Boolean.TRUE, + DataTypes.BOOLEAN()) + .testResult( + $("f1").startsWith("."), + "STARTSWITH(f1, '.')", + Boolean.FALSE, + DataTypes.BOOLEAN()) + .testResult( + $("f3").startsWith("in中"), + "STARTSWITH(f3, 'in中')", + Boolean.TRUE, + DataTypes.BOOLEAN()) + .testResult( + $("f3").startsWith("中"), + "STARTSWITH(f3, '中')", + Boolean.FALSE, + DataTypes.BOOLEAN()) + .testResult( + $("f4").startsWith($("f4")), + "STARTSWITH(f4, f4)", + Boolean.TRUE, + DataTypes.BOOLEAN()) + .testResult( + $("f4").startsWith("\uD83D"), + "STARTSWITH(f4, '\uD83D')", + Boolean.FALSE, + DataTypes.BOOLEAN()), + TestSetSpec.forFunction(BuiltInFunctionDefinitions.STARTS_WITH, "byte[]") + .onFieldsWithData( + null, + new byte[] {1, 2, 3}, + new byte[0], + new byte[0], + new byte[] {1, 2}, + new byte[] {3}) + .andDataTypes( + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES().notNull(), + DataTypes.BYTES(), + DataTypes.BYTES()) + // null input + .testResult( + $("f0").startsWith($("f1")), + "STARTSWITH(f0, f1)", + null, + DataTypes.BOOLEAN()) + .testResult( + $("f1").startsWith($("f0")), + "STARTSWITH(f1, f0)", + null, + DataTypes.BOOLEAN()) + // empty input + .testResult( + $("f2").startsWith($("f1")), + "STARTSWITH(f2, f1)", + Boolean.FALSE, + DataTypes.BOOLEAN()) + .testResult( + $("f1").startsWith($("f2")), + "STARTSWITH(f1, f2)", + Boolean.TRUE, + DataTypes.BOOLEAN()) + .testResult( + $("f3").startsWith($("f3")), + "STARTSWITH(f3, f3)", + Boolean.TRUE, + DataTypes.BOOLEAN().notNull()) + // normal cases + .testResult( + $("f1").startsWith($("f4")), + "STARTSWITH(f1, f4)", + Boolean.TRUE, + DataTypes.BOOLEAN()) + .testResult( + $("f1").startsWith($("f5")), + "STARTSWITH(f1, f5)", + Boolean.FALSE, + DataTypes.BOOLEAN()), + TestSetSpec.forFunction(BuiltInFunctionDefinitions.STARTS_WITH, "Validation Error") + .onFieldsWithData("12345", "123".getBytes()) + .andDataTypes(DataTypes.STRING(), DataTypes.BYTES()) + .testTableApiValidationError( + $("f0").startsWith($("f1")), + "Invalid input arguments. Expected signatures are:\n" + + "STARTSWITH(expr <CHARACTER_STRING>, startExpr <CHARACTER_STRING>)\n" + + "STARTSWITH(expr <BINARY_STRING>, startExpr <BINARY_STRING>)") + .testSqlValidationError( + "STARTSWITH(f0, f1)", + "Invalid input arguments. Expected signatures are:\n" + + "STARTSWITH(expr <CHARACTER_STRING>, startExpr <CHARACTER_STRING>)\n" + + "STARTSWITH(expr <BINARY_STRING>, startExpr <BINARY_STRING>)")); + } + private Stream<TestSetSpec> translateTestCases() { return Stream.of( TestSetSpec.forFunction(BuiltInFunctionDefinitions.TRANSLATE) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/StartsWithFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/StartsWithFunction.java new file mode 100644 index 00000000000..93880de51ea --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/StartsWithFunction.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.binary.BinaryStringData; +import org.apache.flink.table.data.binary.BinaryStringDataUtil; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; + +import javax.annotation.Nullable; + +/** Implementation of {@link BuiltInFunctionDefinitions#STARTS_WITH}. */ +@Internal +public class StartsWithFunction extends BuiltInScalarFunction { + + public StartsWithFunction(SpecializedContext context) { + super(BuiltInFunctionDefinitions.STARTS_WITH, context); + } + + public @Nullable Boolean eval(@Nullable StringData expr, @Nullable StringData startExpr) { + if (expr == null || startExpr == null) { + return null; + } + if (BinaryStringDataUtil.isEmpty((BinaryStringData) startExpr)) { + return true; + } + return ((BinaryStringData) expr).startsWith((BinaryStringData) startExpr); + } + + public @Nullable Boolean eval(@Nullable byte[] expr, @Nullable byte[] startExpr) { + if (expr == null || startExpr == null) { + return null; + } + if (startExpr.length == 0) { + return true; + } + return matchAtStart(expr, startExpr); + } + + private static boolean matchAtStart(byte[] source, byte[] target) { + if (source.length < target.length) { + return false; + } + for (int i = 0; i < target.length; i++) { + if (source[i] != target[i]) { + return false; + } + } + return true; + } +}
