This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b2c110254b6a0b709ca41fe9c819298516dccbc5
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Sun Jul 28 20:21:20 2019 +0800

    [FLINK-13225][table-planner-blink] Fix type inference for hive udtf
---
 .../catalog/FunctionCatalogOperatorTable.java      |  21 +-
 .../functions/utils/HiveTableSqlFunction.java      | 235 +++++++++++++++++++++
 .../planner/plan/QueryOperationConverter.java      |   3 +-
 .../planner/codegen/CorrelateCodeGenerator.scala   |   8 +-
 .../table/planner/codegen/ExprCodeGenerator.scala  |   4 +-
 .../planner/functions/utils/TableSqlFunction.scala | 112 +++++-----
 .../functions/utils/UserDefinedFunctionUtils.scala |   2 +-
 .../logical/FlinkLogicalTableFunctionScan.scala    |   2 +-
 ...relateToJoinFromTemporalTableFunctionRule.scala |   4 +-
 .../schema/DeferredTypeFlinkTableFunction.scala    |   4 +-
 .../planner/plan/schema/FlinkTableFunction.scala   |   1 +
 .../plan/schema/TypedFlinkTableFunction.scala      |   2 +
 .../utils/UserDefinedFunctionTestUtils.scala       |   2 +-
 13 files changed, 336 insertions(+), 64 deletions(-)
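
Background for reviewers: a Hive UDTF derives its result row type from the actual call arguments (including constant values), so the planner cannot fix the type when the function is registered. The patch therefore lets the operand type checker remember the argument types it has just validated, and derives the row type from that record afterwards. A minimal, self-contained sketch of that pattern in plain Java follows; every name in it is illustrative, none is Flink or Calcite API:

    // The checker runs during validation and records the actual argument
    // types; row-type inference later consults that record instead of a
    // fixed, declared result type.
    public class DeferredTypeSketch {

        /** Stands in for a Hive UDTF whose result type depends on its arguments. */
        interface ArgDependentFunction {
            String resultTypeFor(String[] argTypes);
        }

        /** Records operand types seen during validation (cf. HiveOperandTypeChecker). */
        static class RecordingChecker {
            String[] previousArgTypes;

            boolean check(String[] argTypes) {
                previousArgTypes = argTypes; // remembered for row-type inference
                return true;
            }
        }

        /** Row-type inference using the remembered types (cf. getRowType in the patch). */
        static String inferRowType(ArgDependentFunction f, RecordingChecker checker) {
            if (checker.previousArgTypes == null) {
                throw new IllegalStateException("checkOperandTypes must run first");
            }
            return f.resultTypeFor(checker.previousArgTypes);
        }

        public static void main(String[] args) {
            ArgDependentFunction udtf = types -> "ROW<" + String.join(", ", types) + ">";
            RecordingChecker checker = new RecordingChecker();
            checker.check(new String[]{"STRING", "INT"});
            System.out.println(inferRowType(udtf, checker)); // ROW<STRING, INT>
        }
    }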

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
index ddf8f60..bc60f27 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.catalog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
@@ -27,8 +28,12 @@ import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction;
+import org.apache.flink.table.planner.functions.utils.HiveTableSqlFunction;
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
+import org.apache.flink.table.planner.plan.schema.DeferredTypeFlinkTableFunction;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
 
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
@@ -42,6 +47,7 @@ import java.util.List;
 import java.util.Optional;
 
 import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.isHiveFunc;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
  * Thin adapter between {@link SqlOperatorTable} and {@link FunctionCatalog}.
@@ -108,7 +114,20 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
                } else if (functionDefinition instanceof TableFunctionDefinition &&
                                category != null &&
                                category.isTableFunction()) {
-                       return convertTableFunction(name, (TableFunctionDefinition) functionDefinition);
+                       TableFunctionDefinition def = (TableFunctionDefinition) functionDefinition;
+                       if (isHiveFunc(def.getTableFunction())) {
+                       DataType returnType = fromLegacyInfoToDataType(new GenericTypeInfo<>(Row.class));
+                               return Optional.of(new HiveTableSqlFunction(
+                                               name,
+                                               name,
+                                               def.getTableFunction(),
+                                               returnType,
+                                               typeFactory,
+                                               new DeferredTypeFlinkTableFunction(def.getTableFunction(), returnType),
+                                               HiveTableSqlFunction.operandTypeChecker(name, def.getTableFunction())));
+                       } else {
+                               return convertTableFunction(name, (TableFunctionDefinition) functionDefinition);
+                       }
                }
 
                return Optional.empty();
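
A note on the new branch above: the GenericTypeInfo&lt;Row&gt; handed in as the implicit result type is only a placeholder that carries no field information; the real row type is computed later by HiveTableSqlFunction#getRowType once the operands are known. A small sketch of building that placeholder with the same Flink utilities (assuming the 1.9-era blink planner dependencies on the classpath; the printed representation may vary):

    import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.types.Row;

    import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;

    class PlaceholderTypeExample {
        public static void main(String[] args) {
            // A raw/legacy type wrapping Row: deliberately structure-free, so no
            // consumer can rely on it before the Hive UDTF reports its real schema.
            DataType placeholder = fromLegacyInfoToDataType(new GenericTypeInfo<>(Row.class));
            System.out.println(placeholder);
        }
    }
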
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java
new file mode 100644
index 0000000..6800fb9
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.utils;
+
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.FlinkTableFunction;
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.util.BitString;
+import org.apache.calcite.util.DateString;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.TimeString;
+import org.apache.calcite.util.TimestampString;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Tuple3;
+
+import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType;
+import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeSetArgs;
+import static org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.buildRelDataType;
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType;
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
+
+/**
+ * Hive {@link TableSqlFunction}.
+ * Overrides getFunction to clone the function and invoke {@code HiveGenericUDTF#setArgumentTypesAndConstants}.
+ * Overrides SqlReturnTypeInference to invoke {@code HiveGenericUDTF#getHiveResultType} instead of
+ * {@code HiveGenericUDTF#getResultType(Object[], Class[])}.
+ *
+ * @deprecated TODO hack code, its logic should be integrated into TableSqlFunction
+ */
+@Deprecated
+public class HiveTableSqlFunction extends TableSqlFunction {
+
+       private final TableFunction hiveUdtf;
+       private final HiveOperandTypeChecker operandTypeChecker;
+
+       public HiveTableSqlFunction(String name, String displayName,
+                       TableFunction hiveUdtf,
+                       DataType implicitResultType,
+                       FlinkTypeFactory typeFactory,
+                       FlinkTableFunction functionImpl,
+                       HiveOperandTypeChecker operandTypeChecker) {
+               super(name, displayName, hiveUdtf, implicitResultType, typeFactory, functionImpl, scala.Option.apply(operandTypeChecker));
+               this.hiveUdtf = hiveUdtf;
+               this.operandTypeChecker = operandTypeChecker;
+       }
+
+       @Override
+       public TableFunction makeFunction(Object[] constantArguments, LogicalType[] argTypes) {
+               TableFunction clone;
+               try {
+                       clone = InstantiationUtil.clone(hiveUdtf);
+               } catch (IOException | ClassNotFoundException e) {
+                       throw new RuntimeException(e);
+               }
+               return (TableFunction) invokeSetArgs(clone, constantArguments, argTypes);
+       }
+
+       @Override
+       public RelDataType getRowType(RelDataTypeFactory typeFactory, List<SqlNode> operandList) {
+               Preconditions.checkNotNull(operandTypeChecker.previousArgTypes);
+               FlinkTypeFactory factory = (FlinkTypeFactory) typeFactory;
+               Object[] arguments = convertArguments(
+                               Arrays.stream(operandTypeChecker.previousArgTypes)
+                                               .map(factory::createFieldTypeFromLogicalType)
+                                               .collect(Collectors.toList()),
+                               operandList);
+               DataType resultType = fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(
+                               invokeGetResultType(hiveUdtf, arguments, operandTypeChecker.previousArgTypes, (FlinkTypeFactory) typeFactory)));
+               Tuple3<String[], int[], LogicalType[]> fieldInfo = UserDefinedFunctionUtils.getFieldInfo(resultType);
+               return buildRelDataType(typeFactory, fromDataTypeToLogicalType(resultType), fieldInfo._1(), fieldInfo._2());
+       }
+
+       /**
+        * This method is copied from Calcite, modified so that it does not rely on Function.
+        * TODO FlinkTableFunction needs to implement getElementType.
+        */
+       private static Object[] convertArguments(
+                       List<RelDataType> operandTypes,
+                       List<SqlNode> operandList) {
+               List<Object> arguments = new ArrayList<>(operandList.size());
+               // Construct a list of arguments, if they are all constants.
+               for (Pair<RelDataType, SqlNode> pair
+                               : Pair.zip(operandTypes, operandList)) {
+                       try {
+                               final Object o = getValue(pair.right);
+                               final Object o2 = coerce(o, pair.left);
+                               arguments.add(o2);
+                       } catch (NonLiteralException e) {
+                               arguments.add(null);
+                       }
+               }
+               return arguments.toArray();
+       }
+
+       private static Object coerce(Object value, RelDataType type) {
+               if (value == null) {
+                       return null;
+               }
+               switch (type.getSqlTypeName()) {
+                       case CHAR:
+                               return ((NlsString) value).getValue();
+                       case BINARY:
+                               return ((BitString) value).getAsByteArray();
+                       case DECIMAL:
+                               return value;
+                       case BIGINT:
+                               return ((BigDecimal) value).longValue();
+                       case INTEGER:
+                               return ((BigDecimal) value).intValue();
+                       case SMALLINT:
+                               return ((BigDecimal) value).shortValue();
+                       case TINYINT:
+                               return ((BigDecimal) value).byteValue();
+                       case DOUBLE:
+                               return ((BigDecimal) value).doubleValue();
+                       case REAL:
+                               return ((BigDecimal) value).floatValue();
+                       case FLOAT:
+                               return ((BigDecimal) value).floatValue();
+                       case DATE:
+                               return LocalDate.ofEpochDay(((DateString) value).getDaysSinceEpoch());
+                       case TIME:
+                               return LocalTime.ofNanoOfDay(((TimeString) value).getMillisOfDay() * 1000_000);
+                       case TIMESTAMP:
+                               return SqlDateTimeUtils.unixTimestampToLocalDateTime(((TimestampString) value).getMillisSinceEpoch());
+                       default:
+                               throw new RuntimeException("Unsupported type: " + type);
+               }
+       }
+
+       private static Object getValue(SqlNode right) throws NonLiteralException {
+               switch (right.getKind()) {
+                       case ARRAY_VALUE_CONSTRUCTOR:
+                               final List<Object> list = new ArrayList<>();
+                               for (SqlNode o : ((SqlCall) right).getOperandList()) {
+                                       list.add(getValue(o));
+                               }
+                               return ImmutableNullableList.copyOf(list).toArray();
+                       case MAP_VALUE_CONSTRUCTOR:
+                               final Map<Object, Object> map = new HashMap<>();
+                               final List<SqlNode> operands = ((SqlCall) right).getOperandList();
+                               for (int i = 0; i < operands.size(); i += 2) {
+                                       final SqlNode key = operands.get(i);
+                                       final SqlNode value = operands.get(i + 1);
+                                       map.put(getValue(key), getValue(value));
+                               }
+                               return map;
+                       default:
+                               if (SqlUtil.isNullLiteral(right, true)) {
+                                       return null;
+                               }
+                               if (SqlUtil.isLiteral(right)) {
+                                       return ((SqlLiteral) right).getValue();
+                               }
+                               if (right.getKind() == SqlKind.DEFAULT) {
+                                       return null; // currently NULL is the only default value
+                               }
+                               throw new NonLiteralException();
+               }
+       }
+
+       /** Thrown when a non-literal occurs in an argument to a user-defined
+        * table macro. */
+       private static class NonLiteralException extends Exception {
+       }
+
+       public static HiveOperandTypeChecker operandTypeChecker(String name, TableFunction udtf) {
+               return new HiveOperandTypeChecker(name, udtf, UserDefinedFunctionUtils.checkAndExtractMethods(udtf, "eval"));
+       }
+
+       /**
+        * Checker that remembers previousArgTypes.
+        *
+        * @deprecated TODO hack code, should modify Calcite getRowType to pass operand types
+        */
+       @Deprecated
+       public static class HiveOperandTypeChecker extends OperandTypeChecker {
+
+               private LogicalType[] previousArgTypes;
+
+               private HiveOperandTypeChecker(String name, TableFunction udtf, Method[] methods) {
+                       super(name, udtf, methods);
+               }
+
+               @Override
+               public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+                       previousArgTypes = UserDefinedFunctionUtils.getOperandTypeArray(callBinding);
+                       return super.checkOperandTypes(callBinding, throwOnFailure);
+               }
+       }
+}
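
For readers unfamiliar with Calcite literals: validation hands back exact numerics as BigDecimal and character strings as NlsString, which is why coerce(...) above narrows each constant to the Java type a UDTF eval method can accept (dates and times arrive as DateString/TimeString wrappers). A simplified, self-contained illustration of those narrowing rules in plain Java, with epoch-day numbers standing in for DateString:

    import java.math.BigDecimal;
    import java.time.LocalDate;

    class CoerceExample {
        static Object coerce(Object value, String sqlType) {
            switch (sqlType) {
                case "INTEGER": return ((BigDecimal) value).intValue();
                case "BIGINT":  return ((BigDecimal) value).longValue();
                case "DOUBLE":  return ((BigDecimal) value).doubleValue();
                case "DATE":    return LocalDate.ofEpochDay(((Number) value).longValue());
                default: throw new IllegalArgumentException("Unsupported type: " + sqlType);
            }
        }

        public static void main(String[] args) {
            // SQL literal 42 arrives as BigDecimal; an eval(int) needs an Integer.
            System.out.println(coerce(new BigDecimal("42"), "INTEGER").getClass());
            System.out.println(coerce(new BigDecimal("18100"), "DATE")); // 2019-07-23
        }
    }
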
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
index 2a2119d..e0f4763 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
@@ -279,7 +279,8 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod
                                        tableFunction,
                                        resultType,
                                        typeFactory,
-                                       function);
+                                       function,
+                                       scala.Option.empty());
 
                        List<RexNode> parameters = convertToRexNodes(calculatedTable.getParameters());
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
index cf861bf..c292c64 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
@@ -69,15 +69,19 @@ object CorrelateCodeGenerator {
     val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
     // we need result Type to do code generation
     val arguments = UserDefinedFunctionUtils.transformRexNodes(rexCall.operands)
+    val operandTypes = rexCall.operands
+        .map(_.getType)
+        .map(FlinkTypeFactory.toLogicalType).toArray
+    val func = sqlFunction.makeFunction(arguments, operandTypes)
     val argTypes = getEvalMethodSignature(
-      sqlFunction.getTableFunction,
+      func,
       rexCall.operands
         .map(_.getType)
         .map(FlinkTypeFactory.toLogicalType).toArray)
     val udtfExternalType = sqlFunction
         .getFunction
         .asInstanceOf[FlinkTableFunction]
-        .getExternalResultType(arguments, argTypes)
+        .getExternalResultType(func, arguments, argTypes)
     val pojoFieldMapping = Some(UserDefinedFunctionUtils.getFieldInfo(udtfExternalType)._2)
     val inputType = FlinkTypeFactory.toLogicalRowType(inputRelType)
     val (returnType, swallowInputOnly ) = if (projectProgram.isDefined) {
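
The essential change in this hunk: code generation now resolves the eval signature and external result type against the instance returned by makeFunction, i.e. a clone configured with this call site's constant arguments and operand types (non-constant operands arrive as null, mirroring convertArguments above). A compact sketch of that per-call-site cloning in plain Java; Udtf and its field are illustrative, and the serialization round-trip only approximates what InstantiationUtil.clone does:

    import java.io.*;

    class PerCallSiteConfig {
        /** Stands in for a TableFunction carrying per-call-site argument state. */
        static class Udtf implements Serializable {
            String[] constantArgs; // set per call site, so instances must not be shared
        }

        /** cf. makeFunction: deep-clone via serialization, then configure the clone. */
        static Udtf makeFunction(Udtf original, String[] constants) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(original);
            oos.flush();
            Udtf clone = (Udtf) new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray())).readObject();
            clone.constantArgs = constants; // cf. invokeSetArgs
            return clone;
        }

        public static void main(String[] args) throws Exception {
            Udtf registered = new Udtf();
            // Two call sites of one registered function get independent clones.
            Udtf a = makeFunction(registered, new String[]{"a,b", null}); // null = non-constant
            Udtf b = makeFunction(registered, new String[]{"x,y", ","});
            System.out.println(a != b && registered.constantArgs == null); // true
        }
    }
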
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 7c55d73..3a05fe5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -737,7 +737,9 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
             .generate(ctx, operands, resultType)
 
       case tsf: TableSqlFunction =>
-        new TableFunctionCallGen(tsf.getTableFunction).generate(ctx, operands, resultType)
+        new TableFunctionCallGen(
+          tsf.makeFunction(getOperandLiterals(operands), operands.map(_.resultType).toArray))
+            .generate(ctx, operands, resultType)
 
       // advanced scalar functions
       case sqlOperator: SqlOperator =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
index 4bf554a..c3f5ac3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.planner.plan.schema.FlinkTableFunction
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.logical.LogicalType
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.calcite.sql._
@@ -34,6 +35,7 @@ import org.apache.calcite.sql.`type`._
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.sql.validate.{SqlUserDefinedTableFunction, SqlUserDefinedTableMacro}
 
+import java.lang.reflect.Method
 import java.util
 
 /**
@@ -49,22 +51,26 @@ import java.util
 class TableSqlFunction(
     name: String,
     displayName: String,
-    udtf: TableFunction[_],
+    val udtf: TableFunction[_],
     implicitResultType: DataType,
     typeFactory: FlinkTypeFactory,
-    functionImpl: FlinkTableFunction)
+    functionImpl: FlinkTableFunction,
+    operandTypeInfer: Option[SqlOperandTypeChecker] = None)
   extends SqlUserDefinedTableFunction(
     new SqlIdentifier(name, SqlParserPos.ZERO),
     ReturnTypes.CURSOR,
+    // the type inference only sees UNKNOWN operand types.
     createOperandTypeInference(name, udtf, typeFactory),
-    createOperandTypeChecker(name, udtf),
+    // only the checker sees the real operand types.
+    operandTypeInfer.getOrElse(createOperandTypeChecker(name, udtf)),
     null,
     functionImpl) {
 
   /**
     * Get the user-defined table function.
     */
-  def getTableFunction = udtf
+  def makeFunction(constants: Array[AnyRef], argTypes: Array[LogicalType]): TableFunction[_] =
+    udtf
 
   /**
     * Get the type information of the table returned by the table function.
@@ -131,61 +137,61 @@ object TableSqlFunction {
   private[flink] def createOperandTypeChecker(
       name: String,
       udtf: TableFunction[_]): SqlOperandTypeChecker = {
+    new OperandTypeChecker(name, udtf, checkAndExtractMethods(udtf, "eval"))
+  }
+}
 
-    val methods = checkAndExtractMethods(udtf, "eval")
+/**
+  * Operand type checker based on the information of the given [[TableFunction]].
+  */
+class OperandTypeChecker(
+    name: String, udtf: TableFunction[_], methods: Array[Method]) extends SqlOperandTypeChecker {
 
-    /**
-      * Operand type checker based on [[TableFunction]] given information.
-      */
-    new SqlOperandTypeChecker {
-      override def getAllowedSignatures(op: SqlOperator, opName: String): String = {
-        s"$opName[${signaturesToString(udtf, "eval")}]"
-      }
+  override def getAllowedSignatures(op: SqlOperator, opName: String): String = {
+    s"$opName[${signaturesToString(udtf, "eval")}]"
+  }
 
-      override def getOperandCountRange: SqlOperandCountRange = {
-        var min = 254
-        var max = -1
-        var isVarargs = false
-        methods.foreach(m => {
-          var len = m.getParameterTypes.length
-          if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) {
-            isVarargs = true
-            len = len - 1
-          }
-          max = Math.max(len, max)
-          min = Math.min(len, min)
-        })
-        if (isVarargs) {
-          // if eval method is varargs, set max to -1 to skip length check in Calcite
-          max = -1
-        }
-        SqlOperandCountRanges.between(min, max)
+  override def getOperandCountRange: SqlOperandCountRange = {
+    var min = 254
+    var max = -1
+    var isVarargs = false
+    methods.foreach(m => {
+      var len = m.getParameterTypes.length
+      if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) {
+        isVarargs = true
+        len = len - 1
       }
+      max = Math.max(len, max)
+      min = Math.min(len, min)
+    })
+    if (isVarargs) {
+      // if eval method is varargs, set max to -1 to skip length check in Calcite
+      max = -1
+    }
+    SqlOperandCountRanges.between(min, max)
+  }
 
-      override def checkOperandTypes(
-          callBinding: SqlCallBinding,
-          throwOnFailure: Boolean)
-        : Boolean = {
-        val operandTypes = getOperandType(callBinding)
-
-        if (getEvalUserDefinedMethod(udtf, operandTypes).isEmpty) {
-          if (throwOnFailure) {
-            throw new ValidationException(
-              s"Given parameters of function '$name' do not match any signature. \n" +
-                  s"Actual: ${signatureInternalToString(operandTypes)} \n" +
-                  s"Expected: ${signaturesToString(udtf, "eval")}")
-          } else {
-            false
-          }
-        } else {
-          true
-        }
+  override def checkOperandTypes(
+      callBinding: SqlCallBinding,
+      throwOnFailure: Boolean)
+  : Boolean = {
+    val operandTypes = getOperandType(callBinding)
+
+    if (getEvalUserDefinedMethod(udtf, operandTypes).isEmpty) {
+      if (throwOnFailure) {
+        throw new ValidationException(
+          s"Given parameters of function '$name' do not match any signature. \n" +
+              s"Actual: ${signatureInternalToString(operandTypes)} \n" +
+              s"Expected: ${signaturesToString(udtf, "eval")}")
+      } else {
+        false
       }
-
-      override def isOptional(i: Int): Boolean = false
-
-      override def getConsistency: Consistency = Consistency.NONE
-
+    } else {
+      true
     }
   }
+
+  override def isOptional(i: Int): Boolean = false
+
+  override def getConsistency: Consistency = Consistency.NONE
 }
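
Since the anonymous checker was promoted to the reusable OperandTypeChecker class above, its operand-count rule is worth spelling out: scan all eval overloads, track the minimum and maximum parameter counts, and report an unbounded maximum (-1) when any overload is varargs so Calcite skips its length check. A self-contained restatement in plain Java reflection (Udtf is an illustrative stand-in):

    import java.lang.reflect.Method;

    class OperandCountExample {
        public static class Udtf {
            public void eval(String s) { }
            public void eval(long a, int... rest) { } // varargs overload
        }

        public static void main(String[] args) {
            int min = 254, max = -1;
            boolean isVarargs = false;
            for (Method m : Udtf.class.getMethods()) {
                if (!m.getName().equals("eval")) {
                    continue;
                }
                int len = m.getParameterTypes().length;
                if (len > 0 && m.isVarArgs()) {
                    isVarargs = true;
                    len -= 1; // the trailing array may match zero arguments
                }
                max = Math.max(len, max);
                min = Math.min(len, min);
            }
            if (isVarargs) {
                max = -1; // tells Calcite to skip the upper-bound length check
            }
            System.out.println("min=" + min + ", max=" + max); // min=1, max=-1
        }
    }
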
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
index e565a6e..3552a7f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
@@ -802,7 +802,7 @@ object UserDefinedFunctionUtils {
     }.toArray
   }
 
-  private[table] def buildRelDataType(
+  def buildRelDataType(
       typeFactory: RelDataTypeFactory,
       resultType: LogicalType,
       fieldNames: Array[String],
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
index e2ab608..38f4b03 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
@@ -100,7 +100,7 @@ class FlinkLogicalTableFunctionScanConverter
       return false
     }
     val tableFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    tableFunction.getTableFunction.isInstanceOf[TemporalTableFunction]
+    tableFunction.udtf.isInstanceOf[TemporalTableFunction]
   }
 
   def convert(rel: RelNode): RelNode = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
index a240b64..62a4872 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
@@ -182,11 +182,11 @@ class GetTemporalTableFunctionCall(
     }
     val tableFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
 
-    if (!tableFunction.getTableFunction.isInstanceOf[TemporalTableFunction]) {
+    if (!tableFunction.udtf.isInstanceOf[TemporalTableFunction]) {
       return null
     }
     val temporalTableFunction =
-      tableFunction.getTableFunction.asInstanceOf[TemporalTableFunctionImpl]
+      tableFunction.udtf.asInstanceOf[TemporalTableFunctionImpl]
 
     checkState(
       rexCall.getOperands.size().equals(1),
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala
index 9bf8221..6f467d4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.schema
 
+import org.apache.flink.table.functions
 import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
@@ -39,6 +40,7 @@ class DeferredTypeFlinkTableFunction(
   extends FlinkTableFunction(tableFunction) {
 
   override def getExternalResultType(
+      tableFunction: functions.TableFunction[_],
       arguments: Array[AnyRef],
       argTypes: Array[Class[_]]): DataType = {
     // TODO
@@ -56,7 +58,7 @@ class DeferredTypeFlinkTableFunction(
       typeFactory: RelDataTypeFactory,
       arguments: Array[AnyRef],
       argTypes: Array[Class[_]]): RelDataType = {
-    val resultType = getExternalResultType(arguments, argTypes)
+    val resultType = getExternalResultType(tableFunction, arguments, argTypes)
     val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType)
     UserDefinedFunctionUtils.buildRelDataType(
       typeFactory, fromDataTypeToLogicalType(resultType), fieldNames, fieldIndexes)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala
index 73f5e63..6ecb8e7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala
@@ -51,6 +51,7 @@ abstract class FlinkTableFunction(
     * Returns the Type for usage, i.e. code generation.
     */
   def getExternalResultType(
+      tableFunction: functions.TableFunction[_],
       arguments: Array[AnyRef],
       argTypes: Array[Class[_]]): DataType
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala
index 773af90..828a4b6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.schema
 
+import org.apache.flink.table.functions
 import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
@@ -41,6 +42,7 @@ class TypedFlinkTableFunction(
   extends FlinkTableFunction(tableFunction) {
 
   override def getExternalResultType(
+      tableFunction: functions.TableFunction[_],
       arguments: Array[AnyRef],
       argTypes: Array[Class[_]]): DataType =
     externalResultType
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala
index 432f0a2..bd69126 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala
@@ -334,7 +334,7 @@ object UserDefinedFunctionTestUtils {
       }
       a + b
     }
-    
+
     def eval(a: Long, b: Int): Long = {
       eval(a, b.asInstanceOf[Long])
     }
