This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e22b94e708 [FLINK-31665] [table] Add ARRAY_CONCAT function (#22717)
6e22b94e708 is described below

commit 6e22b94e708052ac450f84bce954a4d3ec7bb772
Author: Hanyu Zheng <135176127+hanyuzhe...@users.noreply.github.com>
AuthorDate: Mon Jun 19 01:09:20 2023 -0700

    [FLINK-31665] [table] Add ARRAY_CONCAT function (#22717)
---
 docs/data/sql_functions.yml                        |  3 +
 docs/data/sql_functions_zh.yml                     |  3 +
 .../docs/reference/pyflink.table/expressions.rst   |  1 +
 flink-python/pyflink/table/expression.py           |  9 +++
 .../flink/table/api/internal/BaseExpressions.java  | 35 ++++++++
 .../functions/BuiltInFunctionDefinitions.java      | 10 +++
 .../table/types/inference/InputTypeStrategies.java |  8 ++
 .../functions/CollectionFunctionsITCase.java       | 94 +++++++++++++++++++++-
 .../functions/scalar/ArrayConcatFunction.java      | 79 ++++++++++++++++++
 9 files changed, 241 insertions(+), 1 deletion(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 12b032351f9..87c8b0f9486 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -646,6 +646,9 @@ collection:
   - sql: ARRAY_UNION(array1, array2)
     table: haystack.arrayUnion(array)
     description: Returns an array of the elements in the union of array1 and 
array2, without duplicates. If any of the array is null, the function will 
return null.
+  - sql: ARRAY_CONCAT(array1, ...)
+    table: array1.arrayConcat(...)
+    description: Returns an array that is the result of concatenating at least 
one array. This array contains all the elements in the first array, followed by 
all the elements in the second array, and so forth, up to the Nth array. If any 
input array is NULL, the function returns NULL.
   - sql: MAP_KEYS(map)
     table: MAP.mapKeys()
     description: Returns the keys of the map as array. No order guaranteed.
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index efb16a2c633..f5249408a6a 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -724,6 +724,9 @@ collection:
   - sql: CARDINALITY(map)
     table: MAP.cardinality()
     description: 返回 map 中的 entries 数量。
+  - sql: ARRAY_CONCAT(array1, ...)
+    table: array1.arrayConcat(...)
+    description: 
返回一个数组,该数组是连接至少一个数组的结果。该数组包含第一个数组中的所有元素,然后是第二个数组中的所有元素,依此类推,直到第 N 个数组。如果任何输入数组为 
NULL,则函数返回 NULL。
   - sql: map ‘[’ value ‘]’
     table: MAP.at(ANY)
     description: 返回 map 中指定 key 对应的值。
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst 
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 86e93fefccf..0f9ac9bdc1d 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -225,6 +225,7 @@ advanced type helper functions
     Expression.at
     Expression.cardinality
     Expression.element
+    Expression.array_concat
     Expression.array_contains
     Expression.array_distinct
     Expression.array_position
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index 52967d542c5..78984fd5d32 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1519,6 +1519,15 @@ class Expression(Generic[T]):
         """
         return _binary_op("arrayUnion")(self, array)
 
+    def array_concat(self, *arrays) -> 'Expression':
+        """
+        Returns an array that is the result of concatenating at least one 
array.
+        This array contains all the elements in the first array, followed by 
all
+        the elements in the second array, and so forth, up to the Nth array.
+        If any input array is NULL, the function returns NULL.
+        """
+        return _binary_op("arrayConcat")(self, *arrays)
+
     @property
     def map_keys(self) -> 'Expression':
         """
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index b308c049b19..5caefd52b2d 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -53,6 +53,7 @@ import static 
org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ABS;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ACOS;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_CONCAT;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_CONTAINS;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_DISTINCT;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_ELEMENT;
@@ -1408,6 +1409,40 @@ public abstract class BaseExpressions<InType, OutType> {
                 unresolvedCall(ARRAY_UNION, toExpr(), 
objectToExpression(array)));
     }
 
+    /**
+     * Returns an array that is the result of concatenating at least one 
array. This array contains
+     * all the elements in the first array, followed by all the elements in 
the second array, and so
+     * forth, up to the Nth array.
+     *
+     * <p>If any input array is NULL, the function returns NULL.
+     */
+    public OutType arrayConcat(InType... arrays) {
+        arrays = convertToArrays(arrays);
+        Expression[] args =
+                Stream.concat(
+                                Stream.of(toExpr()),
+                                
Arrays.stream(arrays).map(ApiExpressionUtils::objectToExpression))
+                        .toArray(Expression[]::new);
+        return toApiSpecificExpression(unresolvedCall(ARRAY_CONCAT, args));
+    }
+
+    private InType[] convertToArrays(InType[] arrays) {
+        if (arrays == null || arrays.length == 0) {
+            return arrays;
+        }
+        InType notNullArray = null;
+        for (int i = 0; i < arrays.length; ++i) {
+            if (arrays[i] != null) {
+                notNullArray = arrays[i];
+            }
+        }
+        if (!(notNullArray instanceof Object[])) {
+            return (InType[]) new Object[] {arrays};
+        } else {
+            return arrays;
+        }
+    }
+
     /** Returns the keys of the map as an array. */
     public OutType mapKeys() {
         return toApiSpecificExpression(unresolvedCall(MAP_KEYS, toExpr()));
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index ee528de6f7f..a694e2b56c9 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -70,6 +70,7 @@ import static 
org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.TYPE_LITERAL;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.commonArrayType;
+import static 
org.apache.flink.table.types.inference.InputTypeStrategies.commonMultipleArrayType;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.commonType;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.comparable;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.compositeSequence;
@@ -285,6 +286,15 @@ public final class BuiltInFunctionDefinitions {
                             
"org.apache.flink.table.runtime.functions.scalar.ArrayUnionFunction")
                     .build();
 
+    public static final BuiltInFunctionDefinition ARRAY_CONCAT =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("ARRAY_CONCAT")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(commonMultipleArrayType(1))
+                    .outputTypeStrategy(nullableIfArgs(COMMON))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.ArrayConcatFunction")
+                    .build();
     public static final BuiltInFunctionDefinition INTERNAL_REPLICATE_ROWS =
             BuiltInFunctionDefinition.newBuilder()
                     .name("$REPLICATE_ROWS$1")
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
index 87665027279..8fb44053096 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
@@ -356,6 +356,14 @@ public final class InputTypeStrategies {
         return new 
CommonArrayInputTypeStrategy(ConstantArgumentCount.of(count));
     }
 
+    /**
+     * An {@link InputTypeStrategy} that expects {@code minCount} arguments 
that have a common array
+     * type.
+     */
+    public static InputTypeStrategy commonMultipleArrayType(int minCount) {
+        return new 
CommonArrayInputTypeStrategy(ConstantArgumentCount.from(minCount));
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
     private InputTypeStrategies() {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
index 50bd6f6a4dd..689c13d38ac 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
@@ -43,7 +43,8 @@ class CollectionFunctionsITCase extends 
BuiltInFunctionTestBase {
                         arrayPositionTestCases(),
                         arrayRemoveTestCases(),
                         arrayReverseTestCases(),
-                        arrayUnionTestCases())
+                        arrayUnionTestCases(),
+                        arrayConcatTestCases())
                 .flatMap(s -> s);
     }
 
@@ -486,4 +487,95 @@ class CollectionFunctionsITCase extends 
BuiltInFunctionTestBase {
                                 "Invalid input arguments. Expected signatures 
are:\n"
                                         + "ARRAY_UNION(<COMMON>, <COMMON>)"));
     }
+
+    private Stream<TestSetSpec> arrayConcatTestCases() {
+        return Stream.of(
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_CONCAT)
+                        .onFieldsWithData(
+                                new Integer[] {1, 2, null},
+                                null,
+                                new Row[] {
+                                    Row.of(true, LocalDate.of(2022, 4, 20)),
+                                    Row.of(true, LocalDate.of(1990, 10, 14)),
+                                    null
+                                },
+                                new Integer[] {1},
+                                1,
+                                new Integer[][] {{1}},
+                                new String[] {"123"})
+                        .andDataTypes(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(
+                                        DataTypes.ROW(DataTypes.BOOLEAN(), 
DataTypes.DATE())),
+                                DataTypes.ARRAY(DataTypes.INT().notNull()),
+                                DataTypes.INT().notNull(),
+                                
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())).notNull(),
+                                DataTypes.ARRAY(DataTypes.STRING()).notNull())
+                        .testResult(
+                                $("f0").arrayConcat(new Integer[] {1, null, 
4}),
+                                "ARRAY_CONCAT(f0, ARRAY[1, NULL, 4])",
+                                new Integer[] {1, 2, null, 1, null, 4},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                $("f0").arrayConcat(),
+                                "ARRAY_CONCAT(f0)",
+                                new Integer[] {1, 2, null},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testTableApiValidationError(
+                                $("f0").arrayConcat(
+                                                new Integer[] {null, null, 
null},
+                                                new Integer[] {1, 2, 3},
+                                                new Integer[] {3, 4, 5}),
+                                "Invalid function call:\n" + "array(NULL, 
NULL, NULL)")
+                        .testResult(
+                                $("f1").arrayConcat(
+                                                new Integer[] {1, null, 4},
+                                                new Integer[] {2, 3, 4},
+                                                new Integer[] {2, 3, 4}),
+                                "ARRAY_CONCAT(f1, ARRAY[1, NULL, 4], ARRAY[2, 
3, 4], ARRAY[2, 3, 4])",
+                                null,
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                $("f2").arrayConcat(
+                                                new Row[] {
+                                                    Row.of(true, 
LocalDate.of(1990, 10, 14)),
+                                                },
+                                                new Row[] {
+                                                    Row.of(true, 
LocalDate.of(1990, 10, 14)),
+                                                }),
+                                "ARRAY_CONCAT(f2, ARRAY[(TRUE, DATE 
'1990-10-14')], ARRAY[(TRUE, DATE '1990-10-14')])",
+                                new Row[] {
+                                    Row.of(true, LocalDate.of(2022, 4, 20)),
+                                    Row.of(true, LocalDate.of(1990, 10, 14)),
+                                    null,
+                                    Row.of(true, LocalDate.of(1990, 10, 14)),
+                                    Row.of(true, LocalDate.of(1990, 10, 14))
+                                },
+                                DataTypes.ARRAY(
+                                        DataTypes.ROW(DataTypes.BOOLEAN(), 
DataTypes.DATE())))
+                        .testResult(
+                                $("f3").arrayConcat(new Integer[] {2, null}),
+                                "ARRAY_CONCAT(f3, ARRAY[2, NULL])",
+                                new Integer[] {1, 2, null},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testTableApiValidationError(
+                                $("f0").arrayConcat(
+                                                new Integer[] {null, null, 
null},
+                                                new Integer[] {1, 2, 3},
+                                                new Integer[] {3, 4, 5}),
+                                "Invalid function call:\n" + "array(NULL, 
NULL, NULL)")
+                        .testTableApiValidationError(
+                                $("f4").arrayConcat(true),
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "ARRAY_CONCAT(<COMMON>, 
<COMMON>...)")
+                        .testTableApiValidationError(
+                                $("f5").arrayConcat(new Integer[] {1}),
+                                "Invalid function call:\n"
+                                        + "ARRAY_CONCAT(ARRAY<ARRAY<INT>> NOT 
NULL, ARRAY<INT NOT NULL> NOT NULL)")
+                        .testTableApiValidationError(
+                                $("f6").arrayConcat(new Integer[] {123}),
+                                "Invalid function call:\n"
+                                        + "ARRAY_CONCAT(ARRAY<STRING> NOT 
NULL, ARRAY<INT NOT NULL> NOT NULL)"));
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayConcatFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayConcatFunction.java
new file mode 100644
index 00000000000..d60c252d326
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayConcatFunction.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_CONCAT}. */
+@Internal
+public class ArrayConcatFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+
+    public ArrayConcatFunction(SpecializedFunction.SpecializedContext context) 
{
+        super(BuiltInFunctionDefinitions.ARRAY_CONCAT, context);
+        final DataType dataType =
+                // Since input arrays could be with different nullability we 
can not rely just on
+                // the first element.
+                ((CollectionDataType) 
context.getCallContext().getOutputDataType().get())
+                        .getElementDataType();
+        elementGetter = 
ArrayData.createElementGetter(dataType.getLogicalType());
+    }
+
+    public @Nullable ArrayData eval(ArrayData... arrays) {
+        if (arrays == null || arrays.length == 0) {
+            return null;
+        }
+        if (arrays.length == 1) {
+            return arrays[0];
+        }
+        try {
+            List<Object> list = new ArrayList<>();
+            for (ArrayData array : arrays) {
+                if (array != null) {
+                    appendElements(array, elementGetter, list);
+                } else {
+                    return null;
+                }
+            }
+            return new GenericArrayData(list.toArray());
+        } catch (Throwable t) {
+            throw new FlinkRuntimeException(t);
+        }
+    }
+
+    void appendElements(ArrayData array, ArrayData.ElementGetter 
elementGetter, List<Object> list)
+            throws Throwable {
+        for (int i = 0; i < array.size(); ++i) {
+            final Object element = elementGetter.getElementOrNull(array, i);
+            list.add(element);
+        }
+    }
+}

Reply via email to