[ 
https://issues.apache.org/jira/browse/FLINK-7439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148715#comment-16148715
 ] 

ASF GitHub Bot commented on FLINK-7439:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4536#discussion_r136287683
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala
 ---
    @@ -74,48 +75,102 @@ class TableSqlFunction(
     
     object TableSqlFunction {
     
    -  /**
    -    * Util function to create a [[TableSqlFunction]].
    -    *
    -    * @param name function name (used by SQL parser)
    -    * @param udtf user-defined table function to be called
    -    * @param rowTypeInfo the row type information generated by the table 
function
    -    * @param typeFactory type factory for converting Flink's between 
Calcite's types
    -    * @param functionImpl Calcite table function schema
    -    * @return [[TableSqlFunction]]
    -    */
    -  def apply(
    +  private[flink] def createOperandTypeInference(
         name: String,
         udtf: TableFunction[_],
    -    rowTypeInfo: TypeInformation[_],
    -    typeFactory: FlinkTypeFactory,
    -    functionImpl: FlinkTableFunctionImpl[_]): TableSqlFunction = {
    -
    -    val argTypes: util.List[RelDataType] = new util.ArrayList[RelDataType]
    -    val typeFamilies: util.List[SqlTypeFamily] = new 
util.ArrayList[SqlTypeFamily]
    -    // derives operands' data types and type families
    -    functionImpl.getParameters.asScala.foreach{ o =>
    -      val relType: RelDataType = o.getType(typeFactory)
    -      argTypes.add(relType)
    -      typeFamilies.add(Util.first(relType.getSqlTypeName.getFamily, 
SqlTypeFamily.ANY))
    +    typeFactory: FlinkTypeFactory)
    +  : SqlOperandTypeInference = {
    +    /**
    +      * Operand type inference based on [[TableFunction]] given 
information.
    +      */
    +    new SqlOperandTypeInference {
    +      override def inferOperandTypes(
    +          callBinding: SqlCallBinding,
    +          returnType: RelDataType,
    +          operandTypes: Array[RelDataType]): Unit = {
    +
    +        val operandTypeInfo = getOperandTypeInfo(callBinding)
    +
    +        val foundSignature = getEvalMethodSignature(udtf, operandTypeInfo)
    +          .getOrElse(throw new ValidationException(
    +            s"Given parameters of function '$name' do not match any 
signature. \n" +
    +              s"Actual: ${signatureToString(operandTypeInfo)} \n" +
    +              s"Expected: ${signaturesToString(udtf, "eval")}"))
    +
    +        val inferredTypes = foundSignature
    +          .map(TypeExtractor.getForClass(_))
    +          .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
    +
    +        for (i <- operandTypes.indices) {
    +          if (i < inferredTypes.length - 1) {
    +            operandTypes(i) = inferredTypes(i)
    +          } else if (null != inferredTypes.last.getComponentType) {
    +            // last argument is a collection, the array type
    +            operandTypes(i) = inferredTypes.last.getComponentType
    +          } else {
    +            operandTypes(i) = inferredTypes.last
    +          }
    +        }
    +      }
         }
    -    // derives whether the 'input'th parameter of a method is optional.
    -    val optional: Predicate[Integer] = new Predicate[Integer]() {
    -      def apply(input: Integer): Boolean = {
    -        functionImpl.getParameters.get(input).isOptional
    +  }
    +
    +  private[flink] def createOperandTypeChecker(
    --- End diff --
    
    Yes, I agree with you. I tried to merge these code but failed for some test 
cases. I didn't figure it out. But I think we can do it in another issue. 


> Support variable arguments for UDTF in SQL
> ------------------------------------------
>
>                 Key: FLINK-7439
>                 URL: https://issues.apache.org/jira/browse/FLINK-7439
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>
> Currently, both UDF and UDAF support variable parameters, but UDTF not. 
> FLINK-5882 supports variable UDTF for Table API only, but missed SQL.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to