This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new dbf5dce6149 [FLINK-29590][hive] Fix literal issue in Hive dialect (#21026)
dbf5dce6149 is described below

commit dbf5dce61493cd0b7e73ae3af39146caf52c0767
Author: yuxia Luo <luoyu...@alumni.sjtu.edu.cn>
AuthorDate: Thu Oct 13 19:27:01 2022 +0800

    [FLINK-29590][hive] Fix literal issue in Hive dialect (#21026)
---
 .../connector/file/table/PartitionLoader.java      |  5 +-
 .../hive/HiveParserTypeCheckProcFactory.java       | 92 +++++++++++++++++++++-
 .../connectors/hive/HiveDialectQueryITCase.java    | 27 +++++++
 3 files changed, 121 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java
index 53f98bf6761..4ff1a9c1f4d 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java
@@ -136,11 +136,12 @@ public class PartitionLoader implements Closeable {
     private void overwrite(Path destDir) throws Exception {
         if (overwrite) {
             // delete existing files for overwrite
-            FileStatus[] existingFiles = listStatusWithoutHidden(destDir.getFileSystem(), destDir);
+            FileSystem destFS = destDir.getFileSystem();
+            FileStatus[] existingFiles = listStatusWithoutHidden(destFS, destDir);
             if (existingFiles != null) {
                 for (FileStatus existingFile : existingFiles) {
                     // TODO: We need move to trash when auto-purge is false.
-                    fs.delete(existingFile.getPath(), true);
+                    destFS.delete(existingFile.getPath(), true);
                 }
             }
         }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserTypeCheckProcFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserTypeCheckProcFactory.java
index ef2d1b45fe7..df0b3d765b1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserTypeCheckProcFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserTypeCheckProcFactory.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInternalInterval;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFNvl;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPDivide;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNegative;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot;
@@ -88,6 +89,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
@@ -1234,6 +1236,46 @@ public class HiveParserTypeCheckProcFactory {
                 desc =
                         ExprNodeGenericFuncDesc.newInstance(
                                 new HiveGenericUDFInternalInterval(), funcText, children);
+            } else if (genericUDF instanceof GenericUDFOPDivide && children.size() == 2) {
+                // special case for GenericUDFOPDivide
+                // if the divisor or dividend is decimal type and the other one
+                // parameter is int/long literal, the TypeInfo of the ExprNodeGenericFuncDesc
+                // may be different with inferred result type, which will cause "Mismatch of
+                // expected output data type 'DECIMAL(..)' and function's output type
+                // 'DECIMAL(..)'" in BridgingFunctionGenUtil#verifyOutputType
+
+                // the reason is: in here we got expected result type, which will consider
+                // int/long literal parameter as actual precision,
+                // but in the phase to infer result type phase, the
+                // int/long literal parameter will always be considered as max precision. 10 for
+                // int, and 19 for long.
+                // To fix it, in here, we also should consider the int/long literal parameter as
+                // max precision.
+                ExprNodeDesc exprNodeDesc1 = children.get(0);
+                ExprNodeDesc exprNodeDesc2 = children.get(1);
+                // if one parameter is decimal type, and the other is int or long
+                if ((isDecimalTypeInfo(exprNodeDesc1)
+                                && exprNodeDesc2 instanceof ExprNodeConstantDesc
+                                && isIntOrLongTypeInfo(exprNodeDesc2))
+                        || (isDecimalTypeInfo(exprNodeDesc2)
+                                && exprNodeDesc1 instanceof ExprNodeConstantDesc
+                                && isIntOrLongTypeInfo(exprNodeDesc1))) {
+                    // find which parameter we should change
+                    int childToChange = isIntOrLongTypeInfo(exprNodeDesc1) ? 0 : 1;
+                    // change the int/long literal parameter to decimal type with the max
+                    // precision of int/long literal which is consistent to infer logic.
+                    children.set(
+                            childToChange,
+                            new ExprNodeConstantDesc(
+                                    HiveDecimalUtils.getDecimalTypeForPrimitiveCategory(
+                                            (PrimitiveTypeInfo)
+                                                    children.get(childToChange).getTypeInfo()),
+                                    HiveDecimal.create(
+                                            ((ExprNodeConstantDesc) children.get(childToChange))
+                                                    .getValue()
+                                                    .toString())));
+                }
+                desc = ExprNodeGenericFuncDesc.newInstance(genericUDF, funcText, children);
             } else {
                 desc = ExprNodeGenericFuncDesc.newInstance(genericUDF, funcText, children);
             }
@@ -1246,7 +1288,13 @@ public class HiveParserTypeCheckProcFactory {
                     && HiveParserExprNodeDescUtils.isAllConstants(children)) {
                 ExprNodeDesc constantExpr =
                         ConstantPropagateProcFactory.foldExpr((ExprNodeGenericFuncDesc) desc);
-                if (constantExpr != null) {
+                if (constantExpr != null
+                        // if constantExpr is instanceof ExprNodeConstantDesc, we should check
+                        // whether it can be folded to constant safely for some folded constant
+                        // can't be converted to calcite literal currently.
+                        && (!(constantExpr instanceof ExprNodeConstantDesc)
+                                || canSafeFoldToConstant(
+                                        (ExprNodeConstantDesc) constantExpr))) {
                     desc = constantExpr;
                 }
             }
@@ -1355,6 +1403,48 @@ public class HiveParserTypeCheckProcFactory {
             return false;
         }
 
+        private boolean isDecimalTypeInfo(ExprNodeDesc exprNodeDesc) {
+            TypeInfo typeInfo = exprNodeDesc.getTypeInfo();
+            return typeInfo instanceof PrimitiveTypeInfo
+                    && ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()
+                            == PrimitiveObjectInspector.PrimitiveCategory.DECIMAL;
+        }
+
+        private boolean isIntOrLongTypeInfo(ExprNodeDesc exprNodeDesc) {
+            TypeInfo typeInfo = exprNodeDesc.getTypeInfo();
+            if (!(typeInfo instanceof PrimitiveTypeInfo)) {
+                return false;
+            }
+            return ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()
+                            == PrimitiveObjectInspector.PrimitiveCategory.INT
+                    || ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()
+                            == PrimitiveObjectInspector.PrimitiveCategory.LONG;
+        }
+
+        private boolean canSafeFoldToConstant(ExprNodeConstantDesc constantExpr) {
+            // if it's not primitive type, can't be folded to constant safely
+            boolean isPrimitiveTyp = constantExpr.getTypeInfo() instanceof PrimitiveTypeInfo;
+            if (!isPrimitiveTyp) {
+                return false;
+            }
+            // if it's binary type, can't be folded to constant safely
+            boolean isBinaryConstant =
+                    constantExpr.getTypeInfo() instanceof PrimitiveTypeInfo
+                            && (((PrimitiveTypeInfo) constantExpr.getTypeInfo())
+                                            .getPrimitiveCategory()
+                                    == PrimitiveObjectInspector.PrimitiveCategory.BINARY);
+            if (isBinaryConstant) {
+                return false;
+            }
+            // if it's NAN, can't be folded to constant safely
+            boolean isNAN =
+                    ((constantExpr.getValue() instanceof Double
+                                    && Double.isNaN((Double) constantExpr.getValue()))
+                            || (constantExpr.getValue() instanceof Float
+                                    && Float.isNaN((Float) constantExpr.getValue())));
+            return !isNAN;
+        }
+
         protected ExprNodeDesc processQualifiedColRef(
                 HiveParserTypeCheckCtx ctx, HiveParserASTNode expr, Object... nodeOutputs)
                 throws SemanticException {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index 9d7a87eb3e0..869f3fcb486 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -852,6 +852,33 @@ public class HiveDialectQueryITCase {
         }
     }
 
+    @Test
+    public void testLiteral() throws Exception {
+        List<Row> result =
+                CollectionUtil.iteratorToList(
+                        tableEnv.executeSql("SELECT asin(2), binary('1'), struct(2, 9, 7)")
+                                .collect());
+        if (HiveVersionTestUtil.HIVE_310_OR_LATER) {
+            assertThat(result.toString()).isEqualTo("[+I[null, [49], +I[2, 9, 7]]]");
+        } else {
+            assertThat(result.toString()).isEqualTo("[+I[NaN, [49], +I[2, 9, 7]]]");
+        }
+        tableEnv.executeSql("create table test_decimal_literal(d decimal(10, 2))");
+        try {
+            tableEnv.executeSql("insert into test_decimal_literal values (1.2)").await();
+            result =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql(
+                                            "select d / 3, d / 3L, 6 / d, 6L / d from test_decimal_literal")
+                                    .collect());
+            assertThat(result.toString())
+                    .isEqualTo("[+I[0.400000, 0.400000, 5.00000000000, 5.00000000000]]");
+
+        } finally {
+            tableEnv.executeSql("drop table test_decimal_literal");
+        }
+    }
+
     @Test
     public void testCrossCatalogQueryNoHiveTable() throws Exception {
         // register a new in-memory catalog
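
For background, the scenario covered by the new testLiteral case can be restated as plain Hive-dialect SQL. This is a minimal sketch, not part of the commit: the table name t_dec is hypothetical and the statements assume the Hive dialect is active in the session. Before the fix, dividing a DECIMAL column by an INT or BIGINT literal could fail during planning with the "Mismatch of expected output data type 'DECIMAL(..)' and function's output type 'DECIMAL(..)'" error quoted in the code comments, and folding a constant such as asin(2), which evaluates to NaN (NULL on Hive 3.1+), produced a value that cannot be converted to a Calcite literal.

    -- minimal sketch; assumes Hive dialect, t_dec is a hypothetical table name
    CREATE TABLE t_dec (d DECIMAL(10, 2));
    INSERT INTO t_dec VALUES (1.2);

    -- DECIMAL column combined with INT/BIGINT literals: the fix widens the literal to its
    -- type's maximum decimal precision (10 for INT, 19 for BIGINT) before the expression
    -- type is derived, so it matches the inferred result type
    SELECT d / 3, d / 3L, 6 / d, 6L / d FROM t_dec;

    -- NaN-producing constant: no longer folded, since it has no Calcite literal representation
    SELECT asin(2);

With the change, non-primitive, binary, and NaN constants are simply left unfolded rather than failing later in planning.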