This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new c677334 [FLINK-16268][table-planner-blink] Failed to run rank over window with Hive built-in functions
c677334 is described below
commit c677334727bc388fd9fd051c7aac0cf6e595d82c
Author: Rui Li <[email protected]>
AuthorDate: Tue Nov 3 15:06:49 2020 +0800
[FLINK-16268][table-planner-blink] Failed to run rank over window with Hive built-in functions
This closes #13857
---
.../flink/table/module/hive/HiveModuleTest.java | 19 +++++++++++++++++++
.../expressions/PlannerTypeInferenceUtilImpl.java | 21 +++++++++++++++++++++
.../planner/functions/utils/HiveFunctionUtils.java | 2 +-
.../functions/utils/UserDefinedFunctionUtils.scala | 6 +++++-
4 files changed, 46 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
index 343d818..be8a75c 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.ScalarFunctionDefinition;
import org.apache.flink.table.functions.hive.HiveSimpleUDF;
+import org.apache.flink.table.module.CoreModule;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
@@ -185,4 +186,22 @@ public class HiveModuleTest {
 		results = Lists.newArrayList(tableEnv.sqlQuery("select length('')").execute().collect());
assertEquals("[0]", results.toString());
}
+
+	@Test
+	// tests to verify we have set arguments for hive udf before trying to get result type
+	public void testHiveUDFSetArguments() throws Exception {
+		TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
+
+		tableEnv.unloadModule("core");
+		tableEnv.loadModule("hive", new HiveModule());
+		tableEnv.loadModule("core", CoreModule.INSTANCE);
+
+		String path = getClass().getResource("/csv/test.csv").toURI().toString();
+		tableEnv.executeSql(String.format(
+				"create table src(x int,y int) with ('connector'='filesystem','format'='csv','path'='%s')", path));
+
+		Lists.newArrayList(tableEnv.executeSql("select x from src where y is not null limit 10").collect());
+		Lists.newArrayList(tableEnv.executeSql("select count(distinct if(y is null, 0, y)) from src where x=-1 limit 1").collect());
+		Lists.newArrayList(tableEnv.executeSql("select x, rank() over (partition by x order by y) from src").collect());
+	}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/PlannerTypeInferenceUtilImpl.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/PlannerTypeInferenceUtilImpl.java
index 4640377..d813ccb 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/PlannerTypeInferenceUtilImpl.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/PlannerTypeInferenceUtilImpl.java
@@ -25,11 +25,15 @@ import org.apache.flink.table.delegation.PlannerTypeInferenceUtil;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.UnresolvedCallExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.functions.utils.HiveFunctionUtils;
import org.apache.flink.table.planner.typeutils.TypeCoercion;
import org.apache.flink.table.planner.validate.ValidationFailure;
import org.apache.flink.table.planner.validate.ValidationResult;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeInferenceUtil;
+import org.apache.flink.table.types.logical.LogicalType;
import java.util.ArrayList;
import java.util.List;
@@ -74,6 +78,23 @@ public final class PlannerTypeInferenceUtilImpl implements PlannerTypeInferenceU
.map(ResolvedExpression::getOutputDataType)
.collect(Collectors.toList());
+		if (plannerCall instanceof PlannerScalarFunctionCall) {
+			ScalarFunction scalarFunction = ((PlannerScalarFunctionCall) plannerCall).scalarFunction();
+			// need to set arg types for Hive functions
+			if (HiveFunctionUtils.isHiveFunc(scalarFunction)) {
+				LogicalType[] logicalTypes = expectedArgumentTypes.stream()
+						.map(DataType::getLogicalType).toArray(LogicalType[]::new);
+				Object[] constArgs = new Object[logicalTypes.length];
+				for (int i = 0; i < constArgs.length; i++) {
+					if (resolvedArgs.get(i) instanceof ValueLiteralExpression) {
+						ValueLiteralExpression literalExpression = (ValueLiteralExpression) resolvedArgs.get(i);
+						constArgs[i] = literalExpression.getValueAs(Object.class).orElse(null);
+					}
+				}
+				HiveFunctionUtils.invokeSetArgs(scalarFunction, constArgs, logicalTypes);
+			}
+		}
+
return new TypeInferenceUtil.Result(
expectedArgumentTypes,
null,
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveFunctionUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveFunctionUtils.java
index 13a82cb..7939f47 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveFunctionUtils.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveFunctionUtils.java
@@ -51,7 +51,7 @@ public class HiveFunctionUtils {
}
- static Serializable invokeSetArgs(
+ public static Serializable invokeSetArgs(
 			Serializable function, Object[] constantArguments, LogicalType[] argTypes) {
try {
// See hive HiveFunction
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
index 471a39b..026ddc2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
@@ -467,7 +467,11 @@ object UserDefinedFunctionUtils {
displayName: String,
function: ScalarFunction,
typeFactory: FlinkTypeFactory): SqlFunction = {
- new ScalarSqlFunction(identifier, displayName, function, typeFactory)
+ if (HiveFunctionUtils.isHiveFunc(function)) {
+ new HiveScalarSqlFunction(identifier, function, typeFactory)
+ } else {
+ new ScalarSqlFunction(identifier, displayName, function, typeFactory)
+ }
}
/**