[FLINK-7439] [table] Support variable arguments for UDTF in SQL

This closes #4536.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79c17afa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79c17afa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79c17afa

Branch: refs/heads/master
Commit: 79c17afa13cd3f6cbaa91d56e036530f53e7b54b
Parents: 142bde0
Author: Jark Wu <j...@apache.org>
Authored: Mon Aug 14 14:18:52 2017 +0800
Committer: twalthr <twal...@apache.org>
Committed: Tue Sep 26 10:45:07 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/udfs.md                          |   6 +-
 .../flink/table/api/TableEnvironment.scala      |   4 +-
 .../codegen/calls/ScalarFunctionCallGen.scala   |  25 +--
 .../codegen/calls/TableFunctionCallGen.scala    |  25 +--
 .../table/functions/UserDefinedFunction.scala   |   2 +-
 .../functions/utils/TableSqlFunction.scala      | 153 ++++++++++++-------
 .../utils/UserDefinedFunctionUtils.scala        |  14 +-
 .../flink/table/plan/logical/operators.scala    |   4 +-
 .../plan/rules/logical/LogicalUnnestRule.scala  |   4 +-
 .../plan/schema/FlinkTableFunctionImpl.scala    |  15 +-
 .../flink/table/validate/FunctionCatalog.scala  |  16 --
 .../utils/JavaUserDefinedTableFunctions.java    |  17 +++
 .../table/api/batch/sql/CorrelateTest.scala     |  49 ++++++
 .../table/api/stream/sql/CorrelateTest.scala    |  49 ++++++
 .../validation/CorrelateValidationTest.scala    |   2 +-
 15 files changed, 273 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/docs/dev/table/udfs.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/udfs.md b/docs/dev/table/udfs.md
index eef7db6..5b34fe6 100644
--- a/docs/dev/table/udfs.md
+++ b/docs/dev/table/udfs.md
@@ -44,7 +44,7 @@ Scalar Functions
 
 If a required scalar function is not contained in the built-in functions, it is possible to define custom, user-defined scalar functions for both the Table API and SQL. A user-defined scalar functions maps zero, one, or multiple scalar values to a new scalar value.
 
-In order to define a scalar function one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by implementing multiple methods named `eval`.
+In order to define a scalar function one has to extend the base class `ScalarFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a scalar function is determined by the evaluation method. An evaluation method must be declared publicly and named `eval`. The parameter types and return type of the evaluation method also determine the parameter and return types of the scalar function. Evaluation methods can also be overloaded by implementing multiple methods named `eval`. Evaluation methods can also support variable arguments, such as `eval(String... strs)`.
 
 The following example shows how to define your own hash code function, register it in the TableEnvironment, and call it in a query. Note that you can configure your scalar function via a constructor before it is registered:
 
@@ -139,7 +139,7 @@ Table Functions
 
 Similar to a user-defined scalar function, a user-defined table function takes zero, one, or multiple scalar values as input parameters. However in contrast to a scalar function, it can return an arbitrary number of rows as output instead of a single value. The returned rows may consist of one or more columns. 
 
-In order to define a table function one has to extend the base class `TableFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`. The `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table function. The type of the returned table is determined by the generic type of `TableFunction`. Evaluation methods emit output rows using the protected `collect(T)` method.
+In order to define a table function one has to extend the base class `TableFunction` in `org.apache.flink.table.functions` and implement (one or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`. The `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table function. Evaluation methods can also support variable arguments, such as `eval(String... strs)`. The type of the returned table is determined by the generic type of `TableFunction`. Evaluation methods emit output rows using the protected `collect(T)` method.
 
 In the Table API, a table function is used with `.join(Expression)` or `.leftOuterJoin(Expression)` for Scala users and `.join(String)` or `.leftOuterJoin(String)` for Java users. The `join` operator (cross) joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The `leftOuterJoin` operator joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator) and preserves outer rows for which the table function returns an empty table. In SQL use `LATERAL TABLE(<TableFunction>)` with CROSS JOIN and LEFT JOIN with an ON TRUE join condition (see examples below).
 
@@ -297,7 +297,7 @@ optionally implemented. While some of these methods allow the system more effici
 - `merge()` is required for many batch aggreagtions and session window aggregations.
 - `resetAccumulator()` is required for many batch aggregations.
 
-All methods of `AggregateFunction` must be declared as `public`, not `static` and named exactly as the names mentioned above. The methods `createAccumulator`, `getValue`, `getResultType`, and `getAccumulatorType` are defined in the `AggregateFunction` abstract class, while others are contracted methods. In order to define a table function, one has to extend the base class `org.apache.flink.table.functions.AggregateFunction` and implement one (or more) `accumulate` methods. 
+All methods of `AggregateFunction` must be declared as `public`, not `static` and named exactly as the names mentioned above. The methods `createAccumulator`, `getValue`, `getResultType`, and `getAccumulatorType` are defined in the `AggregateFunction` abstract class, while others are contracted methods. In order to define a table function, one has to extend the base class `org.apache.flink.table.functions.AggregateFunction` and implement one (or more) `accumulate` methods. The method `accumulate` can be overloaded with different custom types and arguments and also support variable arguments.
 
 Detailed documentation for all methods of `AggregateFunction` is given below. 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 0424cf8..dc82a87 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -355,8 +355,8 @@ abstract class TableEnvironment(val config: TableConfig) {
     functionCatalog.registerFunction(name, function.getClass)
 
     // register in SQL API
-    val sqlFunctions = createTableSqlFunctions(name, function, typeInfo, typeFactory)
-    functionCatalog.registerSqlFunctions(sqlFunctions)
+    val sqlFunction = createTableSqlFunction(name, function, typeInfo, typeFactory)
+    functionCatalog.registerSqlFunction(sqlFunction)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
index 07a8708..6fad573 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
@@ -26,6 +26,8 @@ import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.typeutils.TypeCheckUtils
 
+import scala.collection.mutable
+
 /**
   * Generates a call to user-defined [[ScalarFunction]].
   *
@@ -44,21 +46,26 @@ class ScalarFunctionCallGen(
       operands: Seq[GeneratedExpression])
     : GeneratedExpression = {
     // determine function method and result class
-    val matchingMethod = getUserDefinedMethod(scalarFunction, "eval", typeInfoToClass(signature))
+    val matchingSignature = getEvalMethodSignature(scalarFunction, signature)
       .getOrElse(throw new CodeGenException("No matching signature found."))
-    val matchingSignature = matchingMethod.getParameterTypes
     val resultClass = getResultTypeClassOfScalarFunction(scalarFunction, matchingSignature)
 
-    // zip for variable signatures
-    var paramToOperands = matchingSignature.zip(operands)
-    if (operands.length > matchingSignature.length) {
-      operands.drop(matchingSignature.length).foreach(op =>
-        paramToOperands = paramToOperands :+ (matchingSignature.last.getComponentType, op)
-      )
+    // get the expanded parameter types
+    var paramClasses = new mutable.ArrayBuffer[Class[_]]
+    for (i <- operands.indices) {
+      if (i < matchingSignature.length - 1) {
+        paramClasses += matchingSignature(i)
+      } else if (matchingSignature.last.isArray) {
+        // last argument is an array type
+        paramClasses += matchingSignature.last.getComponentType
+      } else {
+        // last argument is not an array type
+        paramClasses += matchingSignature.last
+      }
     }
 
     // convert parameters for function (output boxing)
-    val parameters = paramToOperands.map { case (paramClass, operandExpr) =>
+    val parameters = paramClasses.zip(operands).map { case (paramClass, operandExpr) =>
           if (paramClass.isPrimitive) {
             operandExpr
           } else if (ClassUtils.isPrimitiveWrapper(paramClass)
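
The replacement loop in this hunk assigns one parameter class to each operand: leading parameters map one-to-one, and every operand at or beyond the last parameter position takes the component type of a trailing array parameter. A minimal, Flink-free sketch of that expansion in plain Java follows; the names `VarArgExpansion` and `expand` are hypothetical and do not appear in the commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of the Flink code base.
final class VarArgExpansion {

    // Returns one parameter class per operand, following the loop in the hunk.
    // Operands before the last parameter position use the declared type;
    // the remaining operands use the last parameter's component type if it
    // is an array, or the last parameter type itself otherwise.
    static List<Class<?>> expand(Class<?>[] signature, int operandCount) {
        List<Class<?>> paramClasses = new ArrayList<>();
        for (int i = 0; i < operandCount; i++) {
            if (i < signature.length - 1) {
                paramClasses.add(signature[i]);
            } else {
                Class<?> last = signature[signature.length - 1];
                paramClasses.add(last.isArray() ? last.getComponentType() : last);
            }
        }
        return paramClasses;
    }

    public static void main(String[] args) {
        // Signature of a method such as eval(int, String...) called with three operands.
        Class<?>[] signature = {int.class, String[].class};
        System.out.println(expand(signature, 3));
    }
}
```

As in the hunk, the sketch only checks whether the last parameter is an array; it does not consult the method's varargs flag.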

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
index a3609c1..e1ad18f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
@@ -27,6 +27,8 @@ import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.typeutils.TypeCheckUtils
 
+import scala.collection.mutable
+
 /**
   * Generates a call to user-defined [[TableFunction]].
   *
@@ -45,20 +47,25 @@ class TableFunctionCallGen(
       operands: Seq[GeneratedExpression])
     : GeneratedExpression = {
     // determine function method
-    val matchingMethod = getUserDefinedMethod(tableFunction, "eval", typeInfoToClass(signature))
+    val matchingSignature = getEvalMethodSignature(tableFunction, signature)
       .getOrElse(throw new CodeGenException("No matching signature found."))
-    val matchingSignature = matchingMethod.getParameterTypes
 
-    // zip for variable signatures
-    var paramToOperands = matchingSignature.zip(operands)
-    if (operands.length > matchingSignature.length) {
-      operands.drop(matchingSignature.length).foreach(op =>
-        paramToOperands = paramToOperands :+ (matchingSignature.last.getComponentType, op)
-      )
+    // get the expanded parameter types
+    var paramClasses = new mutable.ArrayBuffer[Class[_]]
+    for (i <- operands.indices) {
+      if (i < matchingSignature.length - 1) {
+        paramClasses += matchingSignature(i)
+      } else if (matchingSignature.last.isArray) {
+        // last argument is an array type
+        paramClasses += matchingSignature.last.getComponentType
+      } else {
+        // last argument is not an array type
+        paramClasses += matchingSignature.last
+      }
     }
 
     // convert parameters for function (output boxing)
-    val parameters = paramToOperands.map { case (paramClass, operandExpr) =>
+    val parameters = paramClasses.zip(operands).map { case (paramClass, operandExpr) =>
           if (paramClass.isPrimitive) {
             operandExpr
           } else if (ClassUtils.isPrimitiveWrapper(paramClass)

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
index 7c57ea0..b841b31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
@@ -49,7 +49,7 @@ abstract class UserDefinedFunction extends Serializable {
   def isDeterministic: Boolean = true
 
   final def functionIdentifier: String = {
-    val md5  =  DigestUtils.md5Hex(serialize(this))
+    val md5 = DigestUtils.md5Hex(serialize(this))
     getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
index b37d75b..0bab992 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
@@ -18,20 +18,20 @@
 
 package org.apache.flink.table.functions.utils
 
-import com.google.common.base.Predicate
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql._
 import org.apache.calcite.sql.`type`._
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction
-import org.apache.calcite.util.Util
+import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
-
-import scala.collection.JavaConverters._
-import java.util
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.functions.utils.TableSqlFunction._
 
 /**
   * Calcite wrapper for user-defined table functions.
@@ -40,17 +40,14 @@ class TableSqlFunction(
     name: String,
     udtf: TableFunction[_],
     rowTypeInfo: TypeInformation[_],
-    returnTypeInference: SqlReturnTypeInference,
-    operandTypeInference: SqlOperandTypeInference,
-    operandTypeChecker: SqlOperandTypeChecker,
-    paramTypes: util.List[RelDataType],
+    typeFactory: FlinkTypeFactory,
     functionImpl: FlinkTableFunctionImpl[_])
   extends SqlUserDefinedTableFunction(
     new SqlIdentifier(name, SqlParserPos.ZERO),
-    returnTypeInference,
-    operandTypeInference,
-    operandTypeChecker,
-    paramTypes,
+    ReturnTypes.CURSOR,
+    createOperandTypeInference(name, udtf, typeFactory),
+    createOperandTypeChecker(name, udtf),
+    null,
     functionImpl) {
 
   /**
@@ -74,48 +71,102 @@ class TableSqlFunction(
 
 object TableSqlFunction {
 
-  /**
-    * Util function to create a [[TableSqlFunction]].
-    *
-    * @param name function name (used by SQL parser)
-    * @param udtf user-defined table function to be called
-    * @param rowTypeInfo the row type information generated by the table function
-    * @param typeFactory type factory for converting Flink's between Calcite's types
-    * @param functionImpl Calcite table function schema
-    * @return [[TableSqlFunction]]
-    */
-  def apply(
+  private[flink] def createOperandTypeInference(
     name: String,
     udtf: TableFunction[_],
-    rowTypeInfo: TypeInformation[_],
-    typeFactory: FlinkTypeFactory,
-    functionImpl: FlinkTableFunctionImpl[_]): TableSqlFunction = {
-
-    val argTypes: util.List[RelDataType] = new util.ArrayList[RelDataType]
-    val typeFamilies: util.List[SqlTypeFamily] = new util.ArrayList[SqlTypeFamily]
-    // derives operands' data types and type families
-    functionImpl.getParameters.asScala.foreach{ o =>
-      val relType: RelDataType = o.getType(typeFactory)
-      argTypes.add(relType)
-      typeFamilies.add(Util.first(relType.getSqlTypeName.getFamily, SqlTypeFamily.ANY))
+    typeFactory: FlinkTypeFactory)
+  : SqlOperandTypeInference = {
+    /**
+      * Operand type inference based on [[TableFunction]] given information.
+      */
+    new SqlOperandTypeInference {
+      override def inferOperandTypes(
+          callBinding: SqlCallBinding,
+          returnType: RelDataType,
+          operandTypes: Array[RelDataType]): Unit = {
+
+        val operandTypeInfo = getOperandTypeInfo(callBinding)
+
+        val foundSignature = getEvalMethodSignature(udtf, operandTypeInfo)
+          .getOrElse(throw new ValidationException(
+            s"Given parameters of function '$name' do not match any signature. \n" +
+              s"Actual: ${signatureToString(operandTypeInfo)} \n" +
+              s"Expected: ${signaturesToString(udtf, "eval")}"))
+
+        val inferredTypes = foundSignature
+          .map(TypeExtractor.getForClass(_))
+          .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
+
+        for (i <- operandTypes.indices) {
+          if (i < inferredTypes.length - 1) {
+            operandTypes(i) = inferredTypes(i)
+          } else if (null != inferredTypes.last.getComponentType) {
+            // last argument is a collection, the array type
+            operandTypes(i) = inferredTypes.last.getComponentType
+          } else {
+            operandTypes(i) = inferredTypes.last
+          }
+        }
+      }
     }
-    // derives whether the 'input'th parameter of a method is optional.
-    val optional: Predicate[Integer] = new Predicate[Integer]() {
-      def apply(input: Integer): Boolean = {
-        functionImpl.getParameters.get(input).isOptional
+  }
+
+  private[flink] def createOperandTypeChecker(
+    name: String,
+    udtf: TableFunction[_])
+  : SqlOperandTypeChecker = {
+
+    val signatures = getMethodSignatures(udtf, "eval")
+
+    /**
+      * Operand type checker based on [[TableFunction]] given information.
+      */
+    new SqlOperandTypeChecker {
+      override def getAllowedSignatures(op: SqlOperator, opName: String): String = {
+        s"$opName[${signaturesToString(udtf, "eval")}]"
+      }
+
+      override def getOperandCountRange: SqlOperandCountRange = {
+        var min = 255
+        var max = -1
+        signatures.foreach( sig => {
+          var len = sig.length
+          if (len > 0 && sig(sig.length - 1).isArray) {
+            max = 254  // according to JVM spec 4.3.3
+            len = sig.length - 1
+          }
+          max = Math.max(len, max)
+          min = Math.min(len, min)
+        })
+        SqlOperandCountRanges.between(min, max)
+      }
+
+      override def checkOperandTypes(
+        callBinding: SqlCallBinding,
+        throwOnFailure: Boolean)
+      : Boolean = {
+        val operandTypeInfo = getOperandTypeInfo(callBinding)
+
+        val foundSignature = getEvalMethodSignature(udtf, operandTypeInfo)
+
+        if (foundSignature.isEmpty) {
+          if (throwOnFailure) {
+            throw new ValidationException(
+              s"Given parameters of function '$name' do not match any signature. \n" +
+                s"Actual: ${signatureToString(operandTypeInfo)} \n" +
+                s"Expected: ${signaturesToString(udtf, "eval")}")
+          } else {
+            false
+          }
+        } else {
+          true
+        }
       }
+
+      override def isOptional(i: Int): Boolean = false
+
+      override def getConsistency: Consistency = Consistency.NONE
+
     }
-    // create type check for the operands
-    val typeChecker: FamilyOperandTypeChecker = OperandTypes.family(typeFamilies, optional)
-
-    new TableSqlFunction(
-      name,
-      udtf,
-      rowTypeInfo,
-      ReturnTypes.CURSOR,
-      InferTypes.explicit(argTypes),
-      typeChecker,
-      argTypes,
-      functionImpl)
   }
 }
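
The `getOperandCountRange` implementation added above derives the allowed operand count from all `eval` signatures: a trailing array parameter is treated as the variable part, which lowers the minimum by one and raises the maximum to 254. The same arithmetic can be sketched in plain Java without Calcite or Flink; `OperandCountRangeSketch` and `range` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of the Flink code base.
final class OperandCountRangeSketch {

    // Returns {min, max} over all signatures, following the hunk's logic.
    static int[] range(List<Class<?>[]> signatures) {
        int min = 255;
        int max = -1;
        for (Class<?>[] sig : signatures) {
            int len = sig.length;
            if (len > 0 && sig[len - 1].isArray()) {
                max = 254;            // upper bound the commit cites from JVM spec 4.3.3
                len = sig.length - 1; // the variable part may be empty
            }
            max = Math.max(len, max);
            min = Math.min(len, min);
        }
        return new int[] {min, max};
    }

    public static void main(String[] args) {
        // Signatures of the test function in this commit:
        // eval(String... strs) and eval(int val, String str).
        List<Class<?>[]> signatures = new ArrayList<>();
        signatures.add(new Class<?>[] {String[].class});
        signatures.add(new Class<?>[] {int.class, String.class});
        int[] r = range(signatures);
        System.out.println(r[0] + ".." + r[1]); // prints 0..254
    }
}
```

The count range is only a coarse filter; in the commit the exact match is still decided in `checkOperandTypes` via `getEvalMethodSignature`.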

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index f53bcde..6a90569 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -258,7 +258,7 @@ object UserDefinedFunctionUtils {
   }
 
   /**
-    * Create [[SqlFunction]]s for a [[TableFunction]]'s every eval method
+    * Create [[SqlFunction]] for a [[TableFunction]]
     *
     * @param name function name
     * @param tableFunction table function
@@ -266,19 +266,15 @@ object UserDefinedFunctionUtils {
     * @param typeFactory type factory
     * @return the TableSqlFunction
     */
-  def createTableSqlFunctions(
+  def createTableSqlFunction(
       name: String,
       tableFunction: TableFunction[_],
       resultType: TypeInformation[_],
       typeFactory: FlinkTypeFactory)
-    : Seq[SqlFunction] = {
+    : SqlFunction = {
     val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType)
-    val evalMethods = checkAndExtractMethods(tableFunction, "eval")
-
-    evalMethods.map { method =>
-      val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames, method)
-      TableSqlFunction(name, tableFunction, resultType, typeFactory, function)
-    }
+    val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames)
+    new TableSqlFunction(name, tableFunction, resultType, typeFactory, function)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 795a506..559d20d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -723,10 +723,10 @@ case class LogicalTableFunctionCall(
     val function = new FlinkTableFunctionImpl(
       resultType,
       fieldIndexes,
-      if (fieldNames.isEmpty) generatedNames else fieldNames, evalMethod
+      if (fieldNames.isEmpty) generatedNames else fieldNames
     )
     val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    val sqlFunction = TableSqlFunction(
+    val sqlFunction = new TableSqlFunction(
       tableFunction.functionIdentifier,
       tableFunction,
       resultType,

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
index f2d9f2a..802fd85 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
@@ -82,11 +82,11 @@ class LogicalUnnestRule(
           val componentType = arrayType.getComponentType
 
           // create table function
-          val explodeTableFunc = UserDefinedFunctionUtils.createTableSqlFunctions(
+          val explodeTableFunc = UserDefinedFunctionUtils.createTableSqlFunction(
             "explode",
             ExplodeFunctionUtil.explodeTableFuncFromType(arrayType.typeInfo),
             FlinkTypeFactory.toTypeInfo(arrayType.getComponentType),
-            cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]).head
+            cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory])
 
           // create table function call
           val rexCall = cluster.getRexBuilder.makeCall(

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
index 27fc2ea..cab8ea9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
@@ -17,12 +17,12 @@
  */
 package org.apache.flink.table.plan.schema
 
-import java.lang.reflect.{Method, Type}
+import java.lang.reflect.Type
 import java.util
+import java.util.Collections
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.calcite.schema.TableFunction
-import org.apache.calcite.schema.impl.ReflectiveFunctionBase
+import org.apache.calcite.schema.{FunctionParameter, TableFunction}
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.TableException
@@ -36,10 +36,8 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
 class FlinkTableFunctionImpl[T](
     val typeInfo: TypeInformation[T],
     val fieldIndexes: Array[Int],
-    val fieldNames: Array[String],
-    val evalMethod: Method)
-  extends ReflectiveFunctionBase(evalMethod)
-  with TableFunction {
+    val fieldNames: Array[String])
+  extends TableFunction {
 
   if (fieldIndexes.length != fieldNames.length) {
     throw new TableException(
@@ -71,6 +69,9 @@ class FlinkTableFunctionImpl[T](
 
   override def getElementType(arguments: util.List[AnyRef]): Type = classOf[Array[Object]]
 
+  // we do never use the FunctionParameters, so return an empty list
+  override def getParameters: util.List[FunctionParameter] = Collections.emptyList()
+
   override def getRowType(typeFactory: RelDataTypeFactory,
                           arguments: util.List[AnyRef]): RelDataType = {
     val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 55dbe4c..5254ceb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -48,22 +48,6 @@ class FunctionCatalog {
     sqlFunctions += sqlFunction
   }
 
-  /**
-    * Register multiple SQL functions at the same time. The functions have the same name.
-    */
-  def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
-    if (functions.nonEmpty) {
-      val name = functions.head.getName
-      // check that all functions have the same name
-      if (functions.forall(_.getName == name)) {
-        sqlFunctions --= sqlFunctions.filter(_.getName == name)
-        sqlFunctions ++= functions
-      } else {
-        throw ValidationException("The SQL functions to be registered have different names.")
-      }
-    }
-  }
-
   def getSqlOperatorTable: SqlOperatorTable =
     ChainedSqlOperatorTable.of(
       new BasicOperatorTable(),

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java
index cd92f49..2a27add 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java
@@ -35,4 +35,21 @@ public class JavaUserDefinedTableFunctions {
                        collect(c);
                }
        }
+
+       /**
+        * Emit every input string.
+        */
+       public static class JavaVarsArgTableFunc0 extends TableFunction<String> {
+               public void eval(String... strs) {
+                       for (String s : strs) {
+                               collect(s);
+                       }
+               }
+
+               public void eval(int val, String str) {
+                       for (int i = 0; i < val; i++) {
+                               collect(str);
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
index a9938cb..719141f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.batch.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedTableFunctions.JavaVarsArgTableFunc0
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2, _}
 import org.junit.Test
@@ -234,4 +235,52 @@ class CorrelateTest extends TableTestBase {
 
     util.verifySql(sqlQuery, expected)
   }
+
+  @Test
+  def testTableFunctionWithVariableArguments(): Unit = {
+    val util = batchTestUtil()
+    val func1 = new JavaVarsArgTableFunc0
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    var sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1('hello', 'world', c)) AS T(s)"
+
+    var expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", "func1('hello', 'world', $cor0.c)"),
+        term("function", func1.getClass.getCanonicalName),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+
+    // test scala var arg function
+    val func2 = new VarArgsFunc0
+    util.addFunction("func2", func2)
+
+    sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func2('hello', 'world', c)) AS T(s)"
+
+    expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", "func2('hello', 'world', $cor0.c)"),
+        term("function", func2.getClass.getCanonicalName),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
index 9bb7bcf..955ed4b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.stream.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedTableFunctions.JavaVarsArgTableFunc0
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2, _}
 import org.junit.Test
@@ -233,4 +234,52 @@ class CorrelateTest extends TableTestBase {
 
     util.verifySql(sqlQuery, expected)
   }
+
+  @Test
+  def testTableFunctionWithVariableArguments(): Unit = {
+    val util = streamTestUtil()
+    val func1 = new JavaVarsArgTableFunc0
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    var sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1('hello', 'world', c)) AS T(s)"
+
+    var expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "func1('hello', 'world', $cor0.c)"),
+        term("function", func1.getClass.getCanonicalName),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+
+    // test scala var arg function
+    val func2 = new VarArgsFunc0
+    util.addFunction("func2", func2)
+
+    sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func2('hello', 'world', c)) AS T(s)"
+
+    expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "func2('hello', 'world', $cor0.c)"),
+        term("function", func2.getClass.getCanonicalName),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/79c17afa/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
index f58feed..66593e9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
@@ -173,7 +173,7 @@ class CorrelateValidationTest extends TableTestBase {
     // SQL API call
     expectExceptionThrown(
       util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
-      "No match found for function signature func2(<CHARACTER>, <CHARACTER>)")
+      "Given parameters of function 'func2' do not match any signature.")
   }
 
   // ----------------------------------------------------------------------------------------------
