Repository: flink Updated Branches: refs/heads/master 270140a1d -> 5baea3f2e
[FLINK-5223] [doc] Add documentation of UDTF in Table API & SQL This closes #2956. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c86efbb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c86efbb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c86efbb Branch: refs/heads/master Commit: 5c86efbb449c631aea0b1b490cec706ad7596b44 Parents: da4af12 Author: Jark Wu <wuchong...@alibaba-inc.com> Authored: Wed Dec 7 21:18:58 2016 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue Dec 13 14:13:17 2016 +0100
----------------------------------------------------------------------
 docs/dev/table_api.md | 123 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 123 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5c86efbb/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 2b42ab2..9271803 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -3897,6 +3897,129 @@ object TimestampModifier extends ScalarFunction {
 </div>
 </div>
+### User-defined Table Functions
+
+A user-defined table function is implemented similarly to a user-defined scalar function but can return a set of values instead of a single value. The returned set of values can consist of multiple columns and multiple rows, similar to a standard table. A user-defined table function takes zero, one, or multiple scalar values as input and returns multiple rows as output.
+
+In order to define a table function, one has to extend the base class `TableFunction` in `org.apache.flink.api.table.functions` and implement (one or more) evaluation methods. The behavior of a table function is determined by its evaluation methods. An evaluation method must be declared `public` and named `eval`.
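To make the `eval`/`collect` contract concrete before the full examples below, here is a dependency-free sketch. The `TableFunctionSketch` base class is a hypothetical stand-in for Flink's `TableFunction` (which additionally handles type extraction and serialization); only the pattern of a public `eval` method emitting rows via the protected `collect(...)` method is meant to carry over:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's TableFunction base class,
// used only to illustrate the eval/collect pattern.
abstract class TableFunctionSketch<T> {
    private final List<T> rows = new ArrayList<>();

    // Flink's TableFunction exposes collect(T) in a similar protected fashion.
    protected void collect(T row) {
        rows.add(row);
    }

    public List<T> collectedRows() {
        return rows;
    }
}

// Skeleton of a table function: a public method named "eval" emits
// zero or more output rows per input value via collect(...).
class SplitSketch extends TableFunctionSketch<String> {
    public void eval(String str) {
        for (String s : str.split(" ")) {
            collect(s);
        }
    }
}
```

Calling `eval("Hello Flink Table")` on a `SplitSketch` instance emits three rows, one per whitespace-separated token.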
The `TableFunction` can be overloaded by implementing multiple methods named `eval`. The parameter types of the evaluation methods determine all valid parameters of the table function. The type of the returned table is determined by the generic type of `TableFunction`. Evaluation methods emit output rows using the protected `collect(T)` method.
+
+In the Table API, a table function is used with `.join(Expression)` or `.leftOuterJoin(Expression)` for Scala users and `.join(String)` or `.leftOuterJoin(String)` for Java users. The `join` operator (cross) joins each row from the outer table (the table on the left of the operator) with all rows produced by the table-valued function (on the right side of the operator). The `leftOuterJoin` operator performs the same join but additionally preserves outer rows for which the table function returns an empty table. In SQL, use `LATERAL TABLE(<TableFunction>)` with a CROSS JOIN or a LEFT JOIN with an `ON TRUE` join condition (see the examples below).
+
+The following examples show how to define a table-valued function and use it:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// the generic type "Tuple2<String, Integer>" determines that the returned table
+// has two columns, the first of type String and the second of type Integer
+public class Split extends TableFunction<Tuple2<String, Integer>> {
+  public void eval(String str) {
+    for (String s : str.split(" ")) {
+      // use collect(...) to emit an output row
+      collect(new Tuple2<String, Integer>(s, s.length()));
+    }
+  }
+}
+
+BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+Table myTable = ...
// table schema: [a: String]
+
+// register the function
+tableEnv.registerFunction("split", new Split());
+
+// use the function in the Java Table API
+// use AS to rename column names
+myTable.join("split(a) as (word, length)").select("a, word, length");
+myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");
+
+// use the function in SQL, the LATERAL and TABLE keywords are required
+// CROSS JOIN a table function (equivalent to "join" in the Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+// LEFT JOIN a table function (equivalent to "leftOuterJoin" in the Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// the generic type "(String, Int)" determines that the returned table
+// has two columns, the first of type String and the second of type Int
+class Split extends TableFunction[(String, Int)] {
+  def eval(str: String): Unit = {
+    // use collect(...) to emit an output row
+    str.split(" ").foreach(x => collect((x, x.length)))
+  }
+}
+
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+val myTable = ...
// table schema: [a: String]
+
+// use the function in the Scala Table API (Note: no registration required in the Scala Table API)
+val split = new Split()
+// use AS to rename column names
+myTable.join(split('a) as ('word, 'length)).select('a, 'word, 'length)
+myTable.leftOuterJoin(split('a) as ('word, 'length)).select('a, 'word, 'length)
+
+// register and use the function in SQL, the LATERAL and TABLE keywords are required
+tableEnv.registerFunction("split", new Split())
+// CROSS JOIN a table function (equivalent to "join" in the Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
+// LEFT JOIN a table function (equivalent to "leftOuterJoin" in the Table API)
+tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")
+{% endhighlight %}
+**IMPORTANT:** Do not implement `TableFunction` as a Scala object. A Scala object is a singleton and will cause concurrency issues.
+</div>
+</div>
+
+Please note that POJO types do not have a deterministic field order. Therefore, you cannot rename the fields of a POJO returned by a table function using `AS`.
+
+By default the result type of a `TableFunction` is determined by Flink's automatic type extraction facilities. This works well for basic types and simple POJOs but might be wrong for more complex, custom, or composite types. In such a case, the result type can be manually specified by overriding `TableFunction#getResultType()`, which returns its `TypeInformation`.
+
+The following example shows a `TableFunction` that returns a `Row` type, which requires explicit type information. We define that the returned table type should be `RowTypeInfo(String, Integer)` by overriding `TableFunction#getResultType()`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class CustomTypeSplit extends TableFunction<Row> {
+  public void eval(String str) {
+    for (String s : str.split(" ")) {
+      Row row = new Row(2);
+      row.setField(0, s);
+      row.setField(1, s.length());
+      collect(row);
+    }
+  }
+
+  @Override
+  public TypeInformation<Row> getResultType() {
+    return new RowTypeInfo(new TypeInformation[]{
+      BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO});
+  }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CustomTypeSplit extends TableFunction[Row] {
+  def eval(str: String): Unit = {
+    str.split(" ").foreach({ s =>
+      val row = new Row(2)
+      row.setField(0, s)
+      row.setField(1, s.length)
+      collect(row)
+    })
+  }
+
+  override def getResultType: TypeInformation[Row] = {
+    new RowTypeInfo(Seq(BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO))
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
 ### Limitations
 
 The following operations are not supported yet: