Repository: flink
Updated Branches:
  refs/heads/master 270140a1d -> 5baea3f2e


[FLINK-5223] [doc] Add documentation of UDTF in Table API & SQL

This closes #2956.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c86efbb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c86efbb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c86efbb

Branch: refs/heads/master
Commit: 5c86efbb449c631aea0b1b490cec706ad7596b44
Parents: da4af12
Author: Jark Wu <wuchong...@alibaba-inc.com>
Authored: Wed Dec 7 21:18:58 2016 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Dec 13 14:13:17 2016 +0100

----------------------------------------------------------------------
 docs/dev/table_api.md | 123 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 123 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c86efbb/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 2b42ab2..9271803 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -3897,6 +3897,129 @@ object TimestampModifier extends ScalarFunction {
 </div>
 </div>
 
+### User-defined Table Functions
+
+A user-defined table function is implemented similarly to a user-defined scalar function but can return a set of values instead of a single value. The returned set of values can consist of multiple columns and multiple rows, similar to a standard table. A user-defined table function works on zero, one, or multiple scalar values as input and returns multiple rows as output.
+
+To define a table function, extend the base class `TableFunction` in `org.apache.flink.api.table.functions` and implement one or more evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`. The `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table function. The type of the returned table is determined by the generic type of `TableFunction`. Evaluation methods emit output rows using the protected `collect(T)` method.
+
+In the Table API, a table function is used with `.join(Expression)` or `.leftOuterJoin(Expression)` for Scala users and `.join(String)` or `.leftOuterJoin(String)` for Java users. The `join` operator (cross) joins each row from the outer table (the table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The `leftOuterJoin` operator does the same but also preserves outer rows for which the table function returns an empty table. In SQL, use `LATERAL TABLE(<TableFunction>)` with `CROSS JOIN`, or with `LEFT JOIN` and an `ON TRUE` join condition (see examples below).
+
+The following examples show how to define a table-valued function and use it:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// the generic type "Tuple2<String, Integer>" determines that the returned table type has two columns,
+// the first is a String type and the second is an Integer type
+public class Split extends TableFunction<Tuple2<String, Integer>> {
+    public void eval(String str) {
+        for (String s : str.split(" ")) {
+            // use collect(...) to emit an output row
+            collect(new Tuple2<String, Integer>(s, s.length()));
+        }
+    }
+}
+
+BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+Table myTable = ...         // table schema: [a: String]
+
+// register the function
+tableEnv.registerFunction("split", new Split());
+
+// use the function in Java Table API
+// use AS to rename column names
+myTable.join("split(a) as (word, length)").select("a, word, length");
+myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");
+
+// use the function in SQL API, LATERAL and TABLE keywords are required
+// CROSS JOIN a table function (equivalent to "join" in Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// the generic type "(String, Integer)" determines that the returned table type has two columns,
+// the first is a String type and the second is an Integer type
+class Split extends TableFunction[(String, Integer)] {
+  def eval(str: String): Unit = {
+    // use collect(...) to emit an output row
+    str.split(" ").foreach(x => collect((x, x.length)))
+  }
+}
+
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+val myTable = ...         // table schema: [a: String]
+
+// use the function in Scala Table API (Note: No registration required in Scala Table API)
+val split = new Split()
+// use AS to rename column names
+myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length);
+myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length);
+
+// register and use the function in SQL API, LATERAL and TABLE keywords are required
+tableEnv.registerFunction("split", new Split())
+// CROSS JOIN a table function (equivalent to "join" in Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
+{% endhighlight %}
+**IMPORTANT:** Do not implement a `TableFunction` as a Scala object. A Scala object is a singleton and will cause concurrency issues.
+</div>
+</div>
+
+Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of a POJO returned by a table function using `AS`.
+
+By default the result type of a `TableFunction` is determined by Flink’s automatic type extraction facilities. This works well for basic types and simple POJOs but might be wrong for more complex, custom, or composite types. In such a case, the type of the result can be manually specified by overriding `TableFunction#getResultType()` which returns its `TypeInformation`.
+
+The following example shows a `TableFunction` that returns a `Row` type, which requires explicit type information. We define that the returned table type should be `RowTypeInfo(String, Integer)` by overriding `TableFunction#getResultType()`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class CustomTypeSplit extends TableFunction<Row> {
+    public void eval(String str) {
+        for (String s : str.split(" ")) {
+            Row row = new Row(2);
+            row.setField(0, s);
+            row.setField(1, s.length());
+            collect(row);
+        }
+    }
+
+    @Override
+    public TypeInformation<Row> getResultType() {
+        return new RowTypeInfo(new TypeInformation[]{
+                                       BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO});
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CustomTypeSplit extends TableFunction[Row] {
+  def eval(str: String): Unit = {
+    str.split(" ").foreach({ s =>
+      val row = new Row(2)
+      row.setField(0, s)
+      row.setField(1, s.length)
+      collect(row)
+    })
+  }
+
+  override def getResultType: TypeInformation[Row] = {
+    new RowTypeInfo(Seq(BasicTypeInfo.STRING_TYPE_INFO,
+                        BasicTypeInfo.INT_TYPE_INFO))
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
 ### Limitations
 
 The following operations are not supported yet:

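The difference between `join` and `leftOuterJoin` described in the added documentation can be illustrated without any Flink dependency. The following is only a plain-Java sketch of the semantics, not Flink code: the class `LateralJoinSketch` and its `split` and `join` helpers are hypothetical names introduced for illustration, and this `split` stand-in is assumed to return no rows for an empty string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class LateralJoinSketch {

    // Hypothetical stand-in for a table function: emits one (word, length)
    // row per non-empty token, so an empty input produces an empty table.
    public static List<Object[]> split(String str) {
        List<Object[]> rows = new ArrayList<>();
        for (String s : str.split(" ")) {
            if (!s.isEmpty()) {
                rows.add(new Object[] {s, s.length()});
            }
        }
        return rows;
    }

    // Joins every outer row with the rows the function produces for it.
    // With leftOuter == true, an outer row whose function result is empty
    // is kept and padded with nulls; otherwise it is dropped.
    public static List<String> join(List<String> outer,
                                    Function<String, List<Object[]>> fn,
                                    boolean leftOuter) {
        List<String> result = new ArrayList<>();
        for (String a : outer) {
            List<Object[]> produced = fn.apply(a);
            if (produced.isEmpty() && leftOuter) {
                result.add("[" + a + ", null, null]");
            }
            for (Object[] row : produced) {
                result.add("[" + a + ", " + row[0] + ", " + row[1] + "]");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> outer = Arrays.asList("hello world", "");
        // Behaves like "join": the empty outer row disappears.
        System.out.println(join(outer, LateralJoinSketch::split, false));
        // Behaves like "leftOuterJoin": the empty outer row is preserved.
        System.out.println(join(outer, LateralJoinSketch::split, true));
    }
}
```

With the two outer rows used in `main`, the cross-join variant yields two result rows and the left-outer variant yields three, the extra one being the empty outer row padded with nulls.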