This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b2c110254b6a0b709ca41fe9c819298516dccbc5 Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Sun Jul 28 20:21:20 2019 +0800 [FLINK-13225][table-planner-blink] Fix type inference for hive udtf --- .../catalog/FunctionCatalogOperatorTable.java | 21 +- .../functions/utils/HiveTableSqlFunction.java | 235 +++++++++++++++++++++ .../planner/plan/QueryOperationConverter.java | 3 +- .../planner/codegen/CorrelateCodeGenerator.scala | 8 +- .../table/planner/codegen/ExprCodeGenerator.scala | 4 +- .../planner/functions/utils/TableSqlFunction.scala | 112 +++++----- .../functions/utils/UserDefinedFunctionUtils.scala | 2 +- .../logical/FlinkLogicalTableFunctionScan.scala | 2 +- ...relateToJoinFromTemporalTableFunctionRule.scala | 4 +- .../schema/DeferredTypeFlinkTableFunction.scala | 4 +- .../planner/plan/schema/FlinkTableFunction.scala | 1 + .../plan/schema/TypedFlinkTableFunction.scala | 2 + .../utils/UserDefinedFunctionTestUtils.scala | 2 +- 13 files changed, 336 insertions(+), 64 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java index ddf8f60..bc60f27 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.catalog; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.FunctionLookup; import org.apache.flink.table.functions.AggregateFunctionDefinition; @@ -27,8 +28,12 @@ import org.apache.flink.table.functions.ScalarFunctionDefinition; import org.apache.flink.table.functions.TableFunctionDefinition; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction; +import org.apache.flink.table.planner.functions.utils.HiveTableSqlFunction; import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils; +import org.apache.flink.table.planner.plan.schema.DeferredTypeFlinkTableFunction; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.Row; import org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlFunctionCategory; @@ -42,6 +47,7 @@ import java.util.List; import java.util.Optional; import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.isHiveFunc; +import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; /** * Thin adapter between {@link SqlOperatorTable} and {@link FunctionCatalog}. @@ -108,7 +114,20 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable { } else if (functionDefinition instanceof TableFunctionDefinition && category != null && category.isTableFunction()) { - return convertTableFunction(name, (TableFunctionDefinition) functionDefinition); + TableFunctionDefinition def = (TableFunctionDefinition) functionDefinition; + if (isHiveFunc(def.getTableFunction())) { + DataType returnType = fromLegacyInfoToDataType(new GenericTypeInfo<>(Row.class)); + return Optional.of(new HiveTableSqlFunction( + name, + name, + def.getTableFunction(), + returnType, + typeFactory, + new DeferredTypeFlinkTableFunction(def.getTableFunction(), returnType), + HiveTableSqlFunction.operandTypeChecker(name, def.getTableFunction()))); + } else { + return convertTableFunction(name, (TableFunctionDefinition) functionDefinition); + } } return Optional.empty(); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java new file mode 100644 index 0000000..6800fb9 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.utils; + +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.schema.FlinkTableFunction; +import org.apache.flink.table.runtime.functions.SqlDateTimeUtils; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlCallBinding; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlUtil; +import org.apache.calcite.util.BitString; +import org.apache.calcite.util.DateString; +import org.apache.calcite.util.ImmutableNullableList; +import org.apache.calcite.util.NlsString; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.TimeString; +import org.apache.calcite.util.TimestampString; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import scala.Tuple3; + +import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType; +import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeSetArgs; +import static org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.buildRelDataType; +import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType; +import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType; + +/** + * Hive {@link TableSqlFunction}. + * Override getFunction to clone function and invoke {@code HiveGenericUDTF#setArgumentTypesAndConstants}. + * Override SqlReturnTypeInference to invoke {@code HiveGenericUDTF#getHiveResultType} instead of + * {@code HiveGenericUDTF#getResultType(Object[], Class[])}. + * + * @deprecated TODO hack code, its logical should be integrated to TableSqlFunction + */ +@Deprecated +public class HiveTableSqlFunction extends TableSqlFunction { + + private final TableFunction hiveUdtf; + private final HiveOperandTypeChecker operandTypeChecker; + + public HiveTableSqlFunction(String name, String displayName, + TableFunction hiveUdtf, + DataType implicitResultType, + FlinkTypeFactory typeFactory, + FlinkTableFunction functionImpl, + HiveOperandTypeChecker operandTypeChecker) { + super(name, displayName, hiveUdtf, implicitResultType, typeFactory, functionImpl, scala.Option.apply(operandTypeChecker)); + this.hiveUdtf = hiveUdtf; + this.operandTypeChecker = operandTypeChecker; + } + + @Override + public TableFunction makeFunction(Object[] constantArguments, LogicalType[] argTypes) { + TableFunction clone; + try { + clone = InstantiationUtil.clone(hiveUdtf); + } catch (IOException | ClassNotFoundException e) { + throw new RuntimeException(e); + } + return (TableFunction) invokeSetArgs(clone, constantArguments, argTypes); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory, List<SqlNode> operandList) { + Preconditions.checkNotNull(operandTypeChecker.previousArgTypes); + FlinkTypeFactory factory = (FlinkTypeFactory) typeFactory; + Object[] arguments = convertArguments( + Arrays.stream(operandTypeChecker.previousArgTypes) + .map(factory::createFieldTypeFromLogicalType) + .collect(Collectors.toList()), + operandList); + DataType resultType = fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType( + invokeGetResultType(hiveUdtf, arguments, operandTypeChecker.previousArgTypes, (FlinkTypeFactory) typeFactory))); + Tuple3<String[], int[], LogicalType[]> fieldInfo = UserDefinedFunctionUtils.getFieldInfo(resultType); + return buildRelDataType(typeFactory, fromDataTypeToLogicalType(resultType), fieldInfo._1(), fieldInfo._2()); + } + + /** + * This method is copied from calcite, and modify it to not rely on Function. + * TODO FlinkTableFunction need implement getElementType. + */ + private static Object[] convertArguments( + List<RelDataType> operandTypes, + List<SqlNode> operandList) { + List<Object> arguments = new ArrayList<>(operandList.size()); + // Construct a list of arguments, if they are all constants. + for (Pair<RelDataType, SqlNode> pair + : Pair.zip(operandTypes, operandList)) { + try { + final Object o = getValue(pair.right); + final Object o2 = coerce(o, pair.left); + arguments.add(o2); + } catch (NonLiteralException e) { + arguments.add(null); + } + } + return arguments.toArray(); + } + + private static Object coerce(Object value, RelDataType type) { + if (value == null) { + return null; + } + switch (type.getSqlTypeName()) { + case CHAR: + return ((NlsString) value).getValue(); + case BINARY: + return ((BitString) value).getAsByteArray(); + case DECIMAL: + return value; + case BIGINT: + return ((BigDecimal) value).longValue(); + case INTEGER: + return ((BigDecimal) value).intValue(); + case SMALLINT: + return ((BigDecimal) value).shortValue(); + case TINYINT: + return ((BigDecimal) value).byteValue(); + case DOUBLE: + return ((BigDecimal) value).doubleValue(); + case REAL: + return ((BigDecimal) value).floatValue(); + case FLOAT: + return ((BigDecimal) value).floatValue(); + case DATE: + return LocalDate.ofEpochDay(((DateString) value).getDaysSinceEpoch()); + case TIME: + return LocalTime.ofNanoOfDay(((TimeString) value).getMillisOfDay() * 1000_000); + case TIMESTAMP: + return SqlDateTimeUtils.unixTimestampToLocalDateTime(((TimestampString) value).getMillisSinceEpoch()); + default: + throw new RuntimeException("Not support type: " + type); + } + } + + private static Object getValue(SqlNode right) throws NonLiteralException { + switch (right.getKind()) { + case ARRAY_VALUE_CONSTRUCTOR: + final List<Object> list = new ArrayList<>(); + for (SqlNode o : ((SqlCall) right).getOperandList()) { + list.add(getValue(o)); + } + return ImmutableNullableList.copyOf(list).toArray(); + case MAP_VALUE_CONSTRUCTOR: + final Map<Object, Object> map = new HashMap<>(); + final List<SqlNode> operands = ((SqlCall) right).getOperandList(); + for (int i = 0; i < operands.size(); i += 2) { + final SqlNode key = operands.get(i); + final SqlNode value = operands.get(i + 1); + map.put(getValue(key), getValue(value)); + } + return map; + default: + if (SqlUtil.isNullLiteral(right, true)) { + return null; + } + if (SqlUtil.isLiteral(right)) { + return ((SqlLiteral) right).getValue(); + } + if (right.getKind() == SqlKind.DEFAULT) { + return null; // currently NULL is the only default value + } + throw new NonLiteralException(); + } + } + + /** Thrown when a non-literal occurs in an argument to a user-defined + * table macro. */ + private static class NonLiteralException extends Exception { + } + + public static HiveOperandTypeChecker operandTypeChecker(String name, TableFunction udtf) { + return new HiveOperandTypeChecker(name, udtf, UserDefinedFunctionUtils.checkAndExtractMethods(udtf, "eval")); + } + + /** + * Checker for remember previousArgTypes. + * + * @deprecated TODO hack code, should modify calcite getRowType to pass operand types + */ + @Deprecated + public static class HiveOperandTypeChecker extends OperandTypeChecker { + + private LogicalType[] previousArgTypes; + + private HiveOperandTypeChecker(String name, TableFunction udtf, Method[] methods) { + super(name, udtf, methods); + } + + @Override + public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) { + previousArgTypes = UserDefinedFunctionUtils.getOperandTypeArray(callBinding); + return super.checkOperandTypes(callBinding, throwOnFailure); + } + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java index 2a2119d..e0f4763 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java @@ -279,7 +279,8 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod tableFunction, resultType, typeFactory, - function); + function, + scala.Option.empty()); List<RexNode> parameters = convertToRexNodes(calculatedTable.getParameters()); diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala index cf861bf..c292c64 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala @@ -69,15 +69,19 @@ object CorrelateCodeGenerator { val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] // we need result Type to do code generation val arguments = UserDefinedFunctionUtils.transformRexNodes(rexCall.operands) + val operandTypes = rexCall.operands + .map(_.getType) + .map(FlinkTypeFactory.toLogicalType).toArray + val func = sqlFunction.makeFunction(arguments, operandTypes) val argTypes = getEvalMethodSignature( - sqlFunction.getTableFunction, + func, rexCall.operands .map(_.getType) .map(FlinkTypeFactory.toLogicalType).toArray) val udtfExternalType = sqlFunction .getFunction .asInstanceOf[FlinkTableFunction] - .getExternalResultType(arguments, argTypes) + .getExternalResultType(func, arguments, argTypes) val pojoFieldMapping = Some(UserDefinedFunctionUtils.getFieldInfo(udtfExternalType)._2) val inputType = FlinkTypeFactory.toLogicalRowType(inputRelType) val (returnType, swallowInputOnly ) = if (projectProgram.isDefined) { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala index 7c55d73..3a05fe5 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala @@ -737,7 +737,9 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean) .generate(ctx, operands, resultType) case tsf: TableSqlFunction => - new TableFunctionCallGen(tsf.getTableFunction).generate(ctx, operands, resultType) + new TableFunctionCallGen( + tsf.makeFunction(getOperandLiterals(operands), operands.map(_.resultType).toArray)) + .generate(ctx, operands, resultType) // advanced scalar functions case sqlOperator: SqlOperator => diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala index 4bf554a..c3f5ac3 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala @@ -26,6 +26,7 @@ import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._ import org.apache.flink.table.planner.plan.schema.FlinkTableFunction import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType import org.apache.flink.table.types.DataType +import org.apache.flink.table.types.logical.LogicalType import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} import org.apache.calcite.sql._ @@ -34,6 +35,7 @@ import org.apache.calcite.sql.`type`._ import org.apache.calcite.sql.parser.SqlParserPos import org.apache.calcite.sql.validate.{SqlUserDefinedTableFunction, SqlUserDefinedTableMacro} +import java.lang.reflect.Method import java.util /** @@ -49,22 +51,26 @@ import java.util class TableSqlFunction( name: String, displayName: String, - udtf: TableFunction[_], + val udtf: TableFunction[_], implicitResultType: DataType, typeFactory: FlinkTypeFactory, - functionImpl: FlinkTableFunction) + functionImpl: FlinkTableFunction, + operandTypeInfer: Option[SqlOperandTypeChecker] = None) extends SqlUserDefinedTableFunction( new SqlIdentifier(name, SqlParserPos.ZERO), ReturnTypes.CURSOR, + // type inference has the UNKNOWN operand types. createOperandTypeInference(name, udtf, typeFactory), - createOperandTypeChecker(name, udtf), + // only checker has the real operand types. + operandTypeInfer.getOrElse(createOperandTypeChecker(name, udtf)), null, functionImpl) { /** * Get the user-defined table function. */ - def getTableFunction = udtf + def makeFunction(constants: Array[AnyRef], argTypes: Array[LogicalType]): TableFunction[_] = + udtf /** * Get the type information of the table returned by the table function. @@ -131,61 +137,61 @@ object TableSqlFunction { private[flink] def createOperandTypeChecker( name: String, udtf: TableFunction[_]): SqlOperandTypeChecker = { + new OperandTypeChecker(name, udtf, checkAndExtractMethods(udtf, "eval")) + } +} - val methods = checkAndExtractMethods(udtf, "eval") +/** + * Operand type checker based on [[TableFunction]] given information. + */ +class OperandTypeChecker( + name: String, udtf: TableFunction[_], methods: Array[Method]) extends SqlOperandTypeChecker { - /** - * Operand type checker based on [[TableFunction]] given information. - */ - new SqlOperandTypeChecker { - override def getAllowedSignatures(op: SqlOperator, opName: String): String = { - s"$opName[${signaturesToString(udtf, "eval")}]" - } + override def getAllowedSignatures(op: SqlOperator, opName: String): String = { + s"$opName[${signaturesToString(udtf, "eval")}]" + } - override def getOperandCountRange: SqlOperandCountRange = { - var min = 254 - var max = -1 - var isVarargs = false - methods.foreach(m => { - var len = m.getParameterTypes.length - if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) { - isVarargs = true - len = len - 1 - } - max = Math.max(len, max) - min = Math.min(len, min) - }) - if (isVarargs) { - // if eval method is varargs, set max to -1 to skip length check in Calcite - max = -1 - } - SqlOperandCountRanges.between(min, max) + override def getOperandCountRange: SqlOperandCountRange = { + var min = 254 + var max = -1 + var isVarargs = false + methods.foreach(m => { + var len = m.getParameterTypes.length + if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) { + isVarargs = true + len = len - 1 } + max = Math.max(len, max) + min = Math.min(len, min) + }) + if (isVarargs) { + // if eval method is varargs, set max to -1 to skip length check in Calcite + max = -1 + } + SqlOperandCountRanges.between(min, max) + } - override def checkOperandTypes( - callBinding: SqlCallBinding, - throwOnFailure: Boolean) - : Boolean = { - val operandTypes = getOperandType(callBinding) - - if (getEvalUserDefinedMethod(udtf, operandTypes).isEmpty) { - if (throwOnFailure) { - throw new ValidationException( - s"Given parameters of function '$name' do not match any signature. \n" + - s"Actual: ${signatureInternalToString(operandTypes)} \n" + - s"Expected: ${signaturesToString(udtf, "eval")}") - } else { - false - } - } else { - true - } + override def checkOperandTypes( + callBinding: SqlCallBinding, + throwOnFailure: Boolean) + : Boolean = { + val operandTypes = getOperandType(callBinding) + + if (getEvalUserDefinedMethod(udtf, operandTypes).isEmpty) { + if (throwOnFailure) { + throw new ValidationException( + s"Given parameters of function '$name' do not match any signature. \n" + + s"Actual: ${signatureInternalToString(operandTypes)} \n" + + s"Expected: ${signaturesToString(udtf, "eval")}") + } else { + false } - - override def isOptional(i: Int): Boolean = false - - override def getConsistency: Consistency = Consistency.NONE - + } else { + true } } + + override def isOptional(i: Int): Boolean = false + + override def getConsistency: Consistency = Consistency.NONE } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala index e565a6e..3552a7f 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala @@ -802,7 +802,7 @@ object UserDefinedFunctionUtils { }.toArray } - private[table] def buildRelDataType( + def buildRelDataType( typeFactory: RelDataTypeFactory, resultType: LogicalType, fieldNames: Array[String], diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala index e2ab608..38f4b03 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala @@ -100,7 +100,7 @@ class FlinkLogicalTableFunctionScanConverter return false } val tableFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] - tableFunction.getTableFunction.isInstanceOf[TemporalTableFunction] + tableFunction.udtf.isInstanceOf[TemporalTableFunction] } def convert(rel: RelNode): RelNode = { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala index a240b64..62a4872 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala @@ -182,11 +182,11 @@ class GetTemporalTableFunctionCall( } val tableFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] - if (!tableFunction.getTableFunction.isInstanceOf[TemporalTableFunction]) { + if (!tableFunction.udtf.isInstanceOf[TemporalTableFunction]) { return null } val temporalTableFunction = - tableFunction.getTableFunction.asInstanceOf[TemporalTableFunctionImpl] + tableFunction.udtf.asInstanceOf[TemporalTableFunctionImpl] checkState( rexCall.getOperands.size().equals(1), diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala index 9bf8221..6f467d4 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.schema +import org.apache.flink.table.functions import org.apache.flink.table.functions.TableFunction import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType @@ -39,6 +40,7 @@ class DeferredTypeFlinkTableFunction( extends FlinkTableFunction(tableFunction) { override def getExternalResultType( + tableFunction: functions.TableFunction[_], arguments: Array[AnyRef], argTypes: Array[Class[_]]): DataType = { // TODO @@ -56,7 +58,7 @@ class DeferredTypeFlinkTableFunction( typeFactory: RelDataTypeFactory, arguments: Array[AnyRef], argTypes: Array[Class[_]]): RelDataType = { - val resultType = getExternalResultType(arguments, argTypes) + val resultType = getExternalResultType(tableFunction, arguments, argTypes) val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType) UserDefinedFunctionUtils.buildRelDataType( typeFactory, fromDataTypeToLogicalType(resultType), fieldNames, fieldIndexes) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala index 73f5e63..6ecb8e7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala @@ -51,6 +51,7 @@ abstract class FlinkTableFunction( * Returns the Type for usage, i.e. code generation. */ def getExternalResultType( + tableFunction: functions.TableFunction[_], arguments: Array[AnyRef], argTypes: Array[Class[_]]): DataType diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala index 773af90..828a4b6 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.schema +import org.apache.flink.table.functions import org.apache.flink.table.functions.TableFunction import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo @@ -41,6 +42,7 @@ class TypedFlinkTableFunction( extends FlinkTableFunction(tableFunction) { override def getExternalResultType( + tableFunction: functions.TableFunction[_], arguments: Array[AnyRef], argTypes: Array[Class[_]]): DataType = externalResultType diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala index 432f0a2..bd69126 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala @@ -334,7 +334,7 @@ object UserDefinedFunctionTestUtils { } a + b } - + def eval(a: Long, b: Int): Long = { eval(a, b.asInstanceOf[Long]) }