[
https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15713917#comment-15713917
]
ASF GitHub Bot commented on FLINK-4469:
---------------------------------------
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/2653#discussion_r90583213
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableFunctionCall.scala
---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.expressions.{Expression, UnresolvedFieldReference}
+import org.apache.flink.api.table.functions.TableFunction
+import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.getFieldInfo
+import org.apache.flink.api.table.plan.logical.{LogicalNode, LogicalTableFunctionCall}
+
+
+/**
+ * A [[TableFunctionCall]] represents a call to a [[TableFunction]] with actual parameters.
+ *
+ * For Scala users, Flink will help to parse a [[TableFunction]] to [[TableFunctionCall]]
+ * implicitly. For Java users, Flink will help to parse a string expression to
+ * [[TableFunctionCall]]. So users do not need to create a [[TableFunctionCall]] manually.
+ *
+ * @param functionName function name
+ * @param tableFunction user-defined table function
+ * @param parameters actual parameters of function
+ * @param resultType type information of returned table
+ */
+case class TableFunctionCall(
+ functionName: String,
+ tableFunction: TableFunction[_],
+ parameters: Seq[Expression],
+ resultType: TypeInformation[_]) {
+
+ private var aliases: Option[Seq[Expression]] = None
+
+ /**
+ * Assigns an alias for this table function returned fields that the following `select()` clause
+ * can refer to.
+ *
+ * @param aliasList alias for this table function returned fields
+ * @return this table function call
+ */
+ def as(aliasList: Expression*): TableFunctionCall = {
+ this.aliases = Some(aliasList)
+ this
+ }
+
+ /**
+ * Converts an API class to a logical node for planning.
+ */
+ private[flink] def toLogicalTableFunctionCall(child: LogicalNode): LogicalTableFunctionCall = {
+ val originNames = getFieldInfo(resultType)._1
+
+ // determine the final field names
+ val fieldNames = if (aliases.isDefined) {
+ val aliasList = aliases.get
+ if (aliasList.length != originNames.length) {
+ throw ValidationException(
+ s"List of column aliases must have same degree as table; " +
+ s"the returned table of function '$functionName' has ${originNames.length} " +
+ s"columns (${originNames.mkString(",")}), " +
+ s"whereas alias list has ${aliasList.length} columns")
+ } else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+ throw ValidationException("Alias only accept name expressions as arguments")
+ } else {
+ aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name).toArray
+ }
+ } else {
+ originNames
+ }
+
+ LogicalTableFunctionCall(
+ functionName,
+ tableFunction,
+ parameters,
+ resultType,
+ fieldNames,
+ child)
+ }
+}
+
+
+case class TableFunctionCallBuilder[T: TypeInformation](udtf: TableFunction[T]) {
--- End diff --
I would move this into `ExpressionUtils`, learned from your new
[PR](https://github.com/apache/flink/pull/2919/files#diff-04d1bca648d7ee47ab9ce787c8d944a6R567)
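For readers following the Java side of the API, the alias handling in `toLogicalTableFunctionCall` from the diff above can be sketched roughly as follows. This is only an illustration under assumptions: `AliasResolver` and `resolveFieldNames` are hypothetical names, aliases are plain strings rather than `Expression`s, and `IllegalArgumentException` stands in for Flink's `ValidationException`. `Optional` is used as a parameter only to mirror the Scala `Option` in the diff.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of Flink: mirrors the alias check in the diff.
class AliasResolver {

    // Returns the aliases when they are given, otherwise the original field names.
    // Rejects an alias list whose size differs from the number of returned columns.
    static List<String> resolveFieldNames(
            String functionName,
            List<String> originNames,
            Optional<List<String>> aliases) {
        if (!aliases.isPresent()) {
            return originNames;
        }
        List<String> aliasList = aliases.get();
        if (aliasList.size() != originNames.size()) {
            throw new IllegalArgumentException(
                "List of column aliases must have same degree as table; "
                    + "the returned table of function '" + functionName + "' has "
                    + originNames.size() + " columns (" + String.join(",", originNames) + "), "
                    + "whereas alias list has " + aliasList.size() + " columns");
        }
        return aliasList;
    }
}
```

With the `split` function from the issue below, passing aliases `w, l` would yield those two names, and passing no aliases would fall back to `word, length`.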
> Add support for user defined table function in Table API & SQL
> --------------------------------------------------------------
>
> Key: FLINK-4469
> URL: https://issues.apache.org/jira/browse/FLINK-4469
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row
> and output a single output row. In contrast, table-generating functions
> transform a single input row to multiple output rows. This is very useful in
> some cases, such as looking up a rowkey in HBase and returning one or more rows.
> A user-defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval methods.
> NOTE:
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because of
> Java type erasure, we can't extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. the eval method can be overloaded. Blink will choose the best-matching eval
> method to call according to the types and number of parameters.
> {code}
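> public class Word {
> public String word;
> public Integer length;
> // constructor used by the UDTF below
> public Word(String word, Integer length) {
> this.word = word;
> this.length = length;
> }
> }
> public class SplitStringUDTF extends UDTF<Word> {
> // rows are emitted through collect(), so eval returns nothing
> public void eval(String str) {
> if (str != null) {
> for (String s : str.split(",")) {
> collect(new Word(s, s.length()));
> }
> }
> }
> }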
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to 'w' and 'l'
> table.crossApply("split(c) as (w, l)")
> .select("a, b, w, l")
> // without renaming, we will use the origin field names in the POJO/case/...
> table.crossApply("split(c)")
> .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
> .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
> .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1]
> https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#
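The contract in the NOTE list above (a public, non-static `eval`, rows emitted through `collect(T)`, and overloaded `eval` methods) can be tried out without Flink using a small stand-in. The sketch below is an assumption-laden illustration, not the proposed implementation: `SketchUDTF`, `drain()`, `SplitStringSketch` and the two-argument `eval` overload are all hypothetical, and the calls are made directly instead of being chosen by the planner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in for the UDTF base class; NOT Flink's actual class.
abstract class SketchUDTF<T> {
    private final List<T> rows = new ArrayList<>();

    // Emits one output row for the current input row.
    protected void collect(T row) {
        rows.add(row);
    }

    // Hypothetical helper: returns the rows emitted so far and clears the buffer.
    public List<T> drain() {
        List<T> out = new ArrayList<>(rows);
        rows.clear();
        return out;
    }
}

// Row type T of the table function, as in the issue's example.
class Word {
    public String word;
    public Integer length;

    public Word(String word, Integer length) {
        this.word = word;
        this.length = length;
    }
}

class SplitStringSketch extends SketchUDTF<Word> {
    // Public, non-static eval: splits on a comma and emits one row per token.
    public void eval(String str) {
        eval(str, ",");
    }

    // Overloaded eval (an assumption for illustration): explicit literal separator.
    public void eval(String str, String separator) {
        if (str != null) {
            for (String s : str.split(Pattern.quote(separator))) {
                collect(new Word(s, s.length()));
            }
        }
    }
}
```

Calling `eval("a,bb,ccc")` and then `drain()` on a `SplitStringSketch` yields three `Word` rows, one per token. A null input emits no rows, which is the case that `outerApply` is meant to preserve on the left side.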
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)