This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 6e22b94e708 [FLINK-31665] [table] Add ARRAY_CONCAT function (#22717) 6e22b94e708 is described below commit 6e22b94e708052ac450f84bce954a4d3ec7bb772 Author: Hanyu Zheng <135176127+hanyuzhe...@users.noreply.github.com> AuthorDate: Mon Jun 19 01:09:20 2023 -0700 [FLINK-31665] [table] Add ARRAY_CONCAT function (#22717) --- docs/data/sql_functions.yml | 3 + docs/data/sql_functions_zh.yml | 3 + .../docs/reference/pyflink.table/expressions.rst | 1 + flink-python/pyflink/table/expression.py | 9 +++ .../flink/table/api/internal/BaseExpressions.java | 35 ++++++++ .../functions/BuiltInFunctionDefinitions.java | 10 +++ .../table/types/inference/InputTypeStrategies.java | 8 ++ .../functions/CollectionFunctionsITCase.java | 94 +++++++++++++++++++++- .../functions/scalar/ArrayConcatFunction.java | 79 ++++++++++++++++++ 9 files changed, 241 insertions(+), 1 deletion(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 12b032351f9..87c8b0f9486 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -646,6 +646,9 @@ collection: - sql: ARRAY_UNION(array1, array2) table: haystack.arrayUnion(array) description: Returns an array of the elements in the union of array1 and array2, without duplicates. If any of the array is null, the function will return null. + - sql: ARRAY_CONCAT(array1, ...) + table: array1.arrayConcat(...) + description: Returns an array that is the result of concatenating at least one array. This array contains all the elements in the first array, followed by all the elements in the second array, and so forth, up to the Nth array. If any input array is NULL, the function returns NULL. - sql: MAP_KEYS(map) table: MAP.mapKeys() description: Returns the keys of the map as array. No order guaranteed. 
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index efb16a2c633..f5249408a6a 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -724,6 +724,9 @@ collection: - sql: CARDINALITY(map) table: MAP.cardinality() description: 返回 map 中的 entries 数量。 + - sql: ARRAY_CONCAT(array1, ...) + table: array1.arrayConcat(...) + description: 返回一个数组,该数组是连接至少一个数组的结果。该数组包含第一个数组中的所有元素,然后是第二个数组中的所有元素,依此类推,直到第 N 个数组。如果任何输入数组为 NULL,则函数返回 NULL。 - sql: map ‘[’ value ‘]’ table: MAP.at(ANY) description: 返回 map 中指定 key 对应的值。 diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst index 86e93fefccf..0f9ac9bdc1d 100644 --- a/flink-python/docs/reference/pyflink.table/expressions.rst +++ b/flink-python/docs/reference/pyflink.table/expressions.rst @@ -225,6 +225,7 @@ advanced type helper functions Expression.at Expression.cardinality Expression.element + Expression.array_concat Expression.array_contains Expression.array_distinct Expression.array_position diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 52967d542c5..78984fd5d32 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -1519,6 +1519,15 @@ class Expression(Generic[T]): """ return _binary_op("arrayUnion")(self, array) + def array_concat(self, *arrays) -> 'Expression': + """ + Returns an array that is the result of concatenating at least one array. + This array contains all the elements in the first array, followed by all + the elements in the second array, and so forth, up to the Nth array. + If any input array is NULL, the function returns NULL. 
+ """ + return _binary_op("arrayConcat")(self, *arrays) + @property def map_keys(self) -> 'Expression': """ diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index b308c049b19..5caefd52b2d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -53,6 +53,7 @@ import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ABS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ACOS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_CONCAT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_CONTAINS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_DISTINCT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_ELEMENT; @@ -1408,6 +1409,40 @@ public abstract class BaseExpressions<InType, OutType> { unresolvedCall(ARRAY_UNION, toExpr(), objectToExpression(array))); } + /** + * Returns an array that is the result of concatenating at least one array. This array contains + * all the elements in the first array, followed by all the elements in the second array, and so + * forth, up to the Nth array. + * + * <p>If any input array is NULL, the function returns NULL. + */ + public OutType arrayConcat(InType... 
arrays) { + arrays = convertToArrays(arrays); + Expression[] args = + Stream.concat( + Stream.of(toExpr()), + Arrays.stream(arrays).map(ApiExpressionUtils::objectToExpression)) + .toArray(Expression[]::new); + return toApiSpecificExpression(unresolvedCall(ARRAY_CONCAT, args)); + } + + private InType[] convertToArrays(InType[] arrays) { + if (arrays == null || arrays.length == 0) { + return arrays; + } + InType notNullArray = null; + for (int i = 0; i < arrays.length; ++i) { + if (arrays[i] != null) { + notNullArray = arrays[i]; + } + } + if (!(notNullArray instanceof Object[])) { + return (InType[]) new Object[] {arrays}; + } else { + return arrays; + } + } + /** Returns the keys of the map as an array. */ public OutType mapKeys() { return toApiSpecificExpression(unresolvedCall(MAP_KEYS, toExpr())); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index ee528de6f7f..a694e2b56c9 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -70,6 +70,7 @@ import static org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_ import static org.apache.flink.table.types.inference.InputTypeStrategies.TYPE_LITERAL; import static org.apache.flink.table.types.inference.InputTypeStrategies.and; import static org.apache.flink.table.types.inference.InputTypeStrategies.commonArrayType; +import static org.apache.flink.table.types.inference.InputTypeStrategies.commonMultipleArrayType; import static org.apache.flink.table.types.inference.InputTypeStrategies.commonType; import static org.apache.flink.table.types.inference.InputTypeStrategies.comparable; import static 
org.apache.flink.table.types.inference.InputTypeStrategies.compositeSequence; @@ -285,6 +286,15 @@ public final class BuiltInFunctionDefinitions { "org.apache.flink.table.runtime.functions.scalar.ArrayUnionFunction") .build(); + public static final BuiltInFunctionDefinition ARRAY_CONCAT = + BuiltInFunctionDefinition.newBuilder() + .name("ARRAY_CONCAT") + .kind(SCALAR) + .inputTypeStrategy(commonMultipleArrayType(1)) + .outputTypeStrategy(nullableIfArgs(COMMON)) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.ArrayConcatFunction") + .build(); public static final BuiltInFunctionDefinition INTERNAL_REPLICATE_ROWS = BuiltInFunctionDefinition.newBuilder() .name("$REPLICATE_ROWS$1") diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java index 87665027279..8fb44053096 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java @@ -356,6 +356,14 @@ public final class InputTypeStrategies { return new CommonArrayInputTypeStrategy(ConstantArgumentCount.of(count)); } + /** + * An {@link InputTypeStrategy} that expects {@code minCount} arguments that have a common array + * type. 
+ */ + public static InputTypeStrategy commonMultipleArrayType(int minCount) { + return new CommonArrayInputTypeStrategy(ConstantArgumentCount.from(minCount)); + } + // -------------------------------------------------------------------------------------------- private InputTypeStrategies() { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java index 50bd6f6a4dd..689c13d38ac 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java @@ -43,7 +43,8 @@ class CollectionFunctionsITCase extends BuiltInFunctionTestBase { arrayPositionTestCases(), arrayRemoveTestCases(), arrayReverseTestCases(), - arrayUnionTestCases()) + arrayUnionTestCases(), + arrayConcatTestCases()) .flatMap(s -> s); } @@ -486,4 +487,95 @@ class CollectionFunctionsITCase extends BuiltInFunctionTestBase { "Invalid input arguments. 
Expected signatures are:\n" + "ARRAY_UNION(<COMMON>, <COMMON>)")); } + + private Stream<TestSetSpec> arrayConcatTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_CONCAT) + .onFieldsWithData( + new Integer[] {1, 2, null}, + null, + new Row[] { + Row.of(true, LocalDate.of(2022, 4, 20)), + Row.of(true, LocalDate.of(1990, 10, 14)), + null + }, + new Integer[] {1}, + 1, + new Integer[][] {{1}}, + new String[] {"123"}) + .andDataTypes( + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY( + DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.DATE())), + DataTypes.ARRAY(DataTypes.INT().notNull()), + DataTypes.INT().notNull(), + DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())).notNull(), + DataTypes.ARRAY(DataTypes.STRING()).notNull()) + .testResult( + $("f0").arrayConcat(new Integer[] {1, null, 4}), + "ARRAY_CONCAT(f0, ARRAY[1, NULL, 4])", + new Integer[] {1, 2, null, 1, null, 4}, + DataTypes.ARRAY(DataTypes.INT())) + .testResult( + $("f0").arrayConcat(), + "ARRAY_CONCAT(f0)", + new Integer[] {1, 2, null}, + DataTypes.ARRAY(DataTypes.INT())) + .testTableApiValidationError( + $("f0").arrayConcat( + new Integer[] {null, null, null}, + new Integer[] {1, 2, 3}, + new Integer[] {3, 4, 5}), + "Invalid function call:\n" + "array(NULL, NULL, NULL)") + .testResult( + $("f1").arrayConcat( + new Integer[] {1, null, 4}, + new Integer[] {2, 3, 4}, + new Integer[] {2, 3, 4}), + "ARRAY_CONCAT(f1, ARRAY[1, NULL, 4], ARRAY[2, 3, 4], ARRAY[2, 3, 4])", + null, + DataTypes.ARRAY(DataTypes.INT())) + .testResult( + $("f2").arrayConcat( + new Row[] { + Row.of(true, LocalDate.of(1990, 10, 14)), + }, + new Row[] { + Row.of(true, LocalDate.of(1990, 10, 14)), + }), + "ARRAY_CONCAT(f2, ARRAY[(TRUE, DATE '1990-10-14')], ARRAY[(TRUE, DATE '1990-10-14')])", + new Row[] { + Row.of(true, LocalDate.of(2022, 4, 20)), + Row.of(true, LocalDate.of(1990, 10, 14)), + null, + Row.of(true, LocalDate.of(1990, 10, 14)), + Row.of(true, 
LocalDate.of(1990, 10, 14)) + }, + DataTypes.ARRAY( + DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.DATE()))) + .testResult( + $("f3").arrayConcat(new Integer[] {2, null}), + "ARRAY_CONCAT(f3, ARRAY[2, NULL])", + new Integer[] {1, 2, null}, + DataTypes.ARRAY(DataTypes.INT())) + .testTableApiValidationError( + $("f0").arrayConcat( + new Integer[] {null, null, null}, + new Integer[] {1, 2, 3}, + new Integer[] {3, 4, 5}), + "Invalid function call:\n" + "array(NULL, NULL, NULL)") + .testTableApiValidationError( + $("f4").arrayConcat(true), + "Invalid input arguments. Expected signatures are:\n" + + "ARRAY_CONCAT(<COMMON>, <COMMON>...)") + .testTableApiValidationError( + $("f5").arrayConcat(new Integer[] {1}), + "Invalid function call:\n" + + "ARRAY_CONCAT(ARRAY<ARRAY<INT>> NOT NULL, ARRAY<INT NOT NULL> NOT NULL)") + .testTableApiValidationError( + $("f6").arrayConcat(new Integer[] {123}), + "Invalid function call:\n" + + "ARRAY_CONCAT(ARRAY<STRING> NOT NULL, ARRAY<INT NOT NULL> NOT NULL)")); + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayConcatFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayConcatFunction.java new file mode 100644 index 00000000000..d60c252d326 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayConcatFunction.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.runtime.functions.scalar;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.SpecializedFunction;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;

import javax.annotation.Nullable;

/**
 * Implementation of {@link BuiltInFunctionDefinitions#ARRAY_CONCAT}.
 *
 * <p>Concatenates the argument arrays in order. The result is {@code null} if any argument is
 * {@code null}.
 */
@Internal
public class ArrayConcatFunction extends BuiltInScalarFunction {

    /** Reads elements according to the element type of the call's inferred output array type. */
    private final ArrayData.ElementGetter elementGetter;

    public ArrayConcatFunction(SpecializedFunction.SpecializedContext context) {
        super(BuiltInFunctionDefinitions.ARRAY_CONCAT, context);
        // Since input arrays could be with different nullability we can not rely just on the
        // first argument; the output type carries the common element type.
        final DataType elementDataType =
                ((CollectionDataType) context.getCallContext().getOutputDataType().get())
                        .getElementDataType();
        elementGetter = ArrayData.createElementGetter(elementDataType.getLogicalType());
    }

    /**
     * Concatenates the given arrays.
     *
     * @param arrays the arrays to concatenate, in argument order
     * @return {@code null} if {@code arrays} is null/empty or any of its entries is {@code null};
     *     the single argument itself when exactly one array is given; otherwise a new array that
     *     holds all elements of the first array, then the second, and so on
     */
    public @Nullable ArrayData eval(ArrayData... arrays) {
        if (arrays == null || arrays.length == 0) {
            return null;
        }
        if (arrays.length == 1) {
            return arrays[0];
        }
        // First pass: bail out on any NULL argument before copying anything, and compute the
        // exact result size (addExact fails loudly instead of allocating a negative size).
        int totalSize = 0;
        for (ArrayData array : arrays) {
            if (array == null) {
                return null;
            }
            totalSize = Math.addExact(totalSize, array.size());
        }
        // Second pass: copy every element into an exactly-sized backing array.
        final Object[] result = new Object[totalSize];
        int position = 0;
        for (ArrayData array : arrays) {
            position = appendElements(array, result, position);
        }
        return new GenericArrayData(result);
    }

    /**
     * Copies all elements of {@code array} into {@code target} starting at {@code offset}.
     *
     * @return the index just past the last element written
     */
    private int appendElements(ArrayData array, Object[] target, int offset) {
        int next = offset;
        for (int i = 0; i < array.size(); i++) {
            target[next++] = elementGetter.getElementOrNull(array, i);
        }
        return next;
    }
}