This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 74c7188ae98 [FLINK-29722][hive] Supports native hive max function for hive dialect
74c7188ae98 is described below

commit 74c7188ae9898b492c94a472d9d407bf4f8e0876
Author: fengli <ldliu...@163.com>
AuthorDate: Thu Jan 5 21:04:33 2023 +0800

    [FLINK-29722][hive] Supports native hive max function for hive dialect
    
    This closes #21605
---
 .../hive/HiveDeclarativeAggregateFunction.java     |  29 +++++
 ...MinAggFunction.java => HiveMaxAggFunction.java} |  59 ++++------
 .../table/functions/hive/HiveMinAggFunction.java   |  19 +---
 .../table/functions/hive/HiveSumAggFunction.java   |   4 +-
 .../apache/flink/table/module/hive/HiveModule.java |   6 +-
 .../connectors/hive/HiveDialectAggITCase.java      | 126 ++++++++++++++++++---
 .../connectors/hive/HiveDialectQueryPlanTest.java  |  14 +++
 .../explain/testMaxAggFunctionFallbackPlan.out     |  21 ++++
 .../resources/explain/testMaxAggFunctionPlan.out   |  17 +++
 9 files changed, 227 insertions(+), 68 deletions(-)
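
For readers who want to try the feature, a rough usage sketch follows (not part of
this commit; the table `foo`, the batch setup, the class name NativeMaxSketch, and
the module/dialect wiring are illustrative assumptions; only HiveModule and the
option key come from this change):

    import org.apache.flink.connectors.hive.HiveOptions;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.module.hive.HiveModule;

    public class NativeMaxSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            tableEnv.loadModule("hive", new HiveModule());
            tableEnv.useModules("hive", "core");   // resolve "max" through HiveModule first
            tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
            // Enable the native implementations so max(...) can plan as hash-agg;
            // set to false to fall back to Hive's own max UDAF (sort-agg plan).
            tableEnv.getConfig().set(HiveOptions.TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED, true);
            tableEnv.executeSql("select x, max(y) from foo group by x").print();
        }
    }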

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveDeclarativeAggregateFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveDeclarativeAggregateFunction.java
index 5d316e2e97d..e184207d007 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveDeclarativeAggregateFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveDeclarativeAggregateFunction.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.functions.hive;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.functions.DeclarativeAggregateFunction;
 import org.apache.flink.table.functions.FunctionKind;
@@ -26,9 +27,14 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
+import java.util.List;
 import java.util.Optional;
 
+import static org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED;
+
 /**
  * API for hive aggregation functions that are expressed in terms of expressions.
  *
@@ -57,6 +63,29 @@ public abstract class HiveDeclarativeAggregateFunction extends DeclarativeAggreg
                 .build();
     }
 
+    protected void checkArgumentNum(List<DataType> arguments) {
+        if (arguments.size() != 1) {
+            throw new TableException("Exactly one argument is expected.");
+        }
+    }
+
+    protected void checkMinMaxArgumentType(LogicalType logicalType, String functionName) {
+        // Flink doesn't support comparing nested types yet, so they can't be supported here;
+        // see ScalarOperatorGens#generateComparison for more detail
+        if (logicalType.is(LogicalTypeRoot.ARRAY)
+                || logicalType.is(LogicalTypeRoot.MAP)
+                || logicalType.is(LogicalTypeRoot.ROW)) {
+            throw new TableException(
+                    String.format(
+                            "Native hive %s aggregate function does not support type: %s. "
+                                    + "Please set option '%s' to false to fall back to Hive's own %s function.",
+                            functionName,
+                            logicalType.getTypeRoot(),
+                            TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED.key(),
+                            functionName));
+        }
+    }
+
     @Override
     public FunctionKind getKind() {
         return FunctionKind.AGGREGATE;
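
To make the new shared validation concrete, a hedged sketch of what
checkMinMaxArgumentType accepts and rejects (imagined from inside a subclass,
since the method is protected; DataTypes is Flink's standard type factory and is
not part of this diff):

    // INT has no nested type root, so the check returns normally:
    checkMinMaxArgumentType(DataTypes.INT().getLogicalType(), "max");

    // ARRAY/MAP/ROW are rejected because ScalarOperatorGens cannot generate
    // comparisons for nested types; this throws:
    //   TableException: Native hive max aggregate function does not support type: ARRAY. ...
    checkMinMaxArgumentType(DataTypes.ARRAY(DataTypes.INT()).getLogicalType(), "max");
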
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMaxAggFunction.java
similarity index 60%
copy from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMaxAggFunction.java
index 2eade5a83e6..1e5ddc62e97 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMaxAggFunction.java
@@ -23,19 +23,17 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static org.apache.flink.table.planner.expressions.ExpressionBuilder.greaterThan;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
-import static org.apache.flink.table.planner.expressions.ExpressionBuilder.lessThan;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
 
-/** built-in hive min aggregate function. */
-public class HiveMinAggFunction extends HiveDeclarativeAggregateFunction {
+/** built-in hive max aggregate function. */
+public class HiveMaxAggFunction extends HiveDeclarativeAggregateFunction {
 
-    private final UnresolvedReferenceExpression min = unresolvedRef("min");
+    private final UnresolvedReferenceExpression max = unresolvedRef("max");
     private DataType resultType;
 
     @Override
@@ -45,7 +43,7 @@ public class HiveMinAggFunction extends HiveDeclarativeAggregateFunction {
 
     @Override
     public UnresolvedReferenceExpression[] aggBufferAttributes() {
-        return new UnresolvedReferenceExpression[] {min};
+        return new UnresolvedReferenceExpression[] {max};
     }
 
     @Override
@@ -60,67 +58,54 @@ public class HiveMinAggFunction extends HiveDeclarativeAggregateFunction {
 
     @Override
     public Expression[] initialValuesExpressions() {
-        return new Expression[] {
-            /* min */
-            nullOf(getResultType())
-        };
+        return new Expression[] {/* max = */ nullOf(getResultType())};
     }
 
     @Override
     public Expression[] accumulateExpressions() {
         return new Expression[] {
-            /* min = */ ifThenElse(
+            /* max = */ ifThenElse(
                     isNull(operand(0)),
-                    min,
+                    max,
                     ifThenElse(
-                            isNull(min),
+                            isNull(max),
                             operand(0),
-                            ifThenElse(lessThan(operand(0), min), operand(0), min)))
+                            ifThenElse(greaterThan(operand(0), max), operand(0), max)))
         };
     }
 
     @Override
     public Expression[] retractExpressions() {
-        throw new TableException("Min aggregate function does not support retraction.");
+        throw new TableException("Max aggregate function does not support retraction.");
     }
 
     @Override
     public Expression[] mergeExpressions() {
         return new Expression[] {
-            /* min = */ ifThenElse(
-                    isNull(mergeOperand(min)),
-                    min,
+            /* max = */ ifThenElse(
+                    isNull(mergeOperand(max)),
+                    max,
                     ifThenElse(
-                            isNull(min),
-                            mergeOperand(min),
-                            ifThenElse(lessThan(mergeOperand(min), min), mergeOperand(min), min)))
+                            isNull(max),
+                            mergeOperand(max),
+                            ifThenElse(
+                                    greaterThan(mergeOperand(max), max), mergeOperand(max), max)))
         };
     }
 
     @Override
     public Expression getValueExpression() {
-        return min;
+        return max;
     }
 
     @Override
     public void setArguments(CallContext callContext) {
         if (resultType == null) {
+            checkArgumentNum(callContext.getArgumentDataTypes());
             // check argument type firstly
-            checkArgumentType(callContext.getArgumentDataTypes().get(0).getLogicalType());
+            checkMinMaxArgumentType(
+                    callContext.getArgumentDataTypes().get(0).getLogicalType(), "max");
             resultType = callContext.getArgumentDataTypes().get(0);
         }
     }
-
-    private void checkArgumentType(LogicalType logicalType) {
-        // Flink doesn't support to compare nested type now, so here can't support it, see
-        // ScalarOperatorGens#generateComparison for more detail
-        if (logicalType.is(LogicalTypeRoot.ARRAY)
-                || logicalType.is(LogicalTypeRoot.MAP)
-                || logicalType.is(LogicalTypeRoot.ROW)) {
-            throw new TableException(
-                    String.format(
-                            "Hive native min aggregate function does not support type: '%s' now. Please re-check the data type.",
-                            logicalType.getTypeRoot()));
-        }
-    }
 }
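
Read imperatively, the accumulate and merge expressions above implement the usual
null-safe max recurrence. A plain-Java paraphrase (a hypothetical helper for
illustration, not part of the commit):

    // Mirrors ifThenElse(isNull(operand(0)), max,
    //         ifThenElse(isNull(max), operand(0),
    //         ifThenElse(greaterThan(operand(0), max), operand(0), max))):
    static <T extends Comparable<T>> T accumulateMax(T max, T value) {
        if (value == null) {
            return max;    // an incoming NULL never changes the buffer
        }
        if (max == null) {
            return value;  // the first non-null value seeds the buffer
        }
        return value.compareTo(max) > 0 ? value : max;
    }
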
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java
index 2eade5a83e6..9dab1334f88 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java
@@ -23,8 +23,6 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
 import static org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
@@ -105,22 +103,11 @@ public class HiveMinAggFunction extends HiveDeclarativeAggregateFunction {
     @Override
     public void setArguments(CallContext callContext) {
         if (resultType == null) {
+            checkArgumentNum(callContext.getArgumentDataTypes());
             // check argument type firstly
-            checkArgumentType(callContext.getArgumentDataTypes().get(0).getLogicalType());
+            checkMinMaxArgumentType(
+                    callContext.getArgumentDataTypes().get(0).getLogicalType(), "min");
             resultType = callContext.getArgumentDataTypes().get(0);
         }
     }
-
-    private void checkArgumentType(LogicalType logicalType) {
-        // Flink doesn't support to compare nested type now, so here can't support it, see
-        // ScalarOperatorGens#generateComparison for more detail
-        if (logicalType.is(LogicalTypeRoot.ARRAY)
-                || logicalType.is(LogicalTypeRoot.MAP)
-                || logicalType.is(LogicalTypeRoot.ROW)) {
-            throw new TableException(
-                    String.format(
-                            "Hive native min aggregate function does not support type: '%s' now. Please re-check the data type.",
-                            logicalType.getTypeRoot()));
-        }
-    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java
index 713cd72e13a..e19674ed39f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSumAggFunction.java
@@ -106,6 +106,7 @@ public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction {
     @Override
     public void setArguments(CallContext callContext) {
         if (resultType == null) {
+            checkArgumentNum(callContext.getArgumentDataTypes());
             resultType = initResultType(callContext.getArgumentDataTypes().get(0));
         }
     }
@@ -129,7 +130,8 @@ public class HiveSumAggFunction extends HiveDeclarativeAggregateFunction {
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 throw new TableException(
                         String.format(
-                                "Native hive sum aggregate function does not support type: %s. Please set option '%s' to false.",
+                                "Native hive sum aggregate function does not support type: %s. "
+                                        + "Please set option '%s' to false to fall back to Hive's own sum function.",
                                 argsType, TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED.key()));
             default:
                 throw new TableException(
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
index bb598891fef..d2b397ff416 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFacto
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.hive.HiveCountAggFunction;
+import org.apache.flink.table.functions.hive.HiveMaxAggFunction;
 import org.apache.flink.table.functions.hive.HiveMinAggFunction;
 import org.apache.flink.table.functions.hive.HiveSumAggFunction;
 import org.apache.flink.table.module.Module;
@@ -87,7 +88,7 @@ public class HiveModule implements Module {
                                     "tumble_start")));
 
     static final Set<String> BUILTIN_NATIVE_AGG_FUNC =
-            Collections.unmodifiableSet(new HashSet<>(Arrays.asList("sum", "count", "min")));
+            Collections.unmodifiableSet(new HashSet<>(Arrays.asList("sum", "count", "min", "max")));
 
     private final HiveFunctionDefinitionFactory factory;
     private final String hiveVersion;
@@ -213,6 +214,9 @@ public class HiveModule implements Module {
             case "min":
                 // We override Hive's min function by native implementation to 
supports hash-agg
                 return Optional.of(new HiveMinAggFunction());
+            case "max":
+                // We override Hive's max function by native implementation to 
supports hash-agg
+                return Optional.of(new HiveMaxAggFunction());
             default:
                 throw new UnsupportedOperationException(
                         String.format(
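
As a hedged sketch of the lookup this hunk extends (Module#getFunctionDefinition is
the standard module API; whether the native function is actually chosen also depends
on configuration not shown in this diff):

    import java.util.Optional;

    import org.apache.flink.table.functions.FunctionDefinition;
    import org.apache.flink.table.module.hive.HiveModule;

    class HiveModuleLookupSketch {
        static Optional<FunctionDefinition> resolveMax() {
            HiveModule module = new HiveModule();
            // With table.exec.hive.native-agg-function.enabled=true, "max" now resolves
            // to the hash-agg capable HiveMaxAggFunction instead of Hive's own UDAF.
            return module.getFunctionDefinition("max");
        }
    }
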
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java
index 3af77ad72b6..94aacfe0e50 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java
@@ -38,7 +38,6 @@ import org.junit.rules.TemporaryFolder;
 import java.util.List;
 
 import static org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED;
-import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -128,16 +127,10 @@ public class HiveDialectAggITCase {
         assertThat(result7.toString()).isEqualTo("[+I[6.0, 10]]");
 
         // test unsupported timestamp type
-        assertThatThrownBy(
-                        () ->
-                                CollectionUtil.iteratorToList(
-                                        tableEnv.executeSql("select sum(ts) from test_sum")
-                                                .collect()))
-                .rootCause()
-                .satisfiesAnyOf(
-                        anyCauseMatches(
-                                "Native hive sum aggregate function does not support type: TIMESTAMP(9). "
-                                        + "Please set option 'table.exec.hive.native-agg-function.enabled' to false."));
+        String expectedMessage =
+                "Native hive sum aggregate function does not support type: TIMESTAMP(9). "
+                        + "Please set option 'table.exec.hive.native-agg-function.enabled' to false to fall back to Hive's own sum function.";
+        assertSqlException("select sum(ts) from test_sum", TableException.class, expectedMessage);
 
         tableEnv.executeSql("drop table test_sum");
     }
@@ -303,7 +296,8 @@ public class HiveDialectAggITCase {
                 "create table test_min_not_support_type(a array<int>,m 
map<int, string>,s struct<f1:int,f2:string>)");
         // test min with row type
         String expectedRowMessage =
-                "Hive native min aggregate function does not support type: 'ROW' now. Please re-check the data type.";
+                "Native hive min aggregate function does not support type: ROW. "
+                        + "Please set option 'table.exec.hive.native-agg-function.enabled' to false to fall back to Hive's own min function.";
         assertSqlException(
                 "select min(s) from test_min_not_support_type",
                 TableException.class,
@@ -311,7 +305,8 @@ public class HiveDialectAggITCase {
 
         // test min with array type
         String expectedArrayMessage =
-                "Hive native min aggregate function does not support type: 'ARRAY' now. Please re-check the data type.";
+                "Native hive min aggregate function does not support type: ARRAY. "
+                        + "Please set option 'table.exec.hive.native-agg-function.enabled' to false to fall back to Hive's own min function.";
         assertSqlException(
                 "select min(a) from test_min_not_support_type",
                 TableException.class,
@@ -328,6 +323,111 @@ public class HiveDialectAggITCase {
         tableEnv.executeSql("drop table test_min_not_support_type");
     }
 
+    @Test
+    public void testMaxAggFunction() throws Exception {
+        tableEnv.executeSql(
+                "create table test_max(a int, b boolean, x string, y string, z int, d decimal(10,5), e float, f double, ts timestamp, dt date, bar binary)");
+        tableEnv.executeSql(
+                        "insert into test_max values (1, true, NULL, '2', 1, 1.11, 1.2, 1.3, '2021-08-04 16:26:33.4','2021-08-04', 'data1'), "
+                                + "(1, false, NULL, 'b', 2, 2.22, 2.3, 2.4, '2021-08-06 16:26:33.4','2021-08-07', 'data2'), "
+                                + "(2, false, NULL, '4', 1, 3.33, 3.5, 3.6, '2021-08-08 16:26:33.4','2021-08-08', 'data3'), "
+                                + "(2, true, NULL, NULL, 4, 4.45, 4.7, 4.8, '2021-08-10 16:26:33.4','2021-08-01', 'data4')")
+                .await();
+
+        // test max where all elements are null
+        List<Row> result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select max(x) from test_max").collect());
+        assertThat(result.toString()).isEqualTo("[+I[null]]");
+
+        // test max where some elements are null
+        List<Row> result2 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select max(y) from test_max").collect());
+        assertThat(result2.toString()).isEqualTo("[+I[b]]");
+
+        // test max with some elements repeated
+        List<Row> result3 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select max(z) from test_max").collect());
+        assertThat(result3.toString()).isEqualTo("[+I[4]]");
+
+        // test max with decimal type
+        List<Row> result4 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select max(d) from test_max").collect());
+        assertThat(result4.toString()).isEqualTo("[+I[4.45000]]");
+
+        // test max with float type
+        List<Row> result5 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select max(e) from test_max").collect());
+        assertThat(result5.toString()).isEqualTo("[+I[4.7]]");
+
+        // test max with double type
+        List<Row> result6 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select max(f) from test_max").collect());
+        assertThat(result6.toString()).isEqualTo("[+I[4.8]]");
+
+        // test max with boolean type
+        List<Row> result7 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select max(b) from test_max").collect());
+        assertThat(result7.toString()).isEqualTo("[+I[true]]");
+
+        // test max with timestamp type
+        List<Row> result8 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select max(ts) from test_max").collect());
+        assertThat(result8.toString()).isEqualTo("[+I[2021-08-10T16:26:33.400]]");
+
+        // test max with date type
+        List<Row> result9 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select max(dt) from test_max").collect());
+        assertThat(result9.toString()).isEqualTo("[+I[2021-08-08]]");
+
+        // test max with binary type
+        List<Row> result10 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select max(bar) from test_max").collect());
+        assertThat(result10.toString()).isEqualTo("[+I[[100, 97, 116, 97, 52]]]");
+
+        tableEnv.executeSql("drop table test_max");
+
+        // test max with unsupported data type
+        tableEnv.executeSql(
+                "create table test_max_not_support_type(a array<int>,m map<int, string>,s struct<f1:int,f2:string>)");
+        // test max with row type
+        String expectedRowMessage =
+                "Native hive max aggregate function does not support type: ROW. "
+                        + "Please set option 'table.exec.hive.native-agg-function.enabled' to false to fall back to Hive's own max function.";
+        assertSqlException(
+                "select max(s) from test_max_not_support_type",
+                TableException.class,
+                expectedRowMessage);
+
+        // test max with array type
+        String expectedArrayMessage =
+                "Native hive max aggregate function does not support type: ARRAY. "
+                        + "Please set option 'table.exec.hive.native-agg-function.enabled' to false to fall back to Hive's own max function.";
+        assertSqlException(
+                "select max(a) from test_max_not_support_type",
+                TableException.class,
+                expectedArrayMessage);
+
+        // test max with map type; Hive also does not support map type comparisons
+        String expectedMapMessage =
+                "Cannot support comparison of map<> type or complex type containing map<>.";
+        assertSqlException(
+                "select max(m) from test_max_not_support_type",
+                UDFArgumentTypeException.class,
+                expectedMapMessage);
+
+        tableEnv.executeSql("drop table test_max_not_support_type");
+    }
+
     private void assertSqlException(
            String sql, Class<?> expectedExceptionClz, String expectedMessage) {
         assertThatThrownBy(() -> tableEnv.executeSql(sql))
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java
index 48cc8b913d1..a681d004f93 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java
@@ -109,6 +109,20 @@ public class HiveDialectQueryPlanTest {
                 .isEqualTo(readFromResource("/explain/testMinAggFunctionFallbackPlan.out"));
     }
 
+    @Test
+    public void testMaxAggFunctionPlan() {
+        // test explain
+        String sql = "select x, max(y) from foo group by x";
+        String actualPlan = explainSql(sql);
+        assertThat(actualPlan).isEqualTo(readFromResource("/explain/testMaxAggFunctionPlan.out"));
+
+        // test fallback to hive max udaf
+        tableEnv.getConfig().set(TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED, false);
+        String actualSortAggPlan = explainSql(sql);
+        assertThat(actualSortAggPlan)
+                .isEqualTo(readFromResource("/explain/testMaxAggFunctionFallbackPlan.out"));
+    }
+
     private String explainSql(String sql) {
         return (String)
                 CollectionUtil.iteratorToList(tableEnv.executeSql("explain " + sql).collect())
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionFallbackPlan.out b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionFallbackPlan.out
new file mode 100644
index 00000000000..e25fa76f0e0
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionFallbackPlan.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(x=[$0], _o__c1=[$1])
++- LogicalAggregate(group=[{0}], agg#0=[max($1)])
+   +- LogicalProject($f0=[$0], $f1=[$1])
+      +- LogicalTableScan(table=[[test-catalog, default, foo]])
+
+== Optimized Physical Plan ==
+SortAggregate(isMerge=[true], groupBy=[x], select=[x, Final_max($f1) AS $f1])
++- Sort(orderBy=[x ASC])
+   +- Exchange(distribution=[hash[x]])
+      +- LocalSortAggregate(groupBy=[x], select=[x, Partial_max(y) AS $f1])
+         +- Sort(orderBy=[x ASC])
+            +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])
+
+== Optimized Execution Plan ==
+SortAggregate(isMerge=[true], groupBy=[x], select=[x, Final_max($f1) AS $f1])
++- Sort(orderBy=[x ASC])
+   +- Exchange(distribution=[hash[x]])
+      +- LocalSortAggregate(groupBy=[x], select=[x, Partial_max(y) AS $f1])
+         +- Sort(orderBy=[x ASC])
+            +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionPlan.out b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionPlan.out
new file mode 100644
index 00000000000..5beaeb74681
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMaxAggFunctionPlan.out
@@ -0,0 +1,17 @@
+== Abstract Syntax Tree ==
+LogicalProject(x=[$0], _o__c1=[$1])
++- LogicalAggregate(group=[{0}], agg#0=[max($1)])
+   +- LogicalProject($f0=[$0], $f1=[$1])
+      +- LogicalTableScan(table=[[test-catalog, default, foo]])
+
+== Optimized Physical Plan ==
+HashAggregate(isMerge=[true], groupBy=[x], select=[x, Final_max(max$0) AS $f1])
++- Exchange(distribution=[hash[x]])
+   +- LocalHashAggregate(groupBy=[x], select=[x, Partial_max(y) AS max$0])
+      +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])
+
+== Optimized Execution Plan ==
+HashAggregate(isMerge=[true], groupBy=[x], select=[x, Final_max(max$0) AS $f1])
++- Exchange(distribution=[hash[x]])
+   +- LocalHashAggregate(groupBy=[x], select=[x, Partial_max(y) AS max$0])
+      +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])
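
Taken together, the two plan files show the payoff of the native implementation:
with 'table.exec.hive.native-agg-function.enabled' set to true, max compiles to a
HashAggregate/LocalHashAggregate pipeline, while the fallback to Hive's own max
UDAF can only use the Sort plus SortAggregate shape.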
