[ https://issues.apache.org/jira/browse/FLINK-7439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148715#comment-16148715 ]
ASF GitHub Bot commented on FLINK-7439: --------------------------------------- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4536#discussion_r136287683 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala --- @@ -74,48 +75,102 @@ class TableSqlFunction( object TableSqlFunction { - /** - * Util function to create a [[TableSqlFunction]]. - * - * @param name function name (used by SQL parser) - * @param udtf user-defined table function to be called - * @param rowTypeInfo the row type information generated by the table function - * @param typeFactory type factory for converting Flink's between Calcite's types - * @param functionImpl Calcite table function schema - * @return [[TableSqlFunction]] - */ - def apply( + private[flink] def createOperandTypeInference( name: String, udtf: TableFunction[_], - rowTypeInfo: TypeInformation[_], - typeFactory: FlinkTypeFactory, - functionImpl: FlinkTableFunctionImpl[_]): TableSqlFunction = { - - val argTypes: util.List[RelDataType] = new util.ArrayList[RelDataType] - val typeFamilies: util.List[SqlTypeFamily] = new util.ArrayList[SqlTypeFamily] - // derives operands' data types and type families - functionImpl.getParameters.asScala.foreach{ o => - val relType: RelDataType = o.getType(typeFactory) - argTypes.add(relType) - typeFamilies.add(Util.first(relType.getSqlTypeName.getFamily, SqlTypeFamily.ANY)) + typeFactory: FlinkTypeFactory) + : SqlOperandTypeInference = { + /** + * Operand type inference based on [[TableFunction]] given information. 
+ */ + new SqlOperandTypeInference { + override def inferOperandTypes( + callBinding: SqlCallBinding, + returnType: RelDataType, + operandTypes: Array[RelDataType]): Unit = { + + val operandTypeInfo = getOperandTypeInfo(callBinding) + + val foundSignature = getEvalMethodSignature(udtf, operandTypeInfo) + .getOrElse(throw new ValidationException( + s"Given parameters of function '$name' do not match any signature. \n" + + s"Actual: ${signatureToString(operandTypeInfo)} \n" + + s"Expected: ${signaturesToString(udtf, "eval")}")) + + val inferredTypes = foundSignature + .map(TypeExtractor.getForClass(_)) + .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true)) + + for (i <- operandTypes.indices) { + if (i < inferredTypes.length - 1) { + operandTypes(i) = inferredTypes(i) + } else if (null != inferredTypes.last.getComponentType) { + // last argument is a collection, the array type + operandTypes(i) = inferredTypes.last.getComponentType + } else { + operandTypes(i) = inferredTypes.last + } + } + } } - // derives whether the 'input'th parameter of a method is optional. - val optional: Predicate[Integer] = new Predicate[Integer]() { - def apply(input: Integer): Boolean = { - functionImpl.getParameters.get(input).isOptional + } + + private[flink] def createOperandTypeChecker( --- End diff -- Yes, I agree with you. I tried to merge this code, but some test cases failed and I could not figure out why. I think we can do it in another issue. > Support variable arguments for UDTF in SQL > ------------------------------------------ > > Key: FLINK-7439 > URL: https://issues.apache.org/jira/browse/FLINK-7439 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Jark Wu > Assignee: Jark Wu > > Currently, both UDF and UDAF support variable parameters, but UDTF does not. > FLINK-5882 added variable-argument UDTF support for the Table API only, but not for SQL. -- This message was sent by Atlassian JIRA (v6.4.14#64029)