This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a1f66ca827 [FLINK-29721][hive] Supports native hive min function for 
hive dialect
8a1f66ca827 is described below

commit 8a1f66ca827163b32387e0043f4362921f6c11a9
Author: Tartarus0zm <zhangma...@163.com>
AuthorDate: Fri Jan 6 16:09:38 2023 +0800

    [FLINK-29721][hive] Supports native hive min function for hive dialect
    
    This closes #21608
---
 .../table/functions/hive/HiveMinAggFunction.java   | 126 ++++++++++++++++++++
 .../apache/flink/table/module/hive/HiveModule.java |  25 +++-
 .../connectors/hive/HiveDialectAggITCase.java      | 132 +++++++++++++++++----
 .../connectors/hive/HiveDialectQueryPlanTest.java  | 121 +++++++++++++++++++
 .../explain/testMinAggFunctionFallbackPlan.out     |  21 ++++
 .../resources/explain/testMinAggFunctionPlan.out   |  17 +++
 6 files changed, 419 insertions(+), 23 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java
new file mode 100644
index 00000000000..2eade5a83e6
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveMinAggFunction.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import static 
org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.ifThenElse;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.isNull;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.lessThan;
+import static 
org.apache.flink.table.planner.expressions.ExpressionBuilder.nullOf;
+
+/** built-in hive min aggregate function. */
+public class HiveMinAggFunction extends HiveDeclarativeAggregateFunction {
+
+    private final UnresolvedReferenceExpression min = unresolvedRef("min");
+    private DataType resultType;
+
+    @Override
+    public int operandCount() {
+        return 1;
+    }
+
+    @Override
+    public UnresolvedReferenceExpression[] aggBufferAttributes() {
+        return new UnresolvedReferenceExpression[] {min};
+    }
+
+    @Override
+    public DataType[] getAggBufferTypes() {
+        return new DataType[] {getResultType()};
+    }
+
+    @Override
+    public DataType getResultType() {
+        return resultType;
+    }
+
+    @Override
+    public Expression[] initialValuesExpressions() {
+        return new Expression[] {
+            /* min */
+            nullOf(getResultType())
+        };
+    }
+
+    @Override
+    public Expression[] accumulateExpressions() {
+        return new Expression[] {
+            /* min = */ ifThenElse(
+                    isNull(operand(0)),
+                    min,
+                    ifThenElse(
+                            isNull(min),
+                            operand(0),
+                            ifThenElse(lessThan(operand(0), min), operand(0), 
min)))
+        };
+    }
+
+    @Override
+    public Expression[] retractExpressions() {
+        throw new TableException("Min aggregate function does not support 
retraction.");
+    }
+
+    @Override
+    public Expression[] mergeExpressions() {
+        return new Expression[] {
+            /* min = */ ifThenElse(
+                    isNull(mergeOperand(min)),
+                    min,
+                    ifThenElse(
+                            isNull(min),
+                            mergeOperand(min),
+                            ifThenElse(lessThan(mergeOperand(min), min), 
mergeOperand(min), min)))
+        };
+    }
+
+    @Override
+    public Expression getValueExpression() {
+        return min;
+    }
+
+    @Override
+    public void setArguments(CallContext callContext) {
+        if (resultType == null) {
+            // check argument type firstly
+            
checkArgumentType(callContext.getArgumentDataTypes().get(0).getLogicalType());
+            resultType = callContext.getArgumentDataTypes().get(0);
+        }
+    }
+
+    private void checkArgumentType(LogicalType logicalType) {
+        // Flink doesn't support to compare nested type now, so here can't 
support it, see
+        // ScalarOperatorGens#generateComparison for more detail
+        if (logicalType.is(LogicalTypeRoot.ARRAY)
+                || logicalType.is(LogicalTypeRoot.MAP)
+                || logicalType.is(LogicalTypeRoot.ROW)) {
+            throw new TableException(
+                    String.format(
+                            "Hive native min aggregate function does not 
support type: '%s' now. Please re-check the data type.",
+                            logicalType.getTypeRoot()));
+        }
+    }
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
index 1982ddb79ba..6ca6ca84dd9 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import 
org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.hive.HiveMinAggFunction;
 import org.apache.flink.table.functions.hive.HiveSumAggFunction;
 import org.apache.flink.table.module.Module;
 import 
org.apache.flink.table.module.hive.udf.generic.GenericUDFLegacyGroupingID;
@@ -84,6 +85,9 @@ public class HiveModule implements Module {
                                     "tumble_rowtime",
                                     "tumble_start")));
 
+    static final Set<String> BUILTIN_NATIVE_AGG_FUNC =
+            Collections.unmodifiableSet(new HashSet<>(Arrays.asList("sum", 
"min")));
+
     private final HiveFunctionDefinitionFactory factory;
     private final String hiveVersion;
     private final HiveShim hiveShim;
@@ -141,9 +145,9 @@ public class HiveModule implements Module {
         }
         FunctionDefinitionFactory.Context context = () -> classLoader;
 
-        // We override Hive's sum function by native implementation to 
supports hash-agg
-        if (isNativeAggFunctionEnabled() && name.equalsIgnoreCase("sum")) {
-            return Optional.of(new HiveSumAggFunction());
+        // We override some Hive's function by native implementation to 
supports hash-agg
+        if (isNativeAggFunctionEnabled() && 
BUILTIN_NATIVE_AGG_FUNC.contains(name.toLowerCase())) {
+            return getBuiltInNativeAggFunction(name.toLowerCase());
         }
 
         // We override Hive's grouping function. Refer to the implementation 
for more details.
@@ -196,4 +200,19 @@ public class HiveModule implements Module {
     private boolean isNativeAggFunctionEnabled() {
         return config.get(TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED);
     }
+
+    private Optional<FunctionDefinition> getBuiltInNativeAggFunction(String 
name) {
+        switch (name) {
+            case "sum":
+                // We override Hive's sum function by native implementation to 
supports hash-agg
+                return Optional.of(new HiveSumAggFunction());
+            case "min":
+                // We override Hive's min function by native implementation to 
supports hash-agg
+                return Optional.of(new HiveMinAggFunction());
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Built-in hive aggregate function doesn't 
support %s yet!", name));
+        }
+    }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java
index c92bf6546f1..bd124c7d554 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectAggITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.connectors.hive;
 
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.module.CoreModule;
@@ -28,6 +29,7 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -38,7 +40,6 @@ import java.util.List;
 
 import static 
org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED;
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
-import static 
org.apache.flink.table.planner.utils.TableTestUtil.readFromResource;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -76,19 +77,6 @@ public class HiveDialectAggITCase {
         tableEnv.getConfig().set(TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED, 
true);
     }
 
-    @Test
-    public void testSumAggFunctionPlan() {
-        // test explain
-        String actualPlan = explainSql("select x, sum(y) from foo group by x");
-        
assertThat(actualPlan).isEqualTo(readFromResource("/explain/testSumAggFunctionPlan.out"));
-
-        // test fallback to hive sum udaf
-        tableEnv.getConfig().set(TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED, 
false);
-        String actualSortAggPlan = explainSql("select x, sum(y) from foo group 
by x");
-        assertThat(actualSortAggPlan)
-                
.isEqualTo(readFromResource("/explain/testSumAggFunctionFallbackPlan.out"));
-    }
-
     @Test
     public void testSimpleSumAggFunction() throws Exception {
         tableEnv.executeSql(
@@ -148,7 +136,7 @@ public class HiveDialectAggITCase {
         assertThatThrownBy(
                         () ->
                                 CollectionUtil.iteratorToList(
-                                        tableEnv.executeSql("select 
sum(ts)from test_sum")
+                                        tableEnv.executeSql("select sum(ts) 
from test_sum")
                                                 .collect()))
                 .rootCause()
                 .satisfiesAnyOf(
@@ -179,11 +167,115 @@ public class HiveDialectAggITCase {
         tableEnv.executeSql("drop table test_sum_group");
     }
 
-    private String explainSql(String sql) {
-        return (String)
-                CollectionUtil.iteratorToList(tableEnv.executeSql("explain " + 
sql).collect())
-                        .get(0)
-                        .getField(0);
+    @Test
+    public void testMinAggFunction() throws Exception {
+        tableEnv.executeSql(
+                "create table test_min(a int, b boolean, x string, y string, z 
int, d decimal(10,5), e float, f double, ts timestamp, dt date, bar binary)");
+        tableEnv.executeSql(
+                        "insert into test_min values (1, true, NULL, '2', 1, 
1.11, 1.2, 1.3, '2021-08-04 16:26:33.4','2021-08-04', 'data1'), "
+                                + "(1, false, NULL, 'b', 2, 2.22, 2.3, 2.4, 
'2021-08-06 16:26:33.4','2021-08-07', 'data2'), "
+                                + "(2, false, NULL, '4', 1, 3.33, 3.5, 3.6, 
'2021-08-08 16:26:33.4','2021-08-08', 'data3'), "
+                                + "(2, true, NULL, NULL, 4, 4.45, 4.7, 4.8, 
'2021-08-10 16:26:33.4','2021-08-01', 'data4')")
+                .await();
+
+        // test min with all elements are null
+        List<Row> result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(x) from 
test_min").collect());
+        assertThat(result.toString()).isEqualTo("[+I[null]]");
+
+        // test min with some elements are null
+        List<Row> result2 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(y) from 
test_min").collect());
+        assertThat(result2.toString()).isEqualTo("[+I[2]]");
+
+        // test min with some elements repeated
+        List<Row> result3 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(z) from 
test_min").collect());
+        assertThat(result3.toString()).isEqualTo("[+I[1]]");
+
+        // test min with decimal type
+        List<Row> result4 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(d) from 
test_min").collect());
+        assertThat(result4.toString()).isEqualTo("[+I[1.11000]]");
+
+        // test min with float type
+        List<Row> result5 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(e) from 
test_min").collect());
+        assertThat(result5.toString()).isEqualTo("[+I[1.2]]");
+
+        // test min with double type
+        List<Row> result6 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(f) from 
test_min").collect());
+        assertThat(result6.toString()).isEqualTo("[+I[1.3]]");
+
+        // test min with boolean type
+        List<Row> result7 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(b) from 
test_min").collect());
+        assertThat(result7.toString()).isEqualTo("[+I[false]]");
+
+        // test min with timestamp type
+        List<Row> result8 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(ts) from 
test_min").collect());
+        
assertThat(result8.toString()).isEqualTo("[+I[2021-08-04T16:26:33.400]]");
+
+        // test min with date type
+        List<Row> result9 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(dt) from 
test_min").collect());
+        assertThat(result9.toString()).isEqualTo("[+I[2021-08-01]]");
+
+        // test min with binary type
+        List<Row> result10 =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("select min(bar) from 
test_min").collect());
+        assertThat(result10.toString()).isEqualTo("[+I[[100, 97, 116, 97, 
49]]]");
+
+        tableEnv.executeSql("drop table test_min");
+
+        // test min with unsupported data type
+        tableEnv.executeSql(
+                "create table test_min_not_support_type(a array<int>,m 
map<int, string>,s struct<f1:int,f2:string>)");
+        // test min with row type
+        String expectedRowMessage =
+                "Hive native min aggregate function does not support type: 
'ROW' now. Please re-check the data type.";
+        assertSqlException(
+                "select min(s) from test_min_not_support_type",
+                TableException.class,
+                expectedRowMessage);
+
+        // test min with array type
+        String expectedArrayMessage =
+                "Hive native min aggregate function does not support type: 
'ARRAY' now. Please re-check the data type.";
+        assertSqlException(
+                "select min(a) from test_min_not_support_type",
+                TableException.class,
+                expectedArrayMessage);
+
+        // test min with map type, hive also does not support map type 
comparisons.
+        String expectedMapMessage =
+                "Cannot support comparison of map<> type or complex type 
containing map<>.";
+        assertSqlException(
+                "select min(m) from test_min_not_support_type",
+                UDFArgumentTypeException.class,
+                expectedMapMessage);
+
+        tableEnv.executeSql("drop table test_min_not_support_type");
+    }
+
+    private void assertSqlException(
+            String sql, Class<?> expectedExceptionClz, String expectedMessage) 
{
+        assertThatThrownBy(() -> tableEnv.executeSql(sql))
+                .rootCause()
+                .isInstanceOf(expectedExceptionClz)
+                .hasMessage(expectedMessage);
     }
 
     private static TableEnvironment getTableEnvWithHiveCatalog() {
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java
new file mode 100644
index 00000000000..69b8ca9179f
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryPlanTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.module.CoreModule;
+import org.apache.flink.table.module.hive.HiveModule;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static 
org.apache.flink.connectors.hive.HiveOptions.TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED;
+import static 
org.apache.flink.table.planner.utils.TableTestUtil.readFromResource;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test hive query plan. */
+public class HiveDialectQueryPlanTest {
+
+    private static HiveCatalog hiveCatalog;
+    private static TableEnvironment tableEnv;
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        hiveCatalog = HiveTestUtils.createHiveCatalog();
+        // required by query like "src.`[k].*` from src"
+        
hiveCatalog.getHiveConf().setVar(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT, 
"none");
+        hiveCatalog.open();
+        tableEnv = getTableEnvWithHiveCatalog();
+
+        // create tables
+        tableEnv.executeSql("create table foo (x int, y int)");
+
+        HiveTestUtils.createTextTableInserter(hiveCatalog, "default", "foo")
+                .addRow(new Object[] {1, 1})
+                .addRow(new Object[] {2, 2})
+                .addRow(new Object[] {3, 3})
+                .addRow(new Object[] {4, 4})
+                .addRow(new Object[] {5, 5})
+                .commit();
+    }
+
+    @Before
+    public void before() {
+        // enable native hive agg function
+        tableEnv.getConfig().set(TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED, 
true);
+    }
+
+    @Test
+    public void testSumAggFunctionPlan() {
+        // test explain
+        String actualPlan = explainSql("select x, sum(y) from foo group by x");
+        
assertThat(actualPlan).isEqualTo(readFromResource("/explain/testSumAggFunctionPlan.out"));
+
+        // test fallback to hive sum udaf
+        tableEnv.getConfig().set(TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED, 
false);
+        String actualSortAggPlan = explainSql("select x, sum(y) from foo group 
by x");
+        assertThat(actualSortAggPlan)
+                
.isEqualTo(readFromResource("/explain/testSumAggFunctionFallbackPlan.out"));
+    }
+
+    @Test
+    public void testMinAggFunctionPlan() {
+        // test explain
+        String actualPlan = explainSql("select x, min(y) from foo group by x");
+        
assertThat(actualPlan).isEqualTo(readFromResource("/explain/testMinAggFunctionPlan.out"));
+
+        // test fallback to hive min udaf
+        tableEnv.getConfig().set(TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED, 
false);
+        String actualSortAggPlan = explainSql("select x, min(y) from foo group 
by x");
+        assertThat(actualSortAggPlan)
+                
.isEqualTo(readFromResource("/explain/testMinAggFunctionFallbackPlan.out"));
+    }
+
+    private String explainSql(String sql) {
+        return (String)
+                CollectionUtil.iteratorToList(tableEnv.executeSql("explain " + 
sql).collect())
+                        .get(0)
+                        .getField(0);
+    }
+
+    private static TableEnvironment getTableEnvWithHiveCatalog() {
+        TableEnvironment tableEnv = 
HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+        tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        tableEnv.useCatalog(hiveCatalog.getName());
+        // automatically load hive module in hive-compatible mode
+        HiveModule hiveModule =
+                new HiveModule(
+                        hiveCatalog.getHiveVersion(),
+                        tableEnv.getConfig(),
+                        Thread.currentThread().getContextClassLoader());
+        CoreModule coreModule = CoreModule.INSTANCE;
+        for (String loaded : tableEnv.listModules()) {
+            tableEnv.unloadModule(loaded);
+        }
+        tableEnv.loadModule("hive", hiveModule);
+        tableEnv.loadModule("core", coreModule);
+        return tableEnv;
+    }
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/test/resources/explain/testMinAggFunctionFallbackPlan.out
 
b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMinAggFunctionFallbackPlan.out
new file mode 100644
index 00000000000..1dc2488ba8f
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMinAggFunctionFallbackPlan.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(x=[$0], _o__c1=[$1])
++- LogicalAggregate(group=[{0}], agg#0=[min($1)])
+   +- LogicalProject($f0=[$0], $f1=[$1])
+      +- LogicalTableScan(table=[[test-catalog, default, foo]])
+
+== Optimized Physical Plan ==
+SortAggregate(isMerge=[true], groupBy=[x], select=[x, Final_min($f1) AS $f1])
++- Sort(orderBy=[x ASC])
+   +- Exchange(distribution=[hash[x]])
+      +- LocalSortAggregate(groupBy=[x], select=[x, Partial_min(y) AS $f1])
+         +- Sort(orderBy=[x ASC])
+            +- TableSourceScan(table=[[test-catalog, default, foo]], 
fields=[x, y])
+
+== Optimized Execution Plan ==
+SortAggregate(isMerge=[true], groupBy=[x], select=[x, Final_min($f1) AS $f1])
++- Sort(orderBy=[x ASC])
+   +- Exchange(distribution=[hash[x]])
+      +- LocalSortAggregate(groupBy=[x], select=[x, Partial_min(y) AS $f1])
+         +- Sort(orderBy=[x ASC])
+            +- TableSourceScan(table=[[test-catalog, default, foo]], 
fields=[x, y])
diff --git 
a/flink-connectors/flink-connector-hive/src/test/resources/explain/testMinAggFunctionPlan.out
 
b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMinAggFunctionPlan.out
new file mode 100644
index 00000000000..902e454afb5
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/test/resources/explain/testMinAggFunctionPlan.out
@@ -0,0 +1,17 @@
+== Abstract Syntax Tree ==
+LogicalProject(x=[$0], _o__c1=[$1])
++- LogicalAggregate(group=[{0}], agg#0=[min($1)])
+   +- LogicalProject($f0=[$0], $f1=[$1])
+      +- LogicalTableScan(table=[[test-catalog, default, foo]])
+
+== Optimized Physical Plan ==
+HashAggregate(isMerge=[true], groupBy=[x], select=[x, Final_min(min$0) AS $f1])
++- Exchange(distribution=[hash[x]])
+   +- LocalHashAggregate(groupBy=[x], select=[x, Partial_min(y) AS min$0])
+      +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])
+
+== Optimized Execution Plan ==
+HashAggregate(isMerge=[true], groupBy=[x], select=[x, Final_min(min$0) AS $f1])
++- Exchange(distribution=[hash[x]])
+   +- LocalHashAggregate(groupBy=[x], select=[x, Partial_min(y) AS min$0])
+      +- TableSourceScan(table=[[test-catalog, default, foo]], fields=[x, y])

Reply via email to