This is an automated email from the ASF dual-hosted git repository. jingzhang pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
     new b7e3eb94801 [FLINK-24862][hive]Fix user-defined hive udaf/udtf cannot be used normally in hive dialect
b7e3eb94801 is described below

commit b7e3eb948017e50b3d8f9f7ef3f94330bea71400
Author: hehuiyuan1 <hehuiy...@jd.com>
AuthorDate: Sat Mar 12 13:57:40 2022 +0800

    [FLINK-24862][hive]Fix user-defined hive udaf/udtf cannot be used normally in hive dialect
---
 .../flink/connectors/hive/HiveDialectITCase.java   | 30 ++++++++++++++++++++++
 .../flink/table/catalog/FunctionCatalog.java       | 16 +++++-------
 2 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index d2e57000d4e..30537c20369 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
@@ -635,6 +636,35 @@ public class HiveDialectITCase {
         tableEnv.executeSql("drop temporary function if exists foo");
     }
 
+    @Test
+    public void testTemporaryFunctionUDAF() throws Exception {
+        // create temp function
+        tableEnv.executeSql(
+                String.format(
+                        "create temporary function temp_count as '%s'",
+                        GenericUDAFCount.class.getName()));
+        String[] functions = tableEnv.listUserDefinedFunctions();
+        assertArrayEquals(new String[] {"temp_count"}, functions);
+        // call the function
+        tableEnv.executeSql("create table src(x int)");
+        tableEnv.executeSql("insert into src values (1),(-1)").await();
+        assertEquals(
+                "[+I[2]]",
+                queryResult(tableEnv.sqlQuery("select temp_count(x) from src")).toString());
+        // switch DB and the temp function can still be used
+        tableEnv.executeSql("create database db1");
+        tableEnv.useDatabase("db1");
+        assertEquals(
+                "[+I[2]]",
+                queryResult(tableEnv.sqlQuery("select temp_count(x) from `default`.src"))
+                        .toString());
+        // drop the function
+        tableEnv.executeSql("drop temporary function temp_count");
+        functions = tableEnv.listUserDefinedFunctions();
+        assertEquals(0, functions.length);
+        tableEnv.executeSql("drop temporary function if exists foo");
+    }
+
     @Test
     public void testCatalog() {
         List<Row> catalogs =
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index f62380caccb..4c1d6642576 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -631,16 +631,14 @@ public final class FunctionCatalog {
         // In this situation the UDF have not been validated and cleaned, so we need to validate it
         // and clean its closure here.
         // If the input is instance of `ScalarFunctionDefinition`, `TableFunctionDefinition` and so
-        // on,
-        // it means it uses the old type inference. We assume that they have been validated before
-        // being
-        // wrapped.
-        if (function instanceof InlineCatalogFunction
-                && ((InlineCatalogFunction) function).getDefinition()
-                        instanceof UserDefinedFunction) {
-
+        // on, it means it uses the old type inference. We assume that they have been validated
+        // before being wrapped.
+        if (function instanceof InlineCatalogFunction) {
             FunctionDefinition definition = ((InlineCatalogFunction) function).getDefinition();
-            UserDefinedFunctionHelper.prepareInstance(config, (UserDefinedFunction) definition);
+            if (definition instanceof UserDefinedFunction) {
+                UserDefinedFunctionHelper.prepareInstance(config, (UserDefinedFunction) definition);
+            }
+            // Skip validation if it's not a UserDefinedFunction.
         } else if (function.getFunctionLanguage() == FunctionLanguage.JAVA) {
             ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
             UserDefinedFunctionHelper.validateClass(