[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671937#comment-15671937
 ] 

ASF GitHub Bot commented on FLINK-4469:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2653#discussion_r88337239
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/call.scala
 ---
    @@ -0,0 +1,169 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.logical
    +
    +import java.lang.reflect.Method
    +
    +import org.apache.calcite.rel.logical.LogicalTableFunctionScan
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.table.{FlinkTypeFactory, TableEnvironment, 
TableException, UnresolvedException}
    +import org.apache.flink.api.table.expressions.{Attribute, Expression, 
ResolvedFieldReference, UnresolvedFieldReference}
    +import org.apache.flink.api.table.functions.TableFunction
    +import org.apache.flink.api.table.functions.utils.TableSqlFunction
    +import 
org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._
    +import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl
    +import org.apache.flink.api.table.validate.ValidationFailure
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * General expression for unresolved user-defined table function calls.
    +  */
    +case class UnresolvedTableFunctionCall(functionName: String, args: 
Seq[Expression])
    +  extends LogicalNode {
    +
    +  override def output: Seq[Attribute] =
    +    throw UnresolvedException("Invalid call to output on 
UnresolvedTableFunctionCall")
    +
    +  override protected[logical] def construct(relBuilder: RelBuilder): 
RelBuilder =
    +    throw UnresolvedException("Invalid call to construct on 
UnresolvedTableFunctionCall")
    +
    +  override private[flink] def children: Seq[LogicalNode] =
    +    throw UnresolvedException("Invalid call to children on 
UnresolvedTableFunctionCall")
    +}
    +
    +/**
    +  * LogicalNode for calling a user-defined table functions.
    +  * @param tableFunction table function to be called (might be overloaded)
    +  * @param parameters actual parameters
    +  * @param alias output fields renaming
    +  * @tparam T type of returned table
    +  */
    +case class TableFunctionCall[T: TypeInformation](
    +  tableFunction: TableFunction[T],
    +  parameters: Seq[Expression],
    +  alias: Option[Array[String]]) extends UnaryNode {
    +
    +  private var table: LogicalNode = _
    +  override def child: LogicalNode = table
    +
    +  def setChild(child: LogicalNode): TableFunctionCall[T] = {
    +    table = child
    +    this
    +  }
    +
    +  private val resultType: TypeInformation[T] =
    +    if (tableFunction.getResultType == null) {
    +      implicitly[TypeInformation[T]]
    +    } else {
    +      tableFunction.getResultType
    +    }
    +
    +  private val fieldNames: Array[String] =
    +    if (alias.isEmpty) {
    +      getFieldAttribute[T](resultType)._1
    +    } else {
    +      alias.get
    +    }
    +  private val fieldTypes: Array[TypeInformation[_]] = 
getFieldAttribute[T](resultType)._2
    +
    +  /**
    +    * Assigns an alias for this table function returned fields that the 
following `select()` clause
    +    * can refer to.
    +    *
    +    * @param aliasList alias for this window
    +    * @return this table function
    +    */
    +  def as(aliasList: Expression*): TableFunctionCall[T] = {
    +    if (aliasList == null) {
    +      return this
    +    }
    +    if (aliasList.length != fieldNames.length) {
    +      failValidation("Aliasing not match number of fields")
    +    } else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
    +      failValidation("Alias only accept name expressions as arguments")
    +    } else {
    +      val names = 
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name).toArray
    +      TableFunctionCall(tableFunction, parameters, Some(names))
    +    }
    +  }
    +
    +  override def output: Seq[Attribute] = fieldNames.zip(fieldTypes).map {
    +    case (n, t) =>  ResolvedFieldReference(n, t)
    +  }
    +
    +  override def makeCopy(newArgs: Array[AnyRef]): TableFunctionCall[T] = {
    +    if (newArgs.length != 3) {
    +      throw new TableException("Invalid constructor params")
    +    }
    +    val udtfParam: TableFunction[T] = 
newArgs.head.asInstanceOf[TableFunction[T]]
    +    val expressionParams = newArgs(1).asInstanceOf[Seq[Expression]]
    +    val names = newArgs.last.asInstanceOf[Option[Array[String]]]
    +    copy(udtfParam, expressionParams, names)
    +      .asInstanceOf[TableFunctionCall[T]].setChild(child)
    +  }
    +
    +  private var evalMethod: Method = _
    +
    +  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    +    val node = super.validate(tableEnv).asInstanceOf[TableFunctionCall[T]]
    +    val signature = node.parameters.map(_.resultType)
    +    // look for a signature that matches the input types
    +    val foundMethod = getEvalMethod(tableFunction, signature)
    +    if (foundMethod.isEmpty) {
    +      ValidationFailure(s"Given parameters do not match any signature. \n" 
+
    +                          s"Actual: ${signatureToString(signature)} \n" +
    --- End diff --
    
    I think `signatureToString` does not include the name of the function. 
It would be good to add the function name to the exception message to help 
the user identify the problem.


> Add support for user defined table function in Table API & SQL
> --------------------------------------------------------------
>
>                 Key: FLINK-4469
>                 URL: https://issues.apache.org/jira/browse/FLINK-4469
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row 
> and output a single output row. In contrast, table-generating functions 
> transform a single input row into multiple output rows. This is very useful 
> in some cases, such as looking up a rowkey in HBase and returning one or 
> more rows.
> A user-defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval methods.
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because 
> of Java type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. the eval method can be overloaded. Blink will choose the best-matching 
> eval method to call according to the number and types of the parameters.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
> }
> public class SplitStringUDTF extends UDTF<Word> {
>     public Iterable<Word> eval(String str) {
>         if (str != null) {
>             for (String s : str.split(",")) {
>                 collect(new Word(s, s.length()));
>             }
>         }
>     }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS 
> t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")        
>      .select("a, b, w, l")
> // without renaming, we will use the original field names in the POJO/case/...
> table.crossApply("split(c)")
>      .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>      .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>      .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1] 
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to