[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15674214#comment-15674214 ]

ASF GitHub Bot commented on FLINK-4469:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2653#discussion_r88488494
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala ---
    @@ -0,0 +1,161 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.nodes
    +
    +import org.apache.calcite.plan.volcano.RelSubset
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.logical.LogicalTableFunctionScan
    +import org.apache.calcite.rex.{RexNode, RexCall}
    +import org.apache.calcite.sql.SemiJoinType
    +import org.apache.flink.api.common.functions.FlatMapFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression, GeneratedFunction}
    +import org.apache.flink.api.table.functions.utils.TableSqlFunction
    +import org.apache.flink.api.table.runtime.FlatMapRunner
    +import org.apache.flink.api.table.typeutils.RowTypeInfo
    +import org.apache.flink.api.table.typeutils.TypeConverter._
    +import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Cross/outer applies a user-defined table function to each input row.
    +  */
    +trait FlinkCorrelate {
    +
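    +  /**
    +    * Generates the body of the correlate flat-map function. The generated code
    +    * evaluates the table function call and emits one joined row per result row
    +    * (filtered by the condition, if present). For an outer apply, a single
    +    * null-padded row is emitted when the table function returns no rows.
    +    */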
    +  private[flink] def functionBody(generator: CodeGenerator,
    +                                  udtfTypeInfo: TypeInformation[Any],
    +                                  rowType: RelDataType,
    +                                  rexCall: RexCall,
    +                                  condition: RexNode,
    +                                  config: TableConfig,
    +                                  joinType: SemiJoinType,
    +                                  expectedType: Option[TypeInformation[Any]]): String = {
    +
    +    val returnType = determineReturnType(
    +      rowType,
    +      expectedType,
    +      config.getNullCheck,
    +      config.getEfficientTypeUsage)
    +
    +    val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
    +    val crossResultExpr = generator.generateResultExpression(input1AccessExprs ++ input2AccessExprs,
    +      returnType, rowType.getFieldNames)
    +
    +    val input2NullExprs = input2AccessExprs.map(
    +      x => GeneratedExpression("null", "true", "", x.resultType))
    +    val outerResultExpr = generator.generateResultExpression(input1AccessExprs ++ input2NullExprs,
    +      returnType, rowType.getFieldNames)
    +
    +    val call = generator.generateExpression(rexCall)
    +    var body = call.code +
    +               s"""
    +                  |scala.collection.Iterator iter = ${call.resultTerm}.getRowsIterator();
    +                """.stripMargin
    +    if (joinType == SemiJoinType.INNER) {
    +      // cross apply
    +      body +=
    +        s"""
    +           |if (iter.isEmpty()) {
    +           |  return;
    +           |}
    +        """.stripMargin
    +    } else {
    +      // outer apply
    +      body +=
    +        s"""
    +           |if (iter.isEmpty()) {
    +           |  ${outerResultExpr.code}
    +           |  ${generator.collectorTerm}.collect(${outerResultExpr.resultTerm});
    +           |  return;
    +           |}
    +        """.stripMargin
    +    }
    +
    +    val projection = if (condition == null) {
    +      s"""
    +         |${crossResultExpr.code}
    +         |${generator.collectorTerm}.collect(${crossResultExpr.resultTerm});
    +       """.stripMargin
    +    } else {
    +      val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo) {
    +        override def input1Term: String = input2Term
    +      }
    +      val filterCondition = filterGenerator.generateExpression(condition)
    +      s"""
    +         |${filterGenerator.reuseInputUnboxingCode()}
    +         |${filterCondition.code}
    +         |if (${filterCondition.resultTerm}) {
    +         |  ${crossResultExpr.code}
    +         |  ${generator.collectorTerm}.collect(${crossResultExpr.resultTerm});
    +         |}
    +         |""".stripMargin
    +    }
    +
    +    val outputTypeClass = udtfTypeInfo.getTypeClass.getCanonicalName
    +    body +=
    +      s"""
    +         |while (iter.hasNext()) {
    +         |  $outputTypeClass ${generator.input2Term} = ($outputTypeClass) iter.next();
    +         |  $projection
    +         |}
    +       """.stripMargin
    +    body
    +  }
    +
    +  private[flink] def correlateMapFunction(
    +     genFunction: GeneratedFunction[FlatMapFunction[Any, Any]]): FlatMapRunner[Any, Any] = {
    +
    +    new FlatMapRunner[Any, Any](
    +      genFunction.name,
    +      genFunction.code,
    +      genFunction.returnType)
    +  }
    +
    +  private[flink] def inputRowType(input: RelNode): TypeInformation[Any] = {
    +    val fieldTypes = input.getRowType.getFieldList.map(t =>
    +        FlinkTypeFactory.toTypeInfo(t.getType))
    +    new RowTypeInfo(fieldTypes).asInstanceOf[TypeInformation[Any]]
    +  }
    +
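    +  /** Digs through RelSubset wrappers to find the underlying LogicalTableFunctionScan. */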
    +  private[flink] def unwrap(relNode: RelNode): LogicalTableFunctionScan = {
    +    relNode match {
    +      case rel: LogicalTableFunctionScan => rel
    +      case rel: RelSubset => unwrap(rel.getRelList.get(0))
    +      case _ => throw new IllegalStateException("Unexpected RelNode: " + relNode)
    +    }
    +  }
    +
    +  private[flink] def selectToString(rowType: RelDataType): String = {
    +    rowType.getFieldNames.mkString(",")
    +  }
    +
    +  private[flink] def correlateOpName(rexCall: RexCall,
    +                                   sqlFunction: TableSqlFunction,
    --- End diff --
    
    In general we use only two spaces for indentation.
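    
    For example, instead of aligning wrapped parameters with the opening
    parenthesis, the functionBody signature above could be wrapped with a fixed
    indent while keeping two-space indentation for the body. A sketch of one
    common Scala style, not necessarily the exact Flink convention:
    
      private[flink] def functionBody(
          generator: CodeGenerator,
          udtfTypeInfo: TypeInformation[Any],
          rowType: RelDataType,
          rexCall: RexCall,
          condition: RexNode,
          config: TableConfig,
          joinType: SemiJoinType,
          expectedType: Option[TypeInformation[Any]]): String = {
        // body indented two spaces, as in the diff above
      }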


> Add support for user defined table function in Table API & SQL
> --------------------------------------------------------------
>
>                 Key: FLINK-4469
>                 URL: https://issues.apache.org/jira/browse/FLINK-4469
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take a single input row and 
> produce a single output row. In contrast, table-generating functions transform 
> a single input row into multiple output rows. This is very useful in some 
> cases, such as looking up a rowkey in HBase and returning one or more rows.
> A user-defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval methods
> NOTE: 
> 1. the eval methods must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because 
> of Java type erasure, we cannot extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row.
> 4. eval methods can be overloaded. Flink will choose the best matching eval 
> method to call according to the number and types of the parameters.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
>
>   public Word(String word, Integer length) {
>     this.word = word;
>     this.length = length;
>   }
> }
>
> public class SplitStringUDTF extends UDTF<Word> {
>     public void eval(String str) {
>         if (str != null) {
>             for (String s : str.split(",")) {
>                 collect(new Word(s, s.length()));
>             }
>         }
>     }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF());
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS t(w,l)");
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF());
> // rename the split table columns to "w" and "l"
> table.crossApply("split(c) as (w, l)")
>      .select("a, b, w, l");
> // without renaming, the original field names of the POJO/case class are used
> table.crossApply("split(c)")
>      .select("a, b, word, length");
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>      .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>      .select('a, 'b, 'word, 'length)
> {code}
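> For comparison, the same UDTF could be written in Scala roughly as follows (a 
> sketch, assuming the same UDTF base class and its collect() method):
> {code}
> case class Word(word: String, length: Int)
>
> class SplitStringUDTF extends UDTF[Word] {
>   def eval(str: String): Unit = {
>     if (str != null) {
>       str.split(",").foreach(s => collect(Word(s, s.length)))
>     }
>   }
> }
> {code}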
> See [1] for more information about UDTF design.
> [1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#


