This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 74c7188ae98 [FLINK-29722][hive] Supports native hive max function for hive dialect 74c7188ae98 is described below commit 74c7188ae9898b492c94a472d9d407bf4f8e0876 Author: fengli <ldliu...@163.com> AuthorDate: Thu Jan 5 21:04:33 2023 +0800 [FLINK-29722][hive] Supports native hive max function for hive dialect This closes #21605 --- .../hive/HiveDeclarativeAggregateFunction.java | 29 +++++ ...MinAggFunction.java => HiveMaxAggFunction.java} | 59 ++++------ .../table/functions/hive/HiveMinAggFunction.java | 19 +--- .../table/functions/hive/HiveSumAggFunction.java | 4 +- .../apache/flink/table/module/hive/HiveModule.java | 6 +- .../connectors/hive/HiveDialectAggITCase.java | 126 ++++++++++++++++++--- .../connectors/hive/HiveDialectQueryPlanTest.java | 14 +++ .../explain/testMaxAggFunctionFallbackPlan.out | 21 ++++ .../resources/explain/testMaxAggFunctionPlan.out | 17 +++ 9 files changed, 227 insertions(+), 68 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveDeclarativeAggregateFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveDeclarativeAggregateFunction.java index 5d316e2e97d..e184207d007 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveDeclarativeAggregateFunction.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveDeclarativeAggregateFunction.java @@ -19,6 +19,7 @@ package org.apache.flink.table.functions.hive; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.DeclarativeAggregateFunction; import org.apache.flink.table.functions.FunctionKind; @@ -26,9 +27,14 @@ import org.apache.flink.table.types.DataType; import 
org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.table.types.inference.TypeStrategy; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import java.util.List; import java.util.Optional; +import static org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED; + /** * API for hive aggregation functions that are expressed in terms of expressions. * @@ -57,6 +63,29 @@ public abstract class HiveDeclarativeAggregateFunction extends DeclarativeAggreg .build(); } + protected void checkArgumentNum(List<DataType> arguments) { + if (arguments.size() != 1) { + throw new TableException("Exactly one argument is expected."); + } + } + + protected void checkMinMaxArgumentType(LogicalType logicalType, String functionName) { + // Flink doesn't support to compare nested type now, so here can't support it, see + // ScalarOperatorGens#generateComparison for more detail + if (logicalType.is(LogicalTypeRoot.ARRAY) + || logicalType.is(LogicalTypeRoot.MAP) + || logicalType.is(LogicalTypeRoot.ROW)) { + throw new TableException( + String.format( + "Native hive %s aggregate function does not support type: %s. 
" + + "Please set option '%s' to false to fall back to Hive's own %s function.", + functionName, + logicalType.getTypeRoot(), + TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED.key(), + functionName)); + } + } + @Override public FunctionKind getKind() { return FunctionKind.AGGREGATE; diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMaxAggFunction.java similarity index 60% copy from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMaxAggFunction.java index 2eade5a83e6..1e5ddc62e97 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMaxAggFunction.java @@ -23,19 +23,17 @@ import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.UnresolvedReferenceExpression; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.CallContext; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.LogicalTypeRoot; import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef; +import static org.apache.flink.table.planner.expressions.ExpressionBuilder.greaterThan; import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse; import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull; -import static org.apache.flink.table.planner.expressions.ExpressionBuilder.lessThan; import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf; -/** built-in hive min aggregate function. 
*/ -public class HiveMinAggFunction extends HiveDeclarativeAggregateFunction { +/** built-in hive max aggregate function. */ +public class HiveMaxAggFunction extends HiveDeclarativeAggregateFunction { - private final UnresolvedReferenceExpression min = unresolvedRef("min"); + private final UnresolvedReferenceExpression max = unresolvedRef("max"); private DataType resultType; @Override @@ -45,7 +43,7 @@ public class HiveMinAggFunction extends HiveDeclarativeAggregateFunction { @Override public UnresolvedReferenceExpression[] aggBufferAttributes() { - return new UnresolvedReferenceExpression[] {min}; + return new UnresolvedReferenceExpression[] {max}; } @Override @@ -60,67 +58,54 @@ public class HiveMinAggFunction extends HiveDeclarativeAggregateFunction { @Override public Expression[] initialValuesExpressions() { - return new Expression[] { - /* min */ - nullOf(getResultType()) - }; + return new Expression[] {/* max = */ nullOf(getResultType())}; } @Override public Expression[] accumulateExpressions() { return new Expression[] { - /* min = */ ifThenElse( + /* max = */ ifThenElse( isNull(operand(0)), - min, + max, ifThenElse( - isNull(min), + isNull(max), operand(0), - ifThenElse(lessThan(operand(0), min), operand(0), min))) + ifThenElse(greaterThan(operand(0), max), operand(0), max))) }; } @Override public Expression[] retractExpressions() { - throw new TableException("Min aggregate function does not support retraction."); + throw new TableException("Max aggregate function does not support retraction."); } @Override public Expression[] mergeExpressions() { return new Expression[] { - /* min = */ ifThenElse( - isNull(mergeOperand(min)), - min, + /* max = */ ifThenElse( + isNull(mergeOperand(max)), + max, ifThenElse( - isNull(min), - mergeOperand(min), - ifThenElse(lessThan(mergeOperand(min), min), mergeOperand(min), min))) + isNull(max), + mergeOperand(max), + ifThenElse( + greaterThan(mergeOperand(max), max), mergeOperand(max), max))) }; } @Override public 
Expression getValueExpression() { - return min; + return max; } @Override public void setArguments(CallContext callContext) { if (resultType == null) { + checkArgumentNum(callContext.getArgumentDataTypes()); // check argument type firstly - checkArgumentType(callContext.getArgumentDataTypes().get(0).getLogicalType()); + checkMinMaxArgumentType( + callContext.getArgumentDataTypes().get(0).getLogicalType(), "max"); resultType = callContext.getArgumentDataTypes().get(0); } } - - private void checkArgumentType(LogicalType logicalType) { - // Flink doesn't support to compare nested type now, so here can't support it, see - // ScalarOperatorGens#generateComparison for more detail - if (logicalType.is(LogicalTypeRoot.ARRAY) - || logicalType.is(LogicalTypeRoot.MAP) - || logicalType.is(LogicalTypeRoot.ROW)) { - throw new TableException( - String.format( - "Hive native min aggregate function does not support type: '%s' now. Please re-check the data type.", - logicalType.getTypeRoot())); - } - } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java index 2eade5a83e6..9dab1334f88 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java @@ -23,8 +23,6 @@ import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.UnresolvedReferenceExpression; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.CallContext; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.LogicalTypeRoot; import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef; import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse; @@ -105,22 +103,11 @@ public class HiveMinAggFunction extends HiveDeclarativeAggregateFunction { @Override public void setArguments(CallContext callContext) { if (resultType == null) { + checkArgumentNum(callContext.getArgumentDataTypes()); // check argument type firstly - checkArgumentType(callContext.getArgumentDataTypes().get(0).getLogicalType()); + checkMinMaxArgumentType( + callContext.getArgumentDataTypes().get(0).getLogicalType(), "min"); resultType = callContext.getArgumentDataTypes().get(0); } } - - private void checkArgumentType(LogicalType logicalType) { - // Flink doesn't support to compare nested type now, so here can't support it, see - // ScalarOperatorGens#generateComparison for more detail - if (logicalType.is(LogicalTypeRoot.ARRAY) - || logicalType.is(LogicalTypeRoot.MAP) - || logicalType.is(LogicalTypeRoot.ROW)) { - throw new TableException( - String.format( - "Hive native min aggregate function does not support type: '%s' now. 
Please re-check the data type.", - logicalType.getTypeRoot())); - } - } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java index 713cd72e13a..e19674ed39f 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java @@ -106,6 +106,7 @@ public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction { @Override public void setArguments(CallContext callContext) { if (resultType == null) { + checkArgumentNum(callContext.getArgumentDataTypes()); resultType = initResultType(callContext.getArgumentDataTypes().get(0)); } } @@ -129,7 +130,8 @@ public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction { case TIMESTAMP_WITHOUT_TIME_ZONE: throw new TableException( String.format( - "Native hive sum aggregate function does not support type: %s. Please set option '%s' to false.", + "Native hive sum aggregate function does not support type: %s. 
" + "Please set option '%s' to false to fall back to Hive's own sum function.", argsType, TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED.key())); default: throw new TableException( diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java index bb598891fef..d2b397ff416 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java @@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFacto import org.apache.flink.table.factories.FunctionDefinitionFactory; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.hive.HiveCountAggFunction; +import org.apache.flink.table.functions.hive.HiveMaxAggFunction; import org.apache.flink.table.functions.hive.HiveMinAggFunction; import org.apache.flink.table.functions.hive.HiveSumAggFunction; import org.apache.flink.table.module.Module; @@ -87,7 +88,7 @@ public class HiveModule implements Module { "tumble_start"))); static final Set<String> BUILTIN_NATIVE_AGG_FUNC = - Collections.unmodifiableSet(new HashSet<>(Arrays.asList("sum", "count", "min"))); + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("sum", "count", "min", "max"))); private final HiveFunctionDefinitionFactory factory; private final String hiveVersion; @@ -213,6 +214,9 @@ public class HiveModule implements Module { case "min": // We override Hive's min function by native implementation to supports hash-agg return Optional.of(new HiveMinAggFunction()); + case "max": + // We override Hive's max function with a native implementation to support hash-agg + return Optional.of(new HiveMaxAggFunction()); default: throw new UnsupportedOperationException( String.format(
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java index 3af77ad72b6..94aacfe0e50 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java @@ -38,7 +38,6 @@ import org.junit.rules.TemporaryFolder; import java.util.List; import static org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED; -import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -128,16 +127,10 @@ public class HiveDialectAggITCase { assertThat(result7.toString()).isEqualTo("[+I[6.0, 10]]"); // test unsupported timestamp type - assertThatThrownBy( - () -> - CollectionUtil.iteratorToList( - tableEnv.executeSql("select sum(ts) from test_sum") - .collect())) - .rootCause() - .satisfiesAnyOf( - anyCauseMatches( - "Native hive sum aggregate function does not support type: TIMESTAMP(9). " - + "Please set option 'table.exec.hive.native-agg-function.enabled' to false.")); + String expectedMessage = + "Native hive sum aggregate function does not support type: TIMESTAMP(9). 
" + + "Please set option 'table.exec.hive.native-agg-function.enabled' to false to fall back to Hive's own sum function."; + assertSqlException("select sum(ts) from test_sum", TableException.class, expectedMessage); tableEnv.executeSql("drop table test_sum"); } @@ -303,7 +296,8 @@ public class HiveDialectAggITCase { "create table test_min_not_support_type(a array<int>,m map<int, string>,s struct<f1:int,f2:string>)"); // test min with row type String expectedRowMessage = - "Hive native min aggregate function does not support type: 'ROW' now. Please re-check the data type."; + "Native hive min aggregate function does not support type: ROW. " + + "Please set option 'table.exec.hive.native-agg-function.enabled' to false to fall back to Hive's own min function."; assertSqlException( "select min(s) from test_min_not_support_type", TableException.class, @@ -311,7 +305,8 @@ public class HiveDialectAggITCase { // test min with array type String expectedArrayMessage = - "Hive native min aggregate function does not support type: 'ARRAY' now. Please re-check the data type."; + "Native hive min aggregate function does not support type: ARRAY. 
" + + "Please set option 'table.exec.hive.native-agg-function.enabled' to false to fall back to Hive's own min function."; assertSqlException( "select min(a) from test_min_not_support_type", TableException.class, @@ -328,6 +323,111 @@ public class HiveDialectAggITCase { tableEnv.executeSql("drop table test_min_not_support_type"); } + @Test + public void testMaxAggFunction() throws Exception { + tableEnv.executeSql( + "create table test_max(a int, b boolean, x string, y string, z int, d decimal(10,5), e float, f double, ts timestamp, dt date, bar binary)"); + tableEnv.executeSql( + "insert into test_max values (1, true, NULL, '2', 1, 1.11, 1.2, 1.3, '2021-08-04 16:26:33.4','2021-08-04', 'data1'), " + + "(1, false, NULL, 'b', 2, 2.22, 2.3, 2.4, '2021-08-06 16:26:33.4','2021-08-07', 'data2'), " + + "(2, false, NULL, '4', 1, 3.33, 3.5, 3.6, '2021-08-08 16:26:33.4','2021-08-08', 'data3'), " + + "(2, true, NULL, NULL, 4, 4.45, 4.7, 4.8, '2021-08-10 16:26:33.4','2021-08-01', 'data4')") + .await(); + + // test max with all elements are null + List<Row> result = + CollectionUtil.iteratorToList( + tableEnv.executeSql("select max(x) from test_max").collect()); + assertThat(result.toString()).isEqualTo("[+I[null]]"); + + // test max with some elements are null + List<Row> result2 = + CollectionUtil.iteratorToList( + tableEnv.executeSql("select max(y) from test_max").collect()); + assertThat(result2.toString()).isEqualTo("[+I[b]]"); + + // test max with some elements repeated + List<Row> result3 = + CollectionUtil.iteratorToList( + tableEnv.executeSql("select max(z) from test_max").collect()); + assertThat(result3.toString()).isEqualTo("[+I[4]]"); + + // test max with decimal type + List<Row> result4 = + CollectionUtil.iteratorToList( + tableEnv.executeSql("select max(d) from test_max").collect()); + assertThat(result4.toString()).isEqualTo("[+I[4.45000]]"); + + // test max with float type + List<Row> result5 = + CollectionUtil.iteratorToList( + tableEnv.executeSql("select 
max(e) from test_max").collect()); + assertThat(result5.toString()).isEqualTo("[+I[4.7]]"); + + // test max with double type + List<Row> result6 = + CollectionUtil.iteratorToList( + tableEnv.executeSql("select max(f) from test_max").collect()); + assertThat(result6.toString()).isEqualTo("[+I[4.8]]"); + + // test max with boolean type + List<Row> result7 = + CollectionUtil.iteratorToList( + tableEnv.executeSql("select max(b) from test_max").collect()); + assertThat(result7.toString()).isEqualTo("[+I[true]]"); + + // test max with timestamp type + List<Row> result8 = + CollectionUtil.iteratorToList( + tableEnv.executeSql("select max(ts) from test_max").collect()); + assertThat(result8.toString()).isEqualTo("[+I[2021-08-10T16:26:33.400]]"); + + // test max with date type + List<Row> result9 = + CollectionUtil.iteratorToList( + tableEnv.executeSql("select max(dt) from test_max").collect()); + assertThat(result9.toString()).isEqualTo("[+I[2021-08-08]]"); + + // test max with binary type + List<Row> result10 = + CollectionUtil.iteratorToList( + tableEnv.executeSql("select max(bar) from test_max").collect()); + assertThat(result10.toString()).isEqualTo("[+I[[100, 97, 116, 97, 52]]]"); + + tableEnv.executeSql("drop table test_max"); + + // test max with unsupported data type + tableEnv.executeSql( + "create table test_max_not_support_type(a array<int>,m map<int, string>,s struct<f1:int,f2:string>)"); + // test max with row type + String expectedRowMessage = + "Native hive max aggregate function does not support type: ROW. " + + "Please set option 'table.exec.hive.native-agg-function.enabled' to false to fall back to Hive's own max function."; + assertSqlException( + "select max(s) from test_max_not_support_type", + TableException.class, + expectedRowMessage); + + // test max with array type + String expectedArrayMessage = + "Native hive max aggregate function does not support type: ARRAY. 
" + + "Please set option 'table.exec.hive.native-agg-function.enabled' to false to fall back to Hive's own max function."; + assertSqlException( + "select max(a) from test_max_not_support_type", + TableException.class, + expectedArrayMessage); + + // test max with map type, hive also does not support map type comparisons. + String expectedMapMessage = + "Cannot support comparison of map<> type or complex type containing map<>."; + assertSqlException( + "select max(m) from test_max_not_support_type", + UDFArgumentTypeException.class, + expectedMapMessage); + + tableEnv.executeSql("drop table test_max_not_support_type"); + } + private void assertSqlException( String sql, Class<?> expectedExceptionClz, String expectedMessage) { assertThatThrownBy(() -> tableEnv.executeSql(sql)) diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java index 48cc8b913d1..a681d004f93 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java @@ -109,6 +109,20 @@ public class HiveDialectQueryPlanTest { .isEqualTo(readFromResource("/explain/testMinAggFunctionFallbackPlan.out")); } + @Test + public void testMaxAggFunctionPlan() { + // test explain + String sql = "select x, max(y) from foo group by x"; + String actualPlan = explainSql(sql); + assertThat(actualPlan).isEqualTo(readFromResource("/explain/testMaxAggFunctionPlan.out")); + + // test fallback to hive max udaf + tableEnv.getConfig().set(TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED, false); + String actualSortAggPlan = explainSql(sql); + assertThat(actualSortAggPlan) + .isEqualTo(readFromResource("/explain/testMaxAggFunctionFallbackPlan.out")); + } + private 
String explainSql(String sql) { return (String) CollectionUtil.iteratorToList(tableEnv.executeSql("explain " + sql).collect()) diff --git a/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionFallbackPlan.out b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionFallbackPlan.out new file mode 100644 index 00000000000..e25fa76f0e0 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionFallbackPlan.out @@ -0,0 +1,21 @@ +== Abstract Syntax Tree == +LogicalProject(x=[$0], _o__c1=[$1]) ++- LogicalAggregate(group=[{0}], agg#0=[max($1)]) + +- LogicalProject($f0=[$0], $f1=[$1]) + +- LogicalTableScan(table=[[test-catalog, default, foo]]) + +== Optimized Physical Plan == +SortAggregate(isMerge=[true], groupBy=[x], select=[x, Final_max($f1) AS $f1]) ++- Sort(orderBy=[x ASC]) + +- Exchange(distribution=[hash[x]]) + +- LocalSortAggregate(groupBy=[x], select=[x, Partial_max(y) AS $f1]) + +- Sort(orderBy=[x ASC]) + +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y]) + +== Optimized Execution Plan == +SortAggregate(isMerge=[true], groupBy=[x], select=[x, Final_max($f1) AS $f1]) ++- Sort(orderBy=[x ASC]) + +- Exchange(distribution=[hash[x]]) + +- LocalSortAggregate(groupBy=[x], select=[x, Partial_max(y) AS $f1]) + +- Sort(orderBy=[x ASC]) + +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y]) diff --git a/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionPlan.out b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionPlan.out new file mode 100644 index 00000000000..5beaeb74681 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionPlan.out @@ -0,0 +1,17 @@ +== Abstract Syntax Tree == +LogicalProject(x=[$0], _o__c1=[$1]) ++- LogicalAggregate(group=[{0}], agg#0=[max($1)]) + +- LogicalProject($f0=[$0], $f1=[$1]) + +- 
LogicalTableScan(table=[[test-catalog, default, foo]]) + +== Optimized Physical Plan == +HashAggregate(isMerge=[true], groupBy=[x], select=[x, Final_max(max$0) AS $f1]) ++- Exchange(distribution=[hash[x]]) + +- LocalHashAggregate(groupBy=[x], select=[x, Partial_max(y) AS max$0]) + +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y]) + +== Optimized Execution Plan == +HashAggregate(isMerge=[true], groupBy=[x], select=[x, Final_max(max$0) AS $f1]) ++- Exchange(distribution=[hash[x]]) + +- LocalHashAggregate(groupBy=[x], select=[x, Partial_max(y) AS max$0]) + +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])