[ 
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671925#comment-15671925
 ] 

ASF GitHub Bot commented on FLINK-4469:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2653#discussion_r88339010
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
    @@ -611,6 +612,130 @@ class Table(
       }
     
       /**
    +    * The Cross Apply returns rows form the outer table (table on the left 
of the Apply operator)
    +    * that produces matching values from the table-valued function (which 
is on the right side of
    +    * the operator).
    +    *
    +    * The Cross Apply is equivalent to Inner Join, but it works with a 
table-valued function.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   class MySplitUDTF extends TableFunction[String] {
    +    *     def eval(str: String): Unit = {
    +    *       str.split("#").foreach(collect)
    +    *     }
    +    *   }
    +    *
    +    *   val split = new MySplitUDTF()
    +    *   table.crossApply(split('c).as('s)).select('a,'b,'c,'s)
    +    * }}}
    +    */
    +  def crossApply(udtf: TableFunctionCall[_]): Table = {
    +    applyInternal(udtf, JoinType.INNER)
    +  }
    +
    +  /**
    +    * The Cross Apply returns rows form the outer table (table on the left 
of the Apply operator)
    +    * that produces matching values from the table-valued function (which 
is on the right side of
    +    * the operator).
    +    *
    +    * The Cross Apply is equivalent to Inner Join, but it works with a 
table-valued function.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   class MySplitUDTF extends TableFunction[String] {
    +    *     def eval(str: String): Unit = {
    +    *       str.split("#").foreach(collect)
    +    *     }
    +    *   }
    +    *
    +    *   val split = new MySplitUDTF()
    +    *   table.crossApply("split('c') as (s)").select("a, b, c, s")
    +    * }}}
    +    */
    +  def crossApply(udtf: String): Table = {
    +    applyInternal(udtf, JoinType.INNER)
    +  }
    +
    +  /**
    +    * The Cross Apply returns rows form the outer table (table on the left 
of the Apply operator)
    +    * that produces matching values from the table-valued function (which 
is on the right side of
    +    * the operator).
    +    *
    +    * The Cross Apply is equivalent to Inner Join, but it works with a 
table-valued function.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   class MySplitUDTF extends TableFunction[String] {
    +    *     def eval(str: String): Unit = {
    +    *       str.split("#").foreach(collect)
    +    *     }
    +    *   }
    +    *
    +    *   val split = new MySplitUDTF()
    +    *   table.outerApply(split('c).as('s)).select('a,'b,'c,'s)
    +    * }}}
    +    */
    +  def outerApply(udtf: TableFunctionCall[_]): Table = {
    +    applyInternal(udtf, JoinType.LEFT_OUTER)
    +  }
    +
    +  /**
    +    * The Outer Apply returns all the rows from the outer table (table on 
the left of the Apply
    +    * operator), and rows that do not matches the condition from the 
table-valued function (which
    +    * is on the right side of the operator), NULL values are displayed.
    +    *
    +    * The Outer Apply is equivalent to Left Outer Join, but it works with 
a table-valued function.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   val split = new MySplitUDTF()
    +    *   table.crossApply("split('c') as (s)").select("a, b, c, s")
    +    * }}}
    +    */
    +  def outerApply(udtf: String): Table = {
    +    applyInternal(udtf, JoinType.LEFT_OUTER)
    +  }
    +
    +  private def applyInternal(udtfString: String, joinType: JoinType): Table 
= {
    +    val node = ExpressionParser.parseLogicalNode(udtfString)
    +    var alias: Option[Seq[Expression]] = None
    +    val functionCall = node match {
    +      case AliasNode(aliasList, child) =>
    +        alias = Some(aliasList)
    +        child
    +      case _ => node
    +    }
    +
    +    functionCall match {
    +      case call @ UnresolvedTableFunctionCall(name, args) =>
    +        val udtfCall = 
tableEnv.getFunctionCatalog.lookupTableFunction(name, args)
    +        if (alias.isDefined) {
    +          applyInternal(udtfCall.as(alias.get: _*), joinType)
    +        } else {
    +          applyInternal(udtfCall, joinType)
    +        }
    +      case _ => throw new TableException("Cross/Outer Apply only accept 
TableFunction")
    +    }
    +  }
    +
    +  private def applyInternal(node: LogicalNode, joinType: JoinType): Table 
= {
    +    node match {
    +      case udtf: TableFunctionCall[_] =>
    +        udtf.setChild(this.logicalPlan)
    +        new Table(
    +          tableEnv,
    +          Join(this.logicalPlan, udtf.validate(tableEnv), joinType, None,
    +               
Some(relBuilder.getCluster.createCorrel())).validate(tableEnv))
    --- End diff --
    
    We kept Calcite code so far out of our `LogicalNode` (except for 
`construct()` of course). I think we can move the 
`relBuilder.getCluster.createCorrel()` call also into `Join.construct()` and 
change the `corId` constructor parameter of `Join` to a boolean flag that 
indicates whether we need to generate a `corId` or not.


> Add support for user defined table function in Table API & SQL
> --------------------------------------------------------------
>
>                 Key: FLINK-4469
>                 URL: https://issues.apache.org/jira/browse/FLINK-4469
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row 
> and output a single output row. In contrast, table-generating functions 
> transform a single input row to multiple output rows. It is very useful in 
> some cases, such as look up in HBase by rowkey and return one or more rows.
> Adding a user defined table function should:
> 1. inherit from UDTF class with specific generic type T
> 2. define one or more evel function. 
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by table function. Because of 
> Java type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit table row
> 4. eval method can be overload. Blink will choose the best match eval method 
> to call according to parameter types and number.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
> }
> public class SplitStringUDTF extends UDTF<Word> {
>     public Iterable<Word> eval(String str) {
>         if (str != null) {
>             for (String s : str.split(",")) {
>                 collect(new Word(s, s.length()));
>             }
>         }
>     }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS 
> t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")        
>      .select("a, b, w, l")
> // without renaming, we will use the origin field names in the POJO/case/...
> table.crossApply("split(c)")
>      .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>      .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>      .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1] 
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to