This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new fcdba99 [FLINK-16206][table] Support JSON_ARRAYAGG fcdba99 is described below commit fcdba9932cca5ea996c990bee1d2f3f007056e5a Author: Ingo Bürk <ingo.bu...@tngtech.com> AuthorDate: Mon Oct 25 20:34:45 2021 +0200 [FLINK-16206][table] Support JSON_ARRAYAGG We represent (NULL|ABSENT) ON NULL as two separate built-in functions for now. This is necessary because otherwise we would have to ship the symbol across the network for each record, which leads to various problems. Calcite essentially uses the same workaround. This is the same as was done for JSON_OBJECTAGG. This closes #17562. --- docs/data/sql_functions.yml | 59 +++++--- docs/data/sql_functions_zh.yml | 59 +++++--- flink-python/pyflink/table/expressions.py | 22 ++- .../org/apache/flink/table/api/Expressions.java | 37 +++++ .../table/api/ImplicitExpressionConversions.scala | 21 +++ .../functions/BuiltInFunctionDefinitions.java | 18 +++ .../planner/expressions/SqlAggFunctionVisitor.java | 6 + .../aggfunctions/JsonArrayAggFunction.java | 168 +++++++++++++++++++++ .../functions/sql/FlinkSqlOperatorTable.java | 4 + .../logical/WrapJsonAggFunctionArgumentsRule.java | 6 +- .../planner/plan/utils/AggFunctionFactory.scala | 4 + .../functions/JsonAggregationFunctionsITCase.java | 49 +++++- .../WrapJsonAggFunctionArgumentsRuleTest.java | 24 +++ .../WrapJsonAggFunctionArgumentsRuleTest.xml | 38 +++++ 14 files changed, 470 insertions(+), 45 deletions(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 6ffcf6b..adfe49d 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -644,6 +644,27 @@ json: SELECT JSON_EXISTS('{"a": true}', 'strict $.b' FALSE ON ERROR); ``` + - sql: JSON_STRING(value) + table: jsonString(value) + description: | + Serializes a value into JSON. + + This function returns a JSON string containing the serialized value. If the value is `NULL`, + the function returns `NULL`. 
+ + ``` + -- NULL + JSON_STRING(CAST(NULL AS INT)) + + -- '1' + JSON_STRING(1) + -- 'true' + JSON_STRING(TRUE) + -- '"Hello, World!"' + JSON_STRING('Hello, World!') + -- '[1,2]' + JSON_STRING(ARRAY[1, 2]) + ``` - sql: JSON_VALUE(jsonValue, path [RETURNING <dataType>] [ { NULL | ERROR | DEFAULT <defaultExpr> } ON EMPTY ] [ { NULL | ERROR | DEFAULT <defaultExpr> } ON ERROR ]) table: STRING.jsonValue(STRING path [, returnType, onEmpty, defaultOnEmpty, onError, defaultOnError]) description: | @@ -785,27 +806,6 @@ json: JSON_OBJECTAGG(KEY product VALUE cnt) FROM orders ``` - - sql: JSON_STRING(value) - table: jsonString(value) - description: | - Serializes a value into JSON. - - This function returns a JSON string containing the serialized value. If the value is `NULL`, - the function returns `NULL`. - - ``` - -- NULL - JSON_STRING(CAST(NULL AS INT)) - - -- '1' - JSON_STRING(1) - -- 'true' - JSON_STRING(TRUE) - -- '"Hello, World!"' - JSON_STRING('Hello, World!') - -- '[1,2]' - JSON_STRING(ARRAY[1, 2]) - ``` - sql: JSON_ARRAY([value]* [ { NULL | ABSENT } ON NULL ]) table: jsonArray(JsonOnNull, values...) description: | @@ -835,6 +835,23 @@ json: -- '[[1]]' JSON_ARRAY(JSON_ARRAY(1)) ``` + - sql: JSON_ARRAYAGG(items [ { NULL | ABSENT } ON NULL ]) + table: jsonArrayAgg(JsonOnNull, itemExpression) + description: | + Builds a JSON array string by aggregating items into a single JSON array. + + Item expressions can be arbitrary, including other JSON functions. If a value is `NULL`, the + `ON NULL` behavior defines what to do. If omitted, `ABSENT ON NULL` is assumed by default. + + This function is currently not supported in `OVER` windows, unbounded session windows, or hop + windows.
+ + ``` + -- '["Apple","Banana","Orange"]' + SELECT + JSON_ARRAYAGG(product) + FROM orders + ``` valueconstruction: - sql: | diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index 2aa0048..3503b12 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -762,6 +762,27 @@ json: SELECT JSON_EXISTS('{"a": true}', 'strict $.b' FALSE ON ERROR); ``` + - sql: JSON_STRING(value) + table: jsonString(value) + description: | + Serializes a value into JSON. + + This function returns a JSON string containing the serialized value. If the value is `NULL`, + the function returns `NULL`. + + ``` + -- NULL + JSON_STRING(CAST(NULL AS INT)) + + -- '1' + JSON_STRING(1) + -- 'true' + JSON_STRING(TRUE) + -- '"Hello, World!"' + JSON_STRING('Hello, World!') + -- '[1,2]' + JSON_STRING(ARRAY[1, 2]) + ``` - sql: JSON_VALUE(jsonValue, path [RETURNING <dataType>] [ { NULL | ERROR | DEFAULT <defaultExpr> } ON EMPTY ] [ { NULL | ERROR | DEFAULT <defaultExpr> } ON ERROR ]) table: STRING.jsonValue(STRING path [, returnType, onEmpty, defaultOnEmpty, onError, defaultOnError]) description: | @@ -903,27 +924,6 @@ json: JSON_OBJECTAGG(KEY product VALUE cnt) FROM orders ``` - - sql: JSON_STRING(value) - table: jsonString(value) - description: | - Serializes a value into JSON. - - This function returns a JSON string containing the serialized value. If the value is `NULL`, - the function returns `NULL`. - - ``` - -- NULL - JSON_STRING(CAST(NULL AS INT)) - - -- '1' - JSON_STRING(1) - -- 'true' - JSON_STRING(TRUE) - -- '"Hello, World!"' - JSON_STRING('Hello, World!') - -- '[1,2]' - JSON_STRING(ARRAY[1, 2]) - ``` - sql: JSON_ARRAY([value]* [ { NULL | ABSENT } ON NULL ]) table: jsonArray(JsonOnNull, values...) 
description: | @@ -953,6 +953,23 @@ json: -- '[[1]]' JSON_ARRAY(JSON_ARRAY(1)) ``` + - sql: JSON_ARRAYAGG(items [ { NULL | ABSENT } ON NULL ]) + table: jsonArrayAgg(JsonOnNull, itemExpression) + description: | + Builds a JSON array string by aggregating items into a single JSON array. + + Item expressions can be arbitrary, including other JSON functions. If a value is `NULL`, the + `ON NULL` behavior defines what to do. If omitted, `ABSENT ON NULL` is assumed by default. + + This function is currently not supported in `OVER` windows, unbounded session windows, or hop + windows. + + ``` + -- '["Apple","Banana","Orange"]' + SELECT + JSON_ARRAYAGG(product) + FROM orders + ``` valueconstruction: - sql: | diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py index cfa5d79..0a07629 100644 --- a/flink-python/pyflink/table/expressions.py +++ b/flink-python/pyflink/table/expressions.py @@ -30,7 +30,8 @@ __all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 'or_', 'not_', 'UNBOU 'temporal_overlaps', 'date_format', 'timestamp_diff', 'array', 'row', 'map_', 'row_interval', 'pi', 'e', 'rand', 'rand_integer', 'atan2', 'negative', 'concat', 'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 'without_columns', 'json_string', - 'json_object', 'json_object_agg', 'json_array', 'call', 'call_sql', 'source_watermark'] + 'json_object', 'json_object_agg', 'json_array', 'json_array_agg', 'call', 'call_sql', + 'source_watermark'] def _leaf_op(op_name: str) -> Expression: @@ -716,6 +717,25 @@ def json_array(on_null: JsonOnNull = JsonOnNull.ABSENT, *args) -> Expression: return _varargs_op("jsonArray", *(on_null._to_j_json_on_null(), *args)) +def json_array_agg(on_null: JsonOnNull, item_expr) -> Expression: + """ + Builds a JSON array string by aggregating items into a single JSON array. + + Item expressions can be arbitrary, including other JSON functions. If a value is `NULL`, the + `on_null` behavior defines what to do.
+ + This function is currently not supported in `OVER` windows, unbounded session windows, or hop + windows. + + Examples: + :: + + >>> # '["Apple","Banana","Orange"]' + >>> orders.select(json_array_agg(JsonOnNull.NULL, col("product"))) + """ + return _binary_op("jsonArrayAgg", on_null._to_j_json_on_null(), item_expr) + + def call(f: Union[str, UserDefinedFunctionWrapper], *args) -> Expression: """ The first parameter `f` could be a str or a Python user-defined function. diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java index 569addd..56a241b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java @@ -45,6 +45,8 @@ import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCa import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef; import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_ARRAY; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_ARRAYAGG_NULL_ON_NULL; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_OBJECT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_OBJECTAGG_ABSENT_ON_NULL; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL; @@ -651,6 +653,7 @@ public final class Expressions { * }</pre> * * @see #jsonObject(JsonOnNull, Object...) 
+ * @see #jsonArrayAgg(JsonOnNull, Object) */ public static ApiExpression jsonObjectAgg(JsonOnNull onNull, Object keyExpr, Object valueExpr) { final BuiltInFunctionDefinition functionDefinition; @@ -728,6 +731,40 @@ public final class Expressions { } /** + * Builds a JSON array string by aggregating items into a single JSON array. + * + * <p>Item expressions can be arbitrary, including other JSON functions. If a value is {@code + * NULL}, the {@link JsonOnNull onNull} behavior defines what to do. + * + * <p>This function is currently not supported in {@code OVER} windows, unbounded session + * windows, or hop windows. + * + * <p>Examples: + * + * <pre>{@code + * // "[\"Apple\",\"Banana\",\"Orange\"]" + * orders.select(jsonArrayAgg(JsonOnNull.NULL, $("product"))) + * }</pre> + * + * @see #jsonArray(JsonOnNull, Object...) + * @see #jsonObjectAgg(JsonOnNull, Object, Object) + */ + public static ApiExpression jsonArrayAgg(JsonOnNull onNull, Object itemExpr) { + final BuiltInFunctionDefinition functionDefinition; + switch (onNull) { + case NULL: + functionDefinition = JSON_ARRAYAGG_NULL_ON_NULL; + break; + case ABSENT: + default: + functionDefinition = JSON_ARRAYAGG_ABSENT_ON_NULL; + break; + } + + return apiCall(functionDefinition, itemExpr); + } + + /** * A call to a function that will be looked up in a catalog.
There are two kinds of functions: * * <ul> diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala index 718a9ff..515beac 100644 --- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala +++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala @@ -922,4 +922,25 @@ trait ImplicitExpressionConversions { def jsonArray(onNull: JsonOnNull, values: Expression*): Expression = { Expressions.jsonArray(onNull, values: _*) } + + /** + * Builds a JSON array string by aggregating items into a single JSON array. + * + * Item expressions can be arbitrary, including other JSON functions. If a value is `NULL`, + * the [[JsonOnNull onNull]] behavior defines what to do. + * + * This function is currently not supported in `OVER` windows, unbounded session windows, or hop + * windows.
+ * + * Examples: + * {{{ + * // "[\"Apple\",\"Banana\",\"Orange\"]" + * orders.select(jsonArrayAgg(JsonOnNull.NULL, $("product"))) + * }}} + * + * @see #jsonArray + */ + def jsonArrayAgg(onNull: JsonOnNull, itemExpr: Expression): Expression = { + Expressions.jsonArrayAgg(onNull, itemExpr) + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 6270d89..c816f07 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -1621,6 +1621,24 @@ public final class BuiltInFunctionDefinitions { .runtimeDeferred() .build(); + public static final BuiltInFunctionDefinition JSON_ARRAYAGG_NULL_ON_NULL = + BuiltInFunctionDefinition.newBuilder() + .name("JSON_ARRAYAGG_NULL_ON_NULL") + .kind(AGGREGATE) + .inputTypeStrategy(sequence(JSON_ARGUMENT)) + .outputTypeStrategy(explicit(DataTypes.STRING().notNull())) + .runtimeDeferred() + .build(); + + public static final BuiltInFunctionDefinition JSON_ARRAYAGG_ABSENT_ON_NULL = + BuiltInFunctionDefinition.newBuilder() + .name("JSON_ARRAYAGG_ABSENT_ON_NULL") + .kind(AGGREGATE) + .inputTypeStrategy(sequence(JSON_ARGUMENT)) + .outputTypeStrategy(explicit(DataTypes.STRING().notNull())) + .runtimeDeferred() + .build(); + // -------------------------------------------------------------------------------------------- // Other functions // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/SqlAggFunctionVisitor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/SqlAggFunctionVisitor.java index
b451b63..4ec88c0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/SqlAggFunctionVisitor.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/SqlAggFunctionVisitor.java @@ -82,6 +82,12 @@ public class SqlAggFunctionVisitor extends ExpressionDefaultVisitor<SqlAggFuncti AGG_DEF_SQL_OPERATOR_MAPPING.put( BuiltInFunctionDefinitions.JSON_OBJECTAGG_ABSENT_ON_NULL, FlinkSqlOperatorTable.JSON_OBJECTAGG_ABSENT_ON_NULL); + AGG_DEF_SQL_OPERATOR_MAPPING.put( + BuiltInFunctionDefinitions.JSON_ARRAYAGG_NULL_ON_NULL, + FlinkSqlOperatorTable.JSON_ARRAYAGG_NULL_ON_NULL); + AGG_DEF_SQL_OPERATOR_MAPPING.put( + BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL, + FlinkSqlOperatorTable.JSON_ARRAYAGG_ABSENT_ON_NULL); } private final RelBuilder relBuilder; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/JsonArrayAggFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/JsonArrayAggFunction.java new file mode 100644 index 0000000..19c4081 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/JsonArrayAggFunction.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.aggfunctions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.dataview.ListView; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgumentsRule; +import org.apache.flink.table.runtime.functions.aggregate.BuiltInAggregateFunction; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.utils.DataTypeUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.RawValue; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.apache.flink.table.runtime.functions.SqlJsonUtils.createArrayNode; +import static org.apache.flink.table.runtime.functions.SqlJsonUtils.getNodeFactory; +import static org.apache.flink.table.runtime.functions.SqlJsonUtils.serializeJson; + +/** + * Implementation for {@link BuiltInFunctionDefinitions#JSON_ARRAYAGG_ABSENT_ON_NULL} / {@link + * BuiltInFunctionDefinitions#JSON_ARRAYAGG_NULL_ON_NULL}. 
+ * + * <p>Note that this function only ever receives strings to accumulate because {@link + * WrapJsonAggFunctionArgumentsRule} wraps arguments into {@link + * BuiltInFunctionDefinitions#JSON_STRING}. + */ +@Internal +public class JsonArrayAggFunction + extends BuiltInAggregateFunction<String, JsonArrayAggFunction.Accumulator> { + + private static final long serialVersionUID = 1L; + + /** + * Marker that represents a {@code null} since {@link ListView} does not allow {@code null}s. + * + * <p>Note that due to {@link WrapJsonAggFunctionArgumentsRule} and the fact that this function + * already only receives JSON strings, this value cannot be created by the user and is thus safe + * to use. + */ + private static final StringData NULL_STR = StringData.fromString("null"); + + private final transient List<DataType> argumentTypes; + private final boolean skipNulls; + + public JsonArrayAggFunction(LogicalType[] argumentTypes, boolean skipNulls) { + this.argumentTypes = + Arrays.stream(argumentTypes) + .map(DataTypeUtils::toInternalDataType) + .collect(Collectors.toList()); + + this.skipNulls = skipNulls; + } + + @Override + public List<DataType> getArgumentDataTypes() { + return argumentTypes; + } + + @Override + public DataType getOutputDataType() { + return DataTypes.STRING(); + } + + @Override + public DataType getAccumulatorDataType() { + return DataTypes.STRUCTURED( + Accumulator.class, + DataTypes.FIELD( + "list", ListView.newListViewDataType(DataTypes.STRING().toInternal()))); + } + + @Override + public Accumulator createAccumulator() { + return new Accumulator(); + } + + public void resetAccumulator(Accumulator acc) { + acc.list.clear(); + } + + public void accumulate(Accumulator acc, StringData itemData) throws Exception { + if (itemData == null) { + if (!skipNulls) { + acc.list.add(NULL_STR); + } + } else { + acc.list.add(itemData); + } + } + + public void retract(Accumulator acc, StringData itemData) throws Exception { + if (itemData == null) { + 
acc.list.remove(NULL_STR); + } else { + acc.list.remove(itemData); + } + } + + @Override + public String getValue(Accumulator acc) { + final ArrayNode rootNode = createArrayNode(); + try { + for (final StringData item : acc.list.get()) { + final JsonNode itemNode = + getNodeFactory().rawValueNode(new RawValue(item.toString())); + rootNode.add(itemNode); + } + } catch (Exception e) { + throw new TableException("The accumulator state could not be serialized.", e); + } + + return serializeJson(rootNode); + } + + // --------------------------------------------------------------------------------------------- + + /** Accumulator for {@link JsonArrayAggFunction}. */ + public static class Accumulator { + + public ListView<StringData> list = new ListView<>(); + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + final JsonArrayAggFunction.Accumulator that = (JsonArrayAggFunction.Accumulator) other; + return Objects.equals(list, that.list); + } + + @Override + public int hashCode() { + return Objects.hash(list); + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java index c0d236a..fe718f9 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java @@ -1152,6 +1152,10 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { public static final SqlAggFunction JSON_OBJECTAGG_ABSENT_ON_NULL = SqlStdOperatorTable.JSON_OBJECTAGG.with(SqlJsonConstructorNullClause.ABSENT_ON_NULL); public static final SqlFunction JSON_ARRAY = 
SqlStdOperatorTable.JSON_ARRAY; + public static final SqlAggFunction JSON_ARRAYAGG_NULL_ON_NULL = + SqlStdOperatorTable.JSON_ARRAYAGG.with(SqlJsonConstructorNullClause.NULL_ON_NULL); + public static final SqlAggFunction JSON_ARRAYAGG_ABSENT_ON_NULL = + SqlStdOperatorTable.JSON_ARRAYAGG; public static final SqlPostfixOperator IS_JSON_VALUE = SqlStdOperatorTable.IS_JSON_VALUE; public static final SqlPostfixOperator IS_JSON_OBJECT = SqlStdOperatorTable.IS_JSON_OBJECT; public static final SqlPostfixOperator IS_JSON_ARRAY = SqlStdOperatorTable.IS_JSON_ARRAY; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java index 78bd3c9..eb93a54 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRule.java @@ -32,6 +32,8 @@ import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.hint.RelHint; import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.fun.SqlJsonArrayAggAggFunction; import org.apache.calcite.sql.fun.SqlJsonObjectAggAggFunction; import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.util.mapping.MappingType; @@ -155,7 +157,9 @@ public class WrapJsonAggFunctionArgumentsRule } private static boolean isJsonAggregation(AggregateCall aggCall) { - return aggCall.getAggregation() instanceof SqlJsonObjectAggAggFunction; + final SqlAggFunction aggregation = aggCall.getAggregation(); + return aggregation instanceof SqlJsonObjectAggAggFunction + || aggregation instanceof 
SqlJsonArrayAggAggFunction; } // --------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala index 0e8ae64..f76dec8 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala @@ -126,6 +126,10 @@ class AggFunctionFactory( val onNull = fn.asInstanceOf[SqlJsonObjectAggAggFunction].getNullClause new JsonObjectAggFunction(argTypes, onNull == SqlJsonConstructorNullClause.ABSENT_ON_NULL) + case fn: SqlAggFunction if fn.getKind == SqlKind.JSON_ARRAYAGG => + val onNull = fn.asInstanceOf[SqlJsonArrayAggAggFunction].getNullClause + new JsonArrayAggFunction(argTypes, onNull == SqlJsonConstructorNullClause.ABSENT_ON_NULL) + case udagg: AggSqlFunction => // Can not touch the literals, Calcite make them in previous RelNode. // In here, all inputs are input refs. 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java index 07df9de..9209a49 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonAggregationFunctionsITCase.java @@ -33,6 +33,7 @@ import static org.apache.flink.table.api.DataTypes.ROW; import static org.apache.flink.table.api.DataTypes.STRING; import static org.apache.flink.table.api.DataTypes.VARCHAR; import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.api.Expressions.jsonArrayAgg; import static org.apache.flink.table.api.Expressions.jsonObjectAgg; import static org.apache.flink.types.RowKind.DELETE; import static org.apache.flink.types.RowKind.INSERT; @@ -43,6 +44,7 @@ public class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTest @Parameterized.Parameters(name = "{index}: {0}") public static List<TestSpec> testData() throws Exception { return Arrays.asList( + // JSON_OBJECTAGG TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_OBJECTAGG_NULL_ON_NULL) .withDescription("Basic Aggregation") .withSource( @@ -118,6 +120,51 @@ public class JsonAggregationFunctionsITCase extends BuiltInAggregateFunctionTest ROW(INT(), STRING().notNull()), Arrays.asList( Row.of(1, "{\"A\":0,\"B\":0}"), - Row.of(2, "{\"A\":0,\"C\":0}")))); + Row.of(2, "{\"A\":0,\"C\":0}"))), + + // JSON_ARRAYAGG + TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL) + .withDescription("Basic Aggregation") + .withSource( + ROW(STRING()), + Arrays.asList( + Row.ofKind(INSERT, "A"), + Row.ofKind(INSERT, (String) null), + Row.ofKind(INSERT, "C"))) + .testResult( + source -> "SELECT 
JSON_ARRAYAGG(f0) FROM " + source, + source -> source.select(jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))), + ROW(VARCHAR(2000).notNull()), + ROW(STRING().notNull()), + Collections.singletonList(Row.of("[\"A\",\"C\"]"))), + TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_NULL_ON_NULL) + .withDescription("Keeps NULLs") + .withSource( + ROW(STRING()), + Arrays.asList( + Row.ofKind(INSERT, "A"), + Row.ofKind(INSERT, (String) null), + Row.ofKind(INSERT, "C"))) + .testResult( + source -> "SELECT JSON_ARRAYAGG(f0 NULL ON NULL) FROM " + source, + source -> source.select(jsonArrayAgg(JsonOnNull.NULL, $("f0"))), + ROW(VARCHAR(2000).notNull()), + ROW(STRING().notNull()), + Collections.singletonList(Row.of("[\"A\",null,\"C\"]"))), + TestSpec.forFunction(BuiltInFunctionDefinitions.JSON_ARRAYAGG_ABSENT_ON_NULL) + .withDescription("Retractions") + .withSource( + ROW(INT()), + Arrays.asList( + Row.ofKind(INSERT, 1), + Row.ofKind(INSERT, 2), + Row.ofKind(INSERT, 3), + Row.ofKind(DELETE, 2))) + .testResult( + source -> "SELECT JSON_ARRAYAGG(f0) FROM " + source, + source -> source.select(jsonArrayAgg(JsonOnNull.ABSENT, $("f0"))), + ROW(VARCHAR(2000).notNull()), + ROW(STRING().notNull()), + Collections.singletonList(Row.of("[1,3]")))); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.java index f0bb715..5836822 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.java @@ -69,4 +69,28 @@ public class WrapJsonAggFunctionArgumentsRuleTest extends TableTestBase { util.tableEnv().createTable("T", 
sourceDescriptor); util.verifyRelPlan("SELECT f0, JSON_OBJECTAGG(f1 VALUE f0) FROM T GROUP BY f0"); } + + @Test + public void testJsonArrayAgg() { + final TableDescriptor sourceDescriptor = + TableFactoryHarness.newBuilder() + .schema(Schema.newBuilder().column("f0", STRING()).build()) + .unboundedScanSource(ChangelogMode.all()) + .build(); + + util.tableEnv().createTable("T", sourceDescriptor); + util.verifyRelPlan("SELECT JSON_ARRAYAGG(f0) FROM T"); + } + + @Test + public void testJsonArrayAggInGroupWindow() { + final TableDescriptor sourceDescriptor = + TableFactoryHarness.newBuilder() + .schema(Schema.newBuilder().column("f0", INT()).build()) + .unboundedScanSource() + .build(); + + util.tableEnv().createTable("T", sourceDescriptor); + util.verifyRelPlan("SELECT f0, JSON_ARRAYAGG(f0) FROM T GROUP BY f0"); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.xml index bf7ffcb..2b0b718 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/WrapJsonAggFunctionArgumentsRuleTest.xml @@ -16,6 +16,44 @@ See the License for the specific language governing permissions and limitations under the License. 
--> <Root> + <TestCase name="testJsonArrayAgg"> + <Resource name="sql"> + <![CDATA[SELECT JSON_ARRAYAGG(f0) FROM T]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalAggregate(group=[{}], EXPR$0=[JSON_ARRAYAGG_ABSENT_ON_NULL($0)]) ++- LogicalTableScan(table=[[default_catalog, default_database, T]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +GroupAggregate(select=[JSON_ARRAYAGG_ABSENT_ON_NULL_RETRACT($f1) AS EXPR$0]) ++- Exchange(distribution=[single]) + +- Calc(select=[f0, JSON_STRING(f0) AS $f1]) + +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0]) +]]> + </Resource> + </TestCase> + <TestCase name="testJsonArrayAggInGroupWindow"> + <Resource name="sql"> + <![CDATA[SELECT f0, JSON_ARRAYAGG(f0) FROM T GROUP BY f0]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalAggregate(group=[{0}], EXPR$1=[JSON_ARRAYAGG_ABSENT_ON_NULL($0)]) ++- LogicalTableScan(table=[[default_catalog, default_database, T]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +GroupAggregate(groupBy=[f0], select=[f0, JSON_ARRAYAGG_ABSENT_ON_NULL($f1) AS EXPR$1]) ++- Exchange(distribution=[hash[f0]]) + +- Calc(select=[f0, JSON_STRING(f0) AS $f1]) + +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0]) +]]> + </Resource> + </TestCase> <TestCase name="testJsonObjectAggInGroupWindow"> <Resource name="sql"> <![CDATA[SELECT f0, JSON_OBJECTAGG(f1 VALUE f0) FROM T GROUP BY f0]]>