[
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15674214#comment-15674214
]
ASF GitHub Bot commented on FLINK-4469:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2653#discussion_r88488494
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rex.{RexNode, RexCall}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.{CodeGenerator,
GeneratedExpression, GeneratedFunction}
+import org.apache.flink.api.table.functions.utils.TableSqlFunction
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.TypeConverter._
+import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
+
+import scala.collection.JavaConversions._
+
+/**
+ * cross/outer apply a user-defined table function
+ */
+trait FlinkCorrelate {
+
+  /**
+   * Generates the Java code of the function body that applies a user-defined table
+   * function (UDTF) to one input row and emits the joined result rows.
+   *
+   * The generated code evaluates `rexCall`, iterates the rows returned by its
+   * `getRowsIterator()`, and, for every UDTF row that passes `condition` (if one is
+   * given), emits the input row concatenated with that UDTF row.
+   *
+   * For any join type other than INNER (outer apply), the input row padded with
+   * nulls in place of the UDTF fields is emitted instead when the UDTF returns no
+   * rows, or when a `condition` is given and none of the returned rows satisfies it.
+   *
+   * @param generator    code generator of the correlate; its `input2Term` names the
+   *                     current UDTF row inside the generated loop
+   * @param udtfTypeInfo type of the rows returned by the table function
+   * @param rowType      row type of the correlate output
+   * @param rexCall      call of the table function
+   * @param condition    filter on the UDTF rows, or null if there is none
+   * @param config       table config (null-check and efficient-type-usage settings)
+   * @param joinType     INNER for cross apply; every other value is handled as outer apply
+   * @param expectedType optional expected output type
+   * @return Java source code of the function body
+   */
+  private[flink] def functionBody(
+      generator: CodeGenerator,
+      udtfTypeInfo: TypeInformation[Any],
+      rowType: RelDataType,
+      rexCall: RexCall,
+      condition: RexNode,
+      config: TableConfig,
+      joinType: SemiJoinType,
+      expectedType: Option[TypeInformation[Any]]): String = {
+
+    val returnType = determineReturnType(
+      rowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
+
+    // input row joined with the current UDTF row
+    val crossResultExpr = generator.generateResultExpression(
+      input1AccessExprs ++ input2AccessExprs,
+      returnType,
+      rowType.getFieldNames)
+
+    // input row with nulls in place of the UDTF fields (emitted by outer apply only)
+    val input2NullExprs = input2AccessExprs.map(
+      x => GeneratedExpression("null", "true", "", x.resultType))
+    val outerResultExpr = generator.generateResultExpression(
+      input1AccessExprs ++ input2NullExprs,
+      returnType,
+      rowType.getFieldNames)
+
+    val isOuterApply = joinType != SemiJoinType.INNER
+    // With a filter, an outer apply has to remember whether any UDTF row matched:
+    // if none did, the input row must still be emitted (null-padded).
+    val trackMatches = isOuterApply && condition != null
+    val matchedTerm = "matched"
+
+    val collectOuterRow =
+      s"""
+         |${outerResultExpr.code}
+         |${generator.collectorTerm}.collect(${outerResultExpr.resultTerm});
+         |""".stripMargin
+
+    val call = generator.generateExpression(rexCall)
+    val header = call.code +
+      s"""
+         |scala.collection.Iterator iter = ${call.resultTerm}.getRowsIterator();
+         |if (iter.isEmpty()) {
+         |  ${if (isOuterApply) collectOuterRow else ""}
+         |  return;
+         |}
+         |""".stripMargin
+
+    val projection = if (condition == null) {
+      s"""
+         |${crossResultExpr.code}
+         |${generator.collectorTerm}.collect(${crossResultExpr.resultTerm});
+         |""".stripMargin
+    } else {
+      // the filter generator's single input is typed as the UDTF row and is read
+      // from input2Term, i.e. the current UDTF row of the loop below
+      val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo) {
+        override def input1Term: String = input2Term
+      }
+      val filterCondition = filterGenerator.generateExpression(condition)
+      s"""
+         |${filterGenerator.reuseInputUnboxingCode()}
+         |${filterCondition.code}
+         |if (${filterCondition.resultTerm}) {
+         |  ${crossResultExpr.code}
+         |  ${generator.collectorTerm}.collect(${crossResultExpr.resultTerm});
+         |  ${if (trackMatches) s"$matchedTerm = true;" else ""}
+         |}
+         |""".stripMargin
+    }
+
+    val outputTypeClass = udtfTypeInfo.getTypeClass.getCanonicalName
+    val loop =
+      s"""
+         |while (iter.hasNext()) {
+         |  $outputTypeClass ${generator.input2Term} = ($outputTypeClass) iter.next();
+         |  $projection
+         |}
+         |""".stripMargin
+
+    if (trackMatches) {
+      header +
+        s"""
+           |boolean $matchedTerm = false;
+           |$loop
+           |if (!$matchedTerm) {
+           |  $collectOuterRow
+           |}
+           |""".stripMargin
+    } else {
+      header + loop
+    }
+  }
+
+  /**
+   * Builds a [[FlatMapRunner]] from the name, code and return type of a generated
+   * flatMap function.
+   *
+   * @param genFunction the generated correlate function
+   * @return a runner created from the generated function's name, code and return type
+   */
+  private[flink] def correlateMapFunction(
+      genFunction: GeneratedFunction[FlatMapFunction[Any, Any]])
+    : FlatMapRunner[Any, Any] = {
+    val functionName = genFunction.name
+    val functionCode = genFunction.code
+    val producedType = genFunction.returnType
+    new FlatMapRunner[Any, Any](functionName, functionCode, producedType)
+  }
+
+  /**
+   * Converts the row type of `input` into a [[RowTypeInfo]], translating each
+   * field's Calcite type with `FlinkTypeFactory.toTypeInfo`.
+   *
+   * @param input relational node whose row type is converted
+   * @return the row type info, typed as `TypeInformation[Any]`
+   */
+  private[flink] def inputRowType(input: RelNode): TypeInformation[Any] = {
+    // iterating the Java field list relies on the JavaConversions implicits
+    val fieldTypes = for (field <- input.getRowType.getFieldList)
+      yield FlinkTypeFactory.toTypeInfo(field.getType)
+    new RowTypeInfo(fieldTypes).asInstanceOf[TypeInformation[Any]]
+  }
+
+  /**
+   * Returns the [[LogicalTableFunctionScan]] behind `relNode`, descending into the
+   * first relation of every [[RelSubset]] encountered.
+   *
+   * @param relNode a table function scan, or a subset (possibly nested) containing one
+   * @return the table function scan
+   * @throws IllegalStateException if `relNode` is neither a table function scan nor a
+   *         non-empty subset leading to one
+   */
+  private[flink] def unwrap(relNode: RelNode): LogicalTableFunctionScan = {
+    relNode match {
+      case scan: LogicalTableFunctionScan =>
+        scan
+      case subset: RelSubset if !subset.getRelList.isEmpty =>
+        unwrap(subset.getRelList.get(0))
+      case other =>
+        throw new IllegalStateException(
+          s"Expected a LogicalTableFunctionScan, but found: $other")
+    }
+  }
+
+  /**
+   * Renders the field names of `rowType` as a comma-separated list (no spaces).
+   */
+  private[flink] def selectToString(rowType: RelDataType): String =
+    rowType.getFieldNames.iterator().mkString(",")
+
+ private[flink] def correlateOpName(rexCall: RexCall,
+ sqlFunction: TableSqlFunction,
--- End diff --
In general, we use only two spaces for indentation.
> Add support for user defined table function in Table API & SQL
> --------------------------------------------------------------
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row
> and output a single output row. In contrast, table-generating functions
> transform a single input row into multiple output rows. This is very useful
> in some cases, such as looking up HBase by row key and returning one or more rows.
> A user-defined table function should:
> 1. inherit from the UDTF class with a specific generic type T;
> 2. define one or more eval functions.
> NOTE:
> 1. The eval method must be public and non-static.
> 2. The generic type T is the row type returned by the table function. Because of
> Java type erasure, we can’t extract T from the Iterable.
> 3. Use {{collect(T)}} to emit a table row.
> 4. The eval method can be overloaded. Blink will choose the best-matching eval
> method to call according to the number and types of the parameters.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
>
>   // Flink POJO types need a public no-argument constructor.
>   public Word() {}
>
>   public Word(String word, Integer length) {
>     this.word = word;
>     this.length = length;
>   }
> }
> public class SplitStringUDTF extends UDTF<Word> {
>   // Rows are emitted via collect(), so eval itself returns nothing.
>   public void eval(String str) {
>     if (str != null) {
>       for (String s : str.split(",")) {
>         collect(new Word(s, s.length()));
>       }
>     }
>   }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF());
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS t(w,l)");
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF());
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")
>      .select("a, b, w, l");
> // without renaming, we will use the original field names in the POJO/case class/...
> table.crossApply("split(c)")
>      .select("a, b, word, length");
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>   .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>   .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1]
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)