This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fcdba99  [FLINK-16206][table] Support JSON_ARRAYAGG
fcdba99 is described below

commit fcdba9932cca5ea996c990bee1d2f3f007056e5a
Author: Ingo Bürk <ingo.bu...@tngtech.com>
AuthorDate: Mon Oct 25 20:34:45 2021 +0200

    [FLINK-16206][table] Support JSON_ARRAYAGG
    
    We represent (NULL|ABSENT) ON NULL as two separate built-in functions
    for now. This is necessary because otherwise we would have to ship
    the symbol across the network for each record, which leads to various
    problems. Calcite essentially uses the same workaround. This is the
    same as was done for JSON_OBJECTAGG.
    
    This closes #17562.
---
 docs/data/sql_functions.yml                        |  59 +++++---
 docs/data/sql_functions_zh.yml                     |  59 +++++---
 flink-python/pyflink/table/expressions.py          |  22 ++-
 .../org/apache/flink/table/api/Expressions.java    |  37 +++++
 .../table/api/ImplicitExpressionConversions.scala  |  21 +++
 .../functions/BuiltInFunctionDefinitions.java      |  18 +++
 .../planner/expressions/SqlAggFunctionVisitor.java |   6 +
 .../aggfunctions/JsonArrayAggFunction.java         | 168 +++++++++++++++++++++
 .../functions/sql/FlinkSqlOperatorTable.java       |   4 +
 .../logical/WrapJsonAggFunctionArgumentsRule.java  |   6 +-
 .../planner/plan/utils/AggFunctionFactory.scala    |   4 +
 .../functions/JsonAggregationFunctionsITCase.java  |  49 +++++-
 .../WrapJsonAggFunctionArgumentsRuleTest.java      |  24 +++
 .../WrapJsonAggFunctionArgumentsRuleTest.xml       |  38 +++++
 14 files changed, 470 insertions(+), 45 deletions(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 6ffcf6b..adfe49d 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -644,6 +644,27 @@ json:
       SELECT JSON_EXISTS('{"a": true}',
         'strict $.b' FALSE ON ERROR);
       ```
+  - sql: JSON_STRING(value)
+    table: jsonString(value)
+    description: |
+      Serializes a value into JSON.
+
+      This function returns a JSON string containing the serialized value. If 
the value is `NULL`,
+      the function returns `NULL`.
+
+      ```
+      -- NULL
+      JSON_STRING(CAST(NULL AS INT))
+
+      -- '1'
+      JSON_STRING(1)
+      -- 'true'
+      JSON_STRING(TRUE)
+      -- '"Hello, World!"'
+      JSON_STRING('Hello, World!')
+      -- '[1,2]'
+      JSON_STRING(ARRAY[1, 2])
+      ```
   - sql: JSON_VALUE(jsonValue, path [RETURNING <dataType>] [ { NULL | ERROR | 
DEFAULT <defaultExpr> } ON EMPTY ] [ { NULL | ERROR | DEFAULT <defaultExpr> } 
ON ERROR ])
     table: STRING.jsonValue(STRING path [, returnType, onEmpty, 
defaultOnEmpty, onError, defaultOnError])
     description: |
@@ -785,27 +806,6 @@ json:
         JSON_OBJECTAGG(KEY product VALUE cnt)
       FROM orders
       ```
-  - sql: JSON_STRING(value)
-    table: jsonString(value)
-    description: |
-      Serializes a value into JSON.
-
-      This function returns a JSON string containing the serialized value. If 
the value is `NULL`,
-      the function returns `NULL`.
-
-      ```
-      -- NULL
-      JSON_STRING(CAST(NULL AS INT))
-
-      -- '1'
-      JSON_STRING(1)
-      -- 'true'
-      JSON_STRING(TRUE)
-      -- '"Hello, World!"'
-      JSON_STRING('Hello, World!')
-      -- '[1,2]'
-      JSON_STRING(ARRAY[1, 2])
-      ```
   - sql: JSON_ARRAY([value]* [ { NULL | ABSENT } ON NULL ])
     table: jsonArray(JsonOnNull, values...)
     description: |
@@ -835,6 +835,23 @@ json:
       -- '[[1]]'
       JSON_ARRAY(JSON_ARRAY(1))
       ```
+  - sql: JSON_ARRAYAGG(items [ { NULL | ABSENT } ON NULL ])
+    table: jsonArrayAgg(JsonOnNull, itemExpression)
+    description: |
+      Builds a JSON object string by aggregating items into an array.
+
+      Item expressions can be arbitrary, including other JSON functions. If a 
value is `NULL`, the
+      `ON NULL` behavior defines what to do. If omitted, `ABSENT ON NULL` is 
assumed by default.
+
+      This function is currently not supported in `OVER` windows, unbounded 
session windows, or hop
+      windows.
+
+      ```
+      -- '["Apple","Banana","Orange"]'
+      SELECT
+        JSON_ARRAYAGG(product)
+      FROM orders
+      ```
 
 valueconstruction:
   - sql: |
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 2aa0048..3503b12 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -762,6 +762,27 @@ json:
       SELECT JSON_EXISTS('{"a": true}',
         'strict $.b' FALSE ON ERROR);
       ```
+  - sql: JSON_STRING(value)
+    table: jsonString(value)
+    description: |
+      Serializes a value into JSON.
+
+      This function returns a JSON string containing the serialized value. If 
the value is `NULL`,
+      the function returns `NULL`.
+
+      ```
+      -- NULL
+      JSON_STRING(CAST(NULL AS INT))
+
+      -- '1'
+      JSON_STRING(1)
+      -- 'true'
+      JSON_STRING(TRUE)
+      -- '"Hello, World!"'
+      JSON_STRING('Hello, World!')
+      -- '[1,2]'
+      JSON_STRING(ARRAY[1, 2])
+      ```
   - sql: JSON_VALUE(jsonValue, path [RETURNING <dataType>] [ { NULL | ERROR | 
DEFAULT <defaultExpr> } ON EMPTY ] [ { NULL | ERROR | DEFAULT <defaultExpr> } 
ON ERROR ])
     table: STRING.jsonValue(STRING path [, returnType, onEmpty, 
defaultOnEmpty, onError, defaultOnError])
     description: |
@@ -903,27 +924,6 @@ json:
         JSON_OBJECTAGG(KEY product VALUE cnt)
       FROM orders
       ```
-  - sql: JSON_STRING(value)
-    table: jsonString(value)
-    description: |
-      Serializes a value into JSON.
-
-      This function returns a JSON string containing the serialized value. If 
the value is `NULL`,
-      the function returns `NULL`.
-
-      ```
-      -- NULL
-      JSON_STRING(CAST(NULL AS INT))
-
-      -- '1'
-      JSON_STRING(1)
-      -- 'true'
-      JSON_STRING(TRUE)
-      -- '"Hello, World!"'
-      JSON_STRING('Hello, World!')
-      -- '[1,2]'
-      JSON_STRING(ARRAY[1, 2])
-      ```
   - sql: JSON_ARRAY([value]* [ { NULL | ABSENT } ON NULL ])
     table: jsonArray(JsonOnNull, values...)
     description: |
@@ -953,6 +953,23 @@ json:
       -- '[[1]]'
       JSON_ARRAY(JSON_ARRAY(1))
       ```
+  - sql: JSON_ARRAYAGG(items [ { NULL | ABSENT } ON NULL ])
+    table: jsonArrayAgg(JsonOnNull, itemExpression)
+    description: |
+      Builds a JSON object string by aggregating items into an array.
+
+      Item expressions can be arbitrary, including other JSON functions. If a 
value is `NULL`, the
+      `ON NULL` behavior defines what to do. If omitted, `ABSENT ON NULL` is 
assumed by default.
+
+      This function is currently not supported in `OVER` windows, unbounded 
session windows, or hop
+      windows.
+
+      ```
+      -- '["Apple","Banana","Orange"]'
+      SELECT
+        JSON_ARRAYAGG(product)
+      FROM orders
+      ```
 
 valueconstruction:
   - sql: |
diff --git a/flink-python/pyflink/table/expressions.py 
b/flink-python/pyflink/table/expressions.py
index cfa5d79..0a07629 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -30,7 +30,8 @@ __all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 
'or_', 'not_', 'UNBOU
            'temporal_overlaps', 'date_format', 'timestamp_diff', 'array', 
'row', 'map_',
            'row_interval', 'pi', 'e', 'rand', 'rand_integer', 'atan2', 
'negative', 'concat',
            'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 
'without_columns', 'json_string',
-           'json_object', 'json_object_agg', 'json_array', 'call', 'call_sql', 
'source_watermark']
+           'json_object', 'json_object_agg', 'json_array', 'json_array_agg', 
'call', 'call_sql',
+           'source_watermark']
 
 
 def _leaf_op(op_name: str) -> Expression:
@@ -716,6 +717,25 @@ def json_array(on_null: JsonOnNull = JsonOnNull.ABSENT, 
*args) -> Expression:
     return _varargs_op("jsonArray", *(on_null._to_j_json_on_null(), *args))
 
 
+def json_array_agg(on_null: JsonOnNull, item_expr) -> Expression:
+    """
+    Builds a JSON object string by aggregating items into an array.
+
+    Item expressions can be arbitrary, including other JSON functions. If a 
value is `NULL`, the
+    `on_null` behavior defines what to do.
+
+    This function is currently not supported in `OVER` windows, unbounded 
session windows, or hop
+    windows.
+
+    Examples:
+    ::
+
+        >>> # '["Apple","Banana","Orange"]'
+        >>> orders.select(json_array_agg(JsonOnNull.NULL, col("product")))
+    """
+    return _binary_op("jsonArrayAgg", on_null._to_j_json_on_null(), item_expr)
+
+
 def call(f: Union[str, UserDefinedFunctionWrapper], *args) -> Expression:
     """
     The first parameter `f` could be a str or a Python user-defined function.
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index 569addd..56a241b 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -45,6 +45,8 @@ import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCa
 import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static 
org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_ARRAY;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_ARRAYAGG_NULL_ON_NULL;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_OBJECT;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_OBJECTAGG_ABSENT_ON_NULL;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL;
@@ -651,6 +653,7 @@ public final class Expressions {
      * }</pre>
      *
      * @see #jsonObject(JsonOnNull, Object...)
+     * @see #jsonArrayAgg(JsonOnNull, Object)
      */
     public static ApiExpression jsonObjectAgg(JsonOnNull onNull, Object 
keyExpr, Object valueExpr) {
         final BuiltInFunctionDefinition functionDefinition;
@@ -728,6 +731,40 @@ public final class Expressions {
     }
 
     /**
+     * Builds a JSON object string by aggregating items into an array.
+     *
+     * <p>Item expressions can be arbitrary, including other JSON functions. 
If a value is {@code
+     * NULL}, the {@link JsonOnNull onNull} behavior defines what to do.
+     *
+     * <p>This function is currently not supported in {@code OVER} windows, 
unbounded session
+     * windows, or hop windows.
+     *
+     * <p>Examples:
+     *
+     * <pre>{@code
+     * // "[\"Apple\",\"Banana\",\"Orange\"]"
+     * orders.select(jsonArrayAgg(JsonOnNull.NULL, $("product")))
+     * }</pre>
+     *
+     * @see #jsonArray(JsonOnNull, Object...)
+     * @see #jsonObjectAgg(JsonOnNull, Object, Object)
+     */
+    public static ApiExpression jsonArrayAgg(JsonOnNull onNull, Object 
itemExpr) {
+        final BuiltInFunctionDefinition functionDefinition;
+        switch (onNull) {
+            case NULL:
+                functionDefinition = JSON_ARRAYAGG_NULL_ON_NULL;
+                break;
+            case ABSENT:
+            default:
+                functionDefinition = JSON_ARRAYAGG_ABSENT_ON_NULL;
+                break;
+        }
+
+        return apiCall(functionDefinition, itemExpr);
+    }
+
+    /**
      * A call to a function that will be looked up in a catalog. There are two 
kinds of functions:
      *
      * <ul>
diff --git 
a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
 
b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
index 718a9ff..515beac 100644
--- 
a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
+++ 
b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
@@ -922,4 +922,25 @@ trait ImplicitExpressionConversions {
   def jsonArray(onNull: JsonOnNull, values: Expression*): Expression = {
     Expressions.jsonArray(onNull, values: _*)
   }
+
+  /**
+   * Builds a JSON object string by aggregating items into an array.
+   *
+   * Item expressions can be arbitrary, including other JSON functions. If a 
value is `NULL`,
+   * [[JsonOnNull onNull]] behavior defines what to do.
+   *
+   * This function is currently not supported in `OVER` windows, unbounded 
session windows, or hop
+   * windows.
+   *
+   * Examples:
+   * {{{
+   * // "[\"Apple\",\"Banana\",\"Orange\"]"
+   * orders.select(jsonArrayAgg(JsonOnNull.NULL, $("product")))
+   * }}}
+   *
+   * @see #jsonObject
+   */
+  def jsonArrayAgg(onNull: JsonOnNull, itemExpr: Expression): Expression = {
+    Expressions.jsonArrayAgg(onNull, itemExpr)
+  }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 6270d89..c816f07 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1621,6 +1621,24 @@ public final class BuiltInFunctionDefinitions {
                     .runtimeDeferred()
                     .build();
 
+    public static final BuiltInFunctionDefinition JSON_ARRAYAGG_NULL_ON_NULL =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("JSON_ARRAYAGG_NULL_ON_NULL")
+                    .kind(AGGREGATE)
+                    .inputTypeStrategy(sequence(JSON_ARGUMENT))
+                    .outputTypeStrategy(explicit(DataTypes.STRING().notNull()))
+                    .runtimeDeferred()
+                    .build();
+
+    public static final BuiltInFunctionDefinition JSON_ARRAYAGG_ABSENT_ON_NULL 
=
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("JSON_ARRAYAGG_ABSENT_ON_NULL")
+                    .kind(AGGREGATE)
+                    .inputTypeStrategy(sequence(JSON_ARGUMENT))
+                    .outputTypeStrategy(explicit(DataTypes.STRING().notNull()))
+                    .runtimeDeferred()
+                    .build();
+
     // 
--------------------------------------------------------------------------------------------
     // Other functions
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/SqlAggFunctionVisitor.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/SqlAggFunctionVisitor.java
index b451b63..4ec88c0 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/SqlAggFunctionVisitor.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/SqlAggFunctionVisitor.java
@@ -82,6 +82,12 @@ public class SqlAggFunctionVisitor extends 
ExpressionDefaultVisitor<SqlAggFuncti
         AGG_DEF_SQL_OPERATOR_MAPPING.put(
                 BuiltInFunctionDefinitions.JSON_OBJECTAGG_ABSENT_ON_NULL,
                 FlinkSqlOperatorTable.JSON_OBJECTAGG_ABSENT_ON_NULL);
+        AGG_DEF_SQL_OPERATOR_MAPPING.put(
+                BuiltInFunctionDefinitions.JSON_ARRAYAGG_NULL_ON_NULL,
+                FlinkSqlOperatorTable.JSON_ARRAYAGG_NULL_ON_NULL);
+        AGG_DEF_SQL_OPERATOR_MAPPING.put(
+                BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL,
+                FlinkSqlOperatorTable.JSON_ARRAYAGG_ABSENT_ON_NULL);
     }
 
     private final RelBuilder relBuilder;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/JsonArrayAggFunction.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/JsonArrayAggFunction.java
new file mode 100644
index 0000000..19c4081
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/JsonArrayAggFunction.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.aggfunctions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.dataview.ListView;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import 
org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgumentsRule;
+import 
org.apache.flink.table.runtime.functions.aggregate.BuiltInAggregateFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.RawValue;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.runtime.functions.SqlJsonUtils.createArrayNode;
+import static 
org.apache.flink.table.runtime.functions.SqlJsonUtils.getNodeFactory;
+import static 
org.apache.flink.table.runtime.functions.SqlJsonUtils.serializeJson;
+
+/**
+ * Implementation for {@link 
BuiltInFunctionDefinitions#JSON_ARRAYAGG_ABSENT_ON_NULL} / {@link
+ * BuiltInFunctionDefinitions#JSON_ARRAYAGG_NULL_ON_NULL}.
+ *
+ * <p>Note that this function only ever receives strings to accumulate because 
{@link
+ * WrapJsonAggFunctionArgumentsRule} wraps arguments into {@link
+ * BuiltInFunctionDefinitions#JSON_STRING}.
+ */
+@Internal
+public class JsonArrayAggFunction
+        extends BuiltInAggregateFunction<String, 
JsonArrayAggFunction.Accumulator> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Marker that represents a {@code null} since {@link ListView} does not 
allow {@code null}s.
+     *
+     * <p>Note that due to {@link WrapJsonAggFunctionArgumentsRule} and the 
fact that this function
+     * already only receives JSON strings, this value cannot be created by the 
user and is thus safe
+     * to use.
+     */
+    private static final StringData NULL_STR = StringData.fromString("null");
+
+    private final transient List<DataType> argumentTypes;
+    private final boolean skipNulls;
+
+    public JsonArrayAggFunction(LogicalType[] argumentTypes, boolean 
skipNulls) {
+        this.argumentTypes =
+                Arrays.stream(argumentTypes)
+                        .map(DataTypeUtils::toInternalDataType)
+                        .collect(Collectors.toList());
+
+        this.skipNulls = skipNulls;
+    }
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return argumentTypes;
+    }
+
+    @Override
+    public DataType getOutputDataType() {
+        return DataTypes.STRING();
+    }
+
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.STRUCTURED(
+                Accumulator.class,
+                DataTypes.FIELD(
+                        "list", 
ListView.newListViewDataType(DataTypes.STRING().toInternal())));
+    }
+
+    @Override
+    public Accumulator createAccumulator() {
+        return new Accumulator();
+    }
+
+    public void resetAccumulator(Accumulator acc) {
+        acc.list.clear();
+    }
+
+    public void accumulate(Accumulator acc, StringData itemData) throws 
Exception {
+        if (itemData == null) {
+            if (!skipNulls) {
+                acc.list.add(NULL_STR);
+            }
+        } else {
+            acc.list.add(itemData);
+        }
+    }
+
+    public void retract(Accumulator acc, StringData itemData) throws Exception 
{
+        if (itemData == null) {
+            acc.list.remove(NULL_STR);
+        } else {
+            acc.list.remove(itemData);
+        }
+    }
+
+    @Override
+    public String getValue(Accumulator acc) {
+        final ArrayNode rootNode = createArrayNode();
+        try {
+            for (final StringData item : acc.list.get()) {
+                final JsonNode itemNode =
+                        getNodeFactory().rawValueNode(new 
RawValue(item.toString()));
+                rootNode.add(itemNode);
+            }
+        } catch (Exception e) {
+            throw new TableException("The accumulator state could not be 
serialized.", e);
+        }
+
+        return serializeJson(rootNode);
+    }
+
+    // 
---------------------------------------------------------------------------------------------
+
+    /** Accumulator for {@link JsonArrayAggFunction}. */
+    public static class Accumulator {
+
+        public ListView<StringData> list = new ListView<>();
+
+        @Override
+        public boolean equals(Object other) {
+            if (this == other) {
+                return true;
+            }
+
+            if (other == null || getClass() != other.getClass()) {
+                return false;
+            }
+
+            final JsonArrayAggFunction.Accumulator that = 
(JsonArrayAggFunction.Accumulator) other;
+            return Objects.equals(list, that.list);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(list);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index c0d236a..fe718f9 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -1152,6 +1152,10 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
     public static final SqlAggFunction JSON_OBJECTAGG_ABSENT_ON_NULL =
             
SqlStdOperatorTable.JSON_OBJECTAGG.with(SqlJsonConstructorNullClause.ABSENT_ON_NULL);
     public static final SqlFunction JSON_ARRAY = 
SqlStdOperatorTable.JSON_ARRAY;
+    public static final SqlAggFunction JSON_ARRAYAGG_NULL_ON_NULL =
+            
SqlStdOperatorTable.JSON_ARRAYAGG.with(SqlJsonConstructorNullClause.NULL_ON_NULL);
+    public static final SqlAggFunction JSON_ARRAYAGG_ABSENT_ON_NULL =
+            SqlStdOperatorTable.JSON_ARRAYAGG;
     public static final SqlPostfixOperator IS_JSON_VALUE = 
SqlStdOperatorTable.IS_JSON_VALUE;
     public static final SqlPostfixOperator IS_JSON_OBJECT = 
SqlStdOperatorTable.IS_JSON_OBJECT;
     public static final SqlPostfixOperator IS_JSON_ARRAY = 
SqlStdOperatorTable.IS_JSON_ARRAY;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java
index 78bd3c9..eb93a54 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java
@@ -32,6 +32,8 @@ import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.fun.SqlJsonArrayAggAggFunction;
 import org.apache.calcite.sql.fun.SqlJsonObjectAggAggFunction;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.mapping.MappingType;
@@ -155,7 +157,9 @@ public class WrapJsonAggFunctionArgumentsRule
     }
 
     private static boolean isJsonAggregation(AggregateCall aggCall) {
-        return aggCall.getAggregation() instanceof SqlJsonObjectAggAggFunction;
+        final SqlAggFunction aggregation = aggCall.getAggregation();
+        return aggregation instanceof SqlJsonObjectAggAggFunction
+                || aggregation instanceof SqlJsonArrayAggAggFunction;
     }
 
     // 
---------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
index 0e8ae64..f76dec8 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala
@@ -126,6 +126,10 @@ class AggFunctionFactory(
         val onNull = fn.asInstanceOf[SqlJsonObjectAggAggFunction].getNullClause
         new JsonObjectAggFunction(argTypes, onNull == 
SqlJsonConstructorNullClause.ABSENT_ON_NULL)
 
+      case fn: SqlAggFunction if fn.getKind == SqlKind.JSON_ARRAYAGG =>
+        val onNull = fn.asInstanceOf[SqlJsonArrayAggAggFunction].getNullClause
+        new JsonArrayAggFunction(argTypes, onNull == 
SqlJsonConstructorNullClause.ABSENT_ON_NULL)
+
       case udagg: AggSqlFunction =>
         // Can not touch the literals, Calcite make them in previous RelNode.
         // In here, all inputs are input refs.
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
index 07df9de..9209a49 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java
@@ -33,6 +33,7 @@ import static org.apache.flink.table.api.DataTypes.ROW;
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.api.DataTypes.VARCHAR;
 import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.jsonArrayAgg;
 import static org.apache.flink.table.api.Expressions.jsonObjectAgg;
 import static org.apache.flink.types.RowKind.DELETE;
 import static org.apache.flink.types.RowKind.INSERT;
@@ -43,6 +44,7 @@ public class JsonAggregationFunctionsITCase extends 
BuiltInAggregateFunctionTest
     @Parameterized.Parameters(name = "{index}: {0}")
     public static List<TestSpec> testData() throws Exception {
         return Arrays.asList(
+                // JSON_OBJECTAGG
                 
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL)
                         .withDescription("Basic Aggregation")
                         .withSource(
@@ -118,6 +120,51 @@ public class JsonAggregationFunctionsITCase extends 
BuiltInAggregateFunctionTest
                                 ROW(INT(), STRING().notNull()),
                                 Arrays.asList(
                                         Row.of(1, "{\"A\":0,\"B\":0}"),
-                                        Row.of(2, "{\"A\":0,\"C\":0}"))));
+                                        Row.of(2, "{\"A\":0,\"C\":0}"))),
+
+                // JSON_ARRAYAGG
+                
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL)
+                        .withDescription("Basic Aggregation")
+                        .withSource(
+                                ROW(STRING()),
+                                Arrays.asList(
+                                        Row.ofKind(INSERT, "A"),
+                                        Row.ofKind(INSERT, (String) null),
+                                        Row.ofKind(INSERT, "C")))
+                        .testResult(
+                                source -> "SELECT JSON_ARRAYAGG(f0) FROM " + 
source,
+                                source -> 
source.select(jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
+                                ROW(VARCHAR(2000).notNull()),
+                                ROW(STRING().notNull()),
+                                
Collections.singletonList(Row.of("[\"A\",\"C\"]"))),
+                
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_NULL_ON_NULL)
+                        .withDescription("Keeps NULLs")
+                        .withSource(
+                                ROW(STRING()),
+                                Arrays.asList(
+                                        Row.ofKind(INSERT, "A"),
+                                        Row.ofKind(INSERT, (String) null),
+                                        Row.ofKind(INSERT, "C")))
+                        .testResult(
+                                source -> "SELECT JSON_ARRAYAGG(f0 NULL ON 
NULL) FROM " + source,
+                                source -> 
source.select(jsonArrayAgg(JsonOnNull.NULL, $("f0"))),
+                                ROW(VARCHAR(2000).notNull()),
+                                ROW(STRING().notNull()),
+                                
Collections.singletonList(Row.of("[\"A\",null,\"C\"]"))),
+                
TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL)
+                        .withDescription("Retractions")
+                        .withSource(
+                                ROW(INT()),
+                                Arrays.asList(
+                                        Row.ofKind(INSERT, 1),
+                                        Row.ofKind(INSERT, 2),
+                                        Row.ofKind(INSERT, 3),
+                                        Row.ofKind(DELETE, 2)))
+                        .testResult(
+                                source -> "SELECT JSON_ARRAYAGG(f0) FROM " + 
source,
+                                source -> 
source.select(jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))),
+                                ROW(VARCHAR(2000).notNull()),
+                                ROW(STRING().notNull()),
+                                Collections.singletonList(Row.of("[1,3]"))));
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.java
index f0bb715..5836822 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.java
@@ -69,4 +69,28 @@ public class WrapJsonAggFunctionArgumentsRuleTest extends 
TableTestBase {
         util.tableEnv().createTable("T", sourceDescriptor);
         util.verifyRelPlan("SELECT f0, JSON_OBJECTAGG(f1 VALUE f0) FROM T 
GROUP BY f0");
     }
+
+    @Test
+    public void testJsonArrayAgg() {
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(Schema.newBuilder().column("f0", 
STRING()).build())
+                        .unboundedScanSource(ChangelogMode.all())
+                        .build();
+
+        util.tableEnv().createTable("T", sourceDescriptor);
+        util.verifyRelPlan("SELECT JSON_ARRAYAGG(f0) FROM T");
+    }
+
+    @Test
+    public void testJsonArrayAggInGroupWindow() {
+        final TableDescriptor sourceDescriptor =
+                TableFactoryHarness.newBuilder()
+                        .schema(Schema.newBuilder().column("f0", 
INT()).build())
+                        .unboundedScanSource()
+                        .build();
+
+        util.tableEnv().createTable("T", sourceDescriptor);
+        util.verifyRelPlan("SELECT f0, JSON_ARRAYAGG(f0) FROM T GROUP BY f0");
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.xml
index bf7ffcb..2b0b718 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.xml
@@ -16,6 +16,44 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testJsonArrayAgg">
+    <Resource name="sql">
+      <![CDATA[SELECT JSON_ARRAYAGG(f0) FROM T]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[JSON_ARRAYAGG_ABSENT_ON_NULL($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+GroupAggregate(select=[JSON_ARRAYAGG_ABSENT_ON_NULL_RETRACT($f1) AS EXPR$0])
++- Exchange(distribution=[single])
+   +- Calc(select=[f0, JSON_STRING(f0) AS $f1])
+      +- TableSourceScan(table=[[default_catalog, default_database, T]], 
fields=[f0])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJsonArrayAggInGroupWindow">
+    <Resource name="sql">
+      <![CDATA[SELECT f0, JSON_ARRAYAGG(f0) FROM T GROUP BY f0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[JSON_ARRAYAGG_ABSENT_ON_NULL($0)])
++- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+GroupAggregate(groupBy=[f0], select=[f0, JSON_ARRAYAGG_ABSENT_ON_NULL($f1) AS 
EXPR$1])
++- Exchange(distribution=[hash[f0]])
+   +- Calc(select=[f0, JSON_STRING(f0) AS $f1])
+      +- TableSourceScan(table=[[default_catalog, default_database, T]], 
fields=[f0])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testJsonObjectAggInGroupWindow">
     <Resource name="sql">
       <![CDATA[SELECT f0, JSON_OBJECTAGG(f1 VALUE f0) FROM T GROUP BY f0]]>

Reply via email to