This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e850398  [FLINK-17334][hive] Flink does not support HIVE UDFs with primitive return types
e850398 is described below

commit e8503986132f0ffaeec91caf5da6ece2d0eb70d3
Author: RoyRuan <415131...@qq.com>
AuthorDate: Wed Apr 29 10:51:45 2020 +0800

    [FLINK-17334][hive] Flink does not support HIVE UDFs with primitive return types
    
    
    This closes #11876
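
    The gist of the change: HiveSimpleUDF previously mapped the eval method's
    return Class to a Hive TypeInfo by hand, and that mapping only recognized
    boxed and Writable types, so a UDF whose evaluate() returns a primitive
    (int, float, boolean, ...) failed with "Class int is not supported yet".
    The fix derives the ObjectInspector reflectively from the method's generic
    return type instead. A minimal sketch of the new inference path, condensed
    from the patch below (not a verbatim excerpt):

        // Resolve the evaluate() overload matching the argument types.
        Method evalMethod = hiveFunctionWrapper.createFunction()
                .getResolver().getEvalMethod(argTypeInfo);

        // Hive's reflection-based factory understands primitive, boxed,
        // and Writable return types alike.
        ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
                evalMethod.getGenericReturnType(),
                ObjectInspectorFactory.ObjectInspectorOptions.JAVA);

        return HiveTypeUtil.toFlinkType(inspector);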
---
 .../flink/table/functions/hive/HiveSimpleUDF.java  |  5 +-
 .../functions/hive/conversion/HiveInspectors.java  | 68 ----------------------
 .../table/functions/hive/HiveSimpleUDFTest.java    | 61 +++++++++++++++++++
 3 files changed, 63 insertions(+), 71 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
index 97f9a45..e6bb663 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
@@ -126,11 +126,10 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
                        for (DataType argType : argTypes) {
                                argTypeInfo.add(HiveTypeUtil.toHiveTypeInfo(argType, false));
                        }
-                       Class returnType = hiveFunctionWrapper.createFunction()
-                               .getResolver().getEvalMethod(argTypeInfo).getReturnType();
 
+                       Method evalMethod = hiveFunctionWrapper.createFunction().getResolver().getEvalMethod(argTypeInfo);
                        return HiveTypeUtil.toFlinkType(
-                               HiveInspectors.getObjectInspector(hiveShim, returnType));
+                               ObjectInspectorFactory.getReflectionObjectInspector(evalMethod.getGenericReturnType(), ObjectInspectorFactory.ObjectInspectorOptions.JAVA));
                } catch (UDFArgumentException e) {
                        throw new FlinkHiveUDFException(e);
                }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index c9d4ffd..3bc313e 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -36,14 +36,6 @@ import org.apache.flink.types.Row;
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -89,15 +81,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
 
 import javax.annotation.Nonnull;
 
@@ -348,59 +333,6 @@ public class HiveInspectors {
                        String.format("Unwrap does not support ObjectInspector '%s' yet", inspector));
        }
 
-       public static ObjectInspector getObjectInspector(HiveShim hiveShim, Class clazz) {
-               TypeInfo typeInfo;
-
-               if (clazz.equals(String.class) || clazz.equals(Text.class)) {
-
-                       typeInfo = TypeInfoFactory.stringTypeInfo;
-               } else if (clazz.equals(Boolean.class) || clazz.equals(BooleanWritable.class)) {
-
-                       typeInfo = TypeInfoFactory.booleanTypeInfo;
-               } else if (clazz.equals(Byte.class) || clazz.equals(ByteWritable.class)) {
-
-                       typeInfo = TypeInfoFactory.byteTypeInfo;
-               } else if (clazz.equals(Short.class) || clazz.equals(ShortWritable.class)) {
-
-                       typeInfo = TypeInfoFactory.shortTypeInfo;
-               } else if (clazz.equals(Integer.class) || clazz.equals(IntWritable.class)) {
-
-                       typeInfo = TypeInfoFactory.intTypeInfo;
-               } else if (clazz.equals(Long.class) || clazz.equals(LongWritable.class)) {
-
-                       typeInfo = TypeInfoFactory.longTypeInfo;
-               } else if (clazz.equals(Float.class) || clazz.equals(FloatWritable.class)) {
-
-                       typeInfo = TypeInfoFactory.floatTypeInfo;
-               } else if (clazz.equals(Double.class) || clazz.equals(DoubleWritable.class)) {
-
-                       typeInfo = TypeInfoFactory.doubleTypeInfo;
-               } else if (clazz.equals(hiveShim.getDateDataTypeClass()) || clazz.equals(DateWritable.class)) {
-
-                       typeInfo = TypeInfoFactory.dateTypeInfo;
-               } else if (clazz.equals(hiveShim.getTimestampDataTypeClass()) || clazz.equals(TimestampWritable.class)) {
-
-                       typeInfo = TypeInfoFactory.timestampTypeInfo;
-               } else if (clazz.equals(byte[].class) || clazz.equals(BytesWritable.class)) {
-
-                       typeInfo = TypeInfoFactory.binaryTypeInfo;
-               } else if (clazz.equals(HiveChar.class) || clazz.equals(HiveCharWritable.class)) {
-
-                       typeInfo = TypeInfoFactory.charTypeInfo;
-               } else if (clazz.equals(HiveVarchar.class) || clazz.equals(HiveVarcharWritable.class)) {
-
-                       typeInfo = TypeInfoFactory.varcharTypeInfo;
-               } else if (clazz.equals(HiveDecimal.class) || clazz.equals(HiveDecimalWritable.class)) {
-
-                       typeInfo = TypeInfoFactory.decimalTypeInfo;
-               } else {
-                       throw new FlinkHiveUDFException(
-                               String.format("Class %s is not supported yet", clazz.getName()));
-               }
-
-               return getObjectInspector(typeInfo);
-       }
-
        /**
         * Get Hive {@link ObjectInspector} for a Flink {@link DataType}.
         */
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
index 49cf403..c786613 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.functions.hive.util.TestHiveUDFArray;
 import org.apache.flink.table.types.DataType;
 
+import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.hive.ql.udf.UDFBase64;
 import org.apache.hadoop.hive.ql.udf.UDFBin;
 import org.apache.hadoop.hive.ql.udf.UDFConv;
@@ -50,6 +51,30 @@ public class HiveSimpleUDFTest {
        private static HiveShim hiveShim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());
 
        @Test
+       public void testBooleanUDF() {
+               HiveSimpleUDF udf = init(BooleanUDF.class, new DataType[]{ DataTypes.INT()});
+               assertTrue((boolean) udf.eval(1));
+       }
+
+       @Test
+       public void testFloatUDF() {
+               HiveSimpleUDF udf = init(FloatUDF.class, new DataType[]{ DataTypes.FLOAT()});
+               assertEquals(3.0f, (float) udf.eval(3.0f), 0);
+       }
+
+       @Test
+       public void testIntUDF() {
+               HiveSimpleUDF udf = init(IntUDF.class, new DataType[]{ DataTypes.INT()});
+               assertEquals(3, (int) udf.eval(3));
+       }
+
+       @Test
+       public void testStringUDF() {
+               HiveSimpleUDF udf = init(StringUDF.class, new DataType[]{ DataTypes.STRING()});
+               assertEquals("test", udf.eval("test"));
+       }
+
+       @Test
        public void testUDFRand() {
                HiveSimpleUDF udf = init(UDFRand.class, new DataType[0]);
 
@@ -230,4 +255,40 @@ public class HiveSimpleUDFTest {
 
                return udf;
        }
+
+       /**
+        * Boolean Test UDF.
+        */
+       public static class BooleanUDF extends UDF {
+               public boolean evaluate(int content) {
+                       return content == 1;
+               }
+       }
+
+       /**
+        * Float Test UDF.
+        */
+       public static class FloatUDF extends UDF {
+               public float evaluate(float content) {
+                       return content;
+               }
+       }
+
+       /**
+        * Int Test UDF.
+        */
+       public static class IntUDF extends UDF {
+               public int evaluate(int content) {
+                       return content;
+               }
+       }
+
+       /**
+        * String Test UDF.
+        */
+       public static class StringUDF extends UDF {
+               public String evaluate(String content) {
+                       return content;
+               }
+       }
 }
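
For reference, the new code path can be sanity-checked in isolation with only
Hive's ql/serde2 classes on the classpath. The standalone snippet below is not
part of the patch; it assumes a Hive version where
ObjectInspectorFactory.getReflectionObjectInspector(Type, ObjectInspectorOptions)
is available, and shows that a primitive int return type now resolves where the
removed Class-based mapping threw FlinkHiveUDFException:

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

import java.lang.reflect.Method;

public class PrimitiveReturnTypeDemo {

    /** Same shape as the IntUDF test fixture above. */
    public static class IntUDF extends UDF {
        public int evaluate(int content) {
            return content;
        }
    }

    public static void main(String[] args) throws Exception {
        // getMethod resolves the evaluate() overload; its return type is the
        // primitive int.class, which the old hand-written mapping rejected.
        Method eval = IntUDF.class.getMethod("evaluate", int.class);

        // The reflection-based factory resolves primitive return types directly.
        ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
            eval.getGenericReturnType(),
            ObjectInspectorFactory.ObjectInspectorOptions.JAVA);

        System.out.println(inspector.getTypeName()); // prints "int"
    }
}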
