This is an automated email from the ASF dual-hosted git repository.

jingzhang pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new b7e3eb94801 [FLINK-24862][hive]Fix user-defined hive udaf/udtf cannot be used normally in hive dialect
b7e3eb94801 is described below

commit b7e3eb948017e50b3d8f9f7ef3f94330bea71400
Author: hehuiyuan1 <hehuiy...@jd.com>
AuthorDate: Sat Mar 12 13:57:40 2022 +0800

    [FLINK-24862][hive]Fix user-defined hive udaf/udtf cannot be used normally in hive dialect
---
 .../flink/connectors/hive/HiveDialectITCase.java   | 30 ++++++++++++++++++++++
 .../flink/table/catalog/FunctionCatalog.java       | 16 +++++-------
 2 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index d2e57000d4e..30537c20369 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
@@ -635,6 +636,35 @@ public class HiveDialectITCase {
         tableEnv.executeSql("drop temporary function if exists foo");
     }
 
+    @Test
+    public void testTemporaryFunctionUDAF() throws Exception {
+        // create temp function
+        tableEnv.executeSql(
+                String.format(
+                        "create temporary function temp_count as '%s'",
+                        GenericUDAFCount.class.getName()));
+        String[] functions = tableEnv.listUserDefinedFunctions();
+        assertArrayEquals(new String[] {"temp_count"}, functions);
+        // call the function
+        tableEnv.executeSql("create table src(x int)");
+        tableEnv.executeSql("insert into src values (1),(-1)").await();
+        assertEquals(
+                "[+I[2]]",
+                queryResult(tableEnv.sqlQuery("select temp_count(x) from src")).toString());
+        // switch DB and the temp function can still be used
+        tableEnv.executeSql("create database db1");
+        tableEnv.useDatabase("db1");
+        assertEquals(
+                "[+I[2]]",
+                queryResult(tableEnv.sqlQuery("select temp_count(x) from `default`.src"))
+                        .toString());
+        // drop the function
+        tableEnv.executeSql("drop temporary function temp_count");
+        functions = tableEnv.listUserDefinedFunctions();
+        assertEquals(0, functions.length);
+        tableEnv.executeSql("drop temporary function if exists foo");
+    }
+
     @Test
     public void testCatalog() {
         List<Row> catalogs =
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index f62380caccb..4c1d6642576 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -631,16 +631,14 @@ public final class FunctionCatalog {
         // In this situation the UDF have not been validated and cleaned, so we need to validate it
         // and clean its closure here.
         // If the input is instance of `ScalarFunctionDefinition`, `TableFunctionDefinition` and so
-        // on,
-        // it means it uses the old type inference. We assume that they have been validated before
-        // being
-        // wrapped.
-        if (function instanceof InlineCatalogFunction
-                && ((InlineCatalogFunction) function).getDefinition()
-                        instanceof UserDefinedFunction) {
-
+        // on, it means it uses the old type inference. We assume that they have been validated
+        // before being wrapped.
+        if (function instanceof InlineCatalogFunction) {
             FunctionDefinition definition = ((InlineCatalogFunction) function).getDefinition();
-            UserDefinedFunctionHelper.prepareInstance(config, (UserDefinedFunction) definition);
+            if (definition instanceof UserDefinedFunction) {
+                UserDefinedFunctionHelper.prepareInstance(config, (UserDefinedFunction) definition);
+            }
+            // Skip validation if it's not a UserDefinedFunction.
         } else if (function.getFunctionLanguage() == FunctionLanguage.JAVA) {
             ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
             UserDefinedFunctionHelper.validateClass(

Reply via email to