This is an automated email from the ASF dual-hosted git repository. libenchao pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new 1917af3 [FLINK-18324][docs-zh] Translate updated udf into Chinese 1917af3 is described below commit 1917af3a1eba8450c00a7be8c11796e3db16ddb8 Author: Yubin Li <lixin58...@163.com> AuthorDate: Tue Jun 30 19:16:34 2020 +0800 [FLINK-18324][docs-zh] Translate updated udf into Chinese This closes #12794 --- docs/dev/table/functions/udfs.zh.md | 1049 ++++++++++++++++++++++++----------- 1 file changed, 740 insertions(+), 309 deletions(-) diff --git a/docs/dev/table/functions/udfs.zh.md b/docs/dev/table/functions/udfs.zh.md index 995a67c..4e3a6be 100644 --- a/docs/dev/table/functions/udfs.zh.md +++ b/docs/dev/table/functions/udfs.zh.md @@ -22,322 +22,854 @@ specific language governing permissions and limitations under the License. --> -自定义函数是一个非常重要的功能,因为它极大的扩展了查询的表达能力。 +自定义函数(UDF)是一种扩展开发机制,可以用来在查询语句里调用难以用其他方式表达的频繁使用或自定义的逻辑。 + +自定义函数可以用 JVM 语言(例如 Java 或 Scala)或 Python 实现,实现者可以在 UDF 中使用任意第三方库,本文聚焦于使用 JVM 语言开发自定义函数。 * This will be replaced by the TOC {:toc} -注册自定义函数 -------------------------------- -在大多数情况下,自定义函数在使用之前都需要注册。在 Scala Table API 中可以不用注册。 +概述 +-------- -通过调用 `registerFunction()` 把函数注册到 `TableEnvironment`。当一个函数注册之后,它就在 `TableEnvironment` 的函数 catalog 里面了,这样 Table API 或者 SQL 解析器就可以识别并使用它。 +当前 Flink 有如下几种函数: -关于如何注册和使用每种类型的自定义函数(标量函数、表值函数和聚合函数),更多示例可以看下面的部分。 +- *标量函数* 将标量值转换成一个新标量值; +- *表值函数* 将标量值转换成新的行数据; +- *聚合函数* 将多行数据里的标量值转换成一个新标量值; +- *表值聚合函数* 将多行数据里的标量值转换成新的行数据; +- *异步表值函数* 是异步查询外部数据系统的特殊函数。 -{% top %} +<span class="label label-danger">注意</span> 标量和表值函数已经使用了新的基于[数据类型]({% link dev/table/types.zh.md %})的类型系统,聚合函数仍然使用基于 `TypeInformation` 的旧类型系统。 -标量函数 ----------------- +以下示例展示了如何创建一个基本的标量函数,以及如何在 Table API 和 SQL 里调用这个函数。 -如果需要的标量函数没有被内置函数覆盖,就可以在自定义一个标量函数在 Table API 和 SQL 中使用。自定义标量函数可以把 0 到多个标量值映射成 1 个标量值。 +函数用于 SQL 查询前要先经过注册;而在用于 Table API 时,函数可以先注册后调用,也可以 _内联_ 后直接使用。 <div class="codetabs" markdown="1"> -<div data-lang="java" markdown="1"> -想要实现自定义标量函数,你需要扩展 `org.apache.flink.table.functions` 里面的 `ScalarFunction` 并且实现一个或者多个求值方法。标量函数的行为取决于你写的求值方法。求值方法并须是 `public` 的,而且名字必须是 `eval`。求值方法的参数类型以及返回值类型就决定了标量函数的参数类型和返回值类型。可以通过实现多个名为 `eval` 的方法对求值方法进行重载。求值方法也支持可变参数,例如 `eval(String... strs)`。 - -下面的示例展示了如何实现一个求哈希值的函数。先把它注册到 `TableEnvironment` 里,然后在查询的时候就可以直接使用了。需要注意的是,你可以在注册之前通过构造方法来配置你的标量函数: +<div data-lang="Java" markdown="1"> {% highlight java %} -public class HashCode extends ScalarFunction { - private int factor = 12; - - public HashCode(int factor) { - this.factor = factor; +import org.apache.flink.table.api.*; +import org.apache.flink.table.functions.ScalarFunction; +import static org.apache.flink.table.api.Expressions.*; + +// 定义函数逻辑 +public static class SubstringFunction extends ScalarFunction { + public String eval(String s, Integer begin, Integer end) { + return s.substring(begin, end); } - - public int eval(String s) { - return s.hashCode() * factor; +} + +TableEnvironment env = TableEnvironment.create(...); + +// 在 Table API 里不经注册直接“内联”调用函数 +env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12)); + +// 注册函数 +env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class); + +// 在 Table API 里调用注册好的函数 +env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12)); + +// 在 SQL 里调用注册好的函数 +env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable"); + +{% endhighlight %} +</div> + +<div data-lang="Scala" markdown="1"> +{% highlight scala %} +import org.apache.flink.table.api._ +import org.apache.flink.table.functions.ScalarFunction + +// define function logic +class SubstringFunction extends ScalarFunction { + def eval(s: String, begin: Integer, end: Integer): String = { + s.substring(begin, end) } } -BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env); +val env = TableEnvironment.create(...) + +// 在 Table API 里不经注册直接“内联”调用函数 +env.from("MyTable").select(call(classOf[SubstringFunction], $"myField", 5, 12)) // 注册函数 -tableEnv.registerFunction("hashCode", new HashCode(10)); +env.createTemporarySystemFunction("SubstringFunction", classOf[SubstringFunction]) + +// 在 Table API 里调用注册好的函数 +env.from("MyTable").select(call("SubstringFunction", $"myField", 5, 12)) -// 在 Java Table API 中使用函数 -myTable.select("string, string.hashCode(), hashCode(string)"); +// 在 SQL 里调用注册好的函数 +env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable") -// 在 SQL API 中使用函数 -tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable"); {% endhighlight %} +</div> -求值方法的返回值类型默认是由 Flink 的类型推导来决定的。类型推导可以推导出基本数据类型以及简单的 POJO,但是对于更复杂的、自定义的、或者组合类型,可能会推导出错误的结果。在这种情况下,可以通过覆盖 `ScalarFunction#getResultType()`,并且返回 `TypeInformation` 来定义复杂类型。 +</div> -下面的示例展示了一个高级一点的自定义标量函数用法,它接收一个内部的时间戳参数,并且以 `long` 的形式返回该内部的时间戳。通过覆盖 `ScalarFunction#getResultType()`,我们定义了我们返回的 `long` 类型在代码生成时可以被解析为 `Types.TIMESTAMP` 类型。 +对于交互式会话,还可以在使用或注册函数之前对其进行参数化,这样可以把函数 _实例_ 而不是函数 _类_ 用作临时函数。 +为确保函数实例可应用于集群环境,参数必须是可序列化的。 + +<div class="codetabs" markdown="1"> + +<div data-lang="Java" markdown="1"> {% highlight java %} -public static class TimestampModifier extends ScalarFunction { - public long eval(long t) { - return t % 1000; +import org.apache.flink.table.api.*; +import org.apache.flink.table.functions.ScalarFunction; +import static org.apache.flink.table.api.Expressions.*; + +// 定义可参数化的函数逻辑 +public static class SubstringFunction extends ScalarFunction { + + private boolean endInclusive; + + public SubstringFunction(boolean endInclusive) { + this.endInclusive = endInclusive; } - public TypeInformation<?> getResultType(Class<?>[] signature) { - return Types.SQL_TIMESTAMP; + public String eval(String s, Integer begin, Integer end) { + return s.substring(a, endInclusive ? end + 1 : end); } } -{% endhighlight %} -</div> -<div data-lang="scala" markdown="1"> -想要实现自定义标量函数,你需要扩展 `org.apache.flink.table.functions` 里面的 `ScalarFunction` 并且实现一个或者多个求值方法。标量函数的行为取决于你写的求值方法。求值方法并须是 `public` 的,而且名字必须是 `eval`。求值方法的参数类型以及返回值类型就决定了标量函数的参数类型和返回值类型。可以通过实现多个名为 `eval` 的方法对求值方法进行重载。求值方法也支持可变参数,例如 `@varargs def eval(str: String*)`。 +TableEnvironment env = TableEnvironment.create(...); + +// 在 Table API 里不经注册直接“内联”调用函数 +env.from("MyTable").select(call(new SubstringFunction(true), $("myField"), 5, 12)); + +// 注册函数 +env.createTemporarySystemFunction("SubstringFunction", new SubstringFunction(true)); -下面的示例展示了如何实现一个求哈希值的函数。先把它注册到 `TableEnvironment` 里,然后在查询的时候就可以直接使用了。需要注意的是,你可以在注册之前通过构造方法来配置你的标量函数: +{% endhighlight %} +</div> +<div data-lang="Scala" markdown="1"> {% highlight scala %} -// 必须定义在 static/object 上下文中 -class HashCode(factor: Int) extends ScalarFunction { - def eval(s: String): Int = { - s.hashCode() * factor +import org.apache.flink.table.api._ +import org.apache.flink.table.functions.ScalarFunction + +// 定义可参数化的函数逻辑 +class SubstringFunction(val endInclusive) extends ScalarFunction { + def eval(s: String, begin: Integer, end: Integer): String = { + s.substring(endInclusive ? end + 1 : end) } } -val tableEnv = BatchTableEnvironment.create(env) +val env = TableEnvironment.create(...) -// 在 Scala Table API 中使用函数 -val hashCode = new HashCode(10) -myTable.select('string, hashCode('string)) +// 在 Table API 里不经注册直接“内联”调用函数 +env.from("MyTable").select(call(new SubstringFunction(true), $"myField", 5, 12)) + +// 注册函数 +env.createTemporarySystemFunction("SubstringFunction", new SubstringFunction(true)) -// 在 SQL 中注册和使用函数 -tableEnv.registerFunction("hashCode", new HashCode(10)) -tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable") {% endhighlight %} +</div> -求值方法的返回值类型默认是由 Flink 的类型推导来决定的。类型推导可以推导出基本数据类型以及简单的 POJO,但是对于更复杂的、自定义的、或者组合类型,可能会推导出错误的结果。在这种情况下,可以通过覆盖 `ScalarFunction#getResultType()`,并且返回 `TypeInformation` 来定义复杂类型。 +</div> + +{% top %} -下面的示例展示了一个高级一点的自定义标量函数用法,它接收一个内部的时间戳参数,并且以 `long` 的形式返回该内部的时间戳。通过覆盖 `ScalarFunction#getResultType()`,我们定义了我们返回的 `long` 类型在代码生成时可以被解析为 `Types.TIMESTAMP` 类型。 +开发指南 +-------- +<span class="label label-danger">注意</span>在聚合函数使用新的类型系统前,本节仅适用于标量和表值函数。 + +所有的自定义函数都遵循一些基本的实现原则。 + +### 函数类 + +实现类必须继承自合适的基类之一(例如 `org.apache.flink.table.functions.ScalarFunction` )。 + +该类必须声明为 `public` ,而不是 `abstract` ,并且可以被全局访问。不允许使用非静态内部类或匿名类。 + +为了将自定义函数存储在持久化的 catalog 中,该类必须具有默认构造器,且在运行时可实例化。 + +### 求值方法 + +基类提供了一组可以被重写的方法,例如 `open()`、 `close()` 或 `isDeterministic()` 。 + +但是,除了上述方法之外,作用于每条传入记录的主要逻辑还必须通过专门的 _求值方法_ 来实现。 + +根据函数的种类,后台生成的运算符会在运行时调用诸如 `eval()`、`accumulate()` 或 `retract()` 之类的求值方法。 + +这些方法必须声明为 `public` ,并带有一组定义明确的参数。 + +常规的 JVM 方法调用语义是适用的。因此可以: +- 实现重载的方法,例如 `eval(Integer)` 和 `eval(LocalDateTime)`; +- 使用变长参数,例如 `eval(Integer...)`; +- 使用对象继承,例如 `eval(Object)` 可接受 `LocalDateTime` 和 `Integer` 作为参数; +- 也可组合使用,例如 `eval(Object...)` 可接受所有类型的参数。 + +以下代码片段展示了一个重载函数的示例: + +<div class="codetabs" markdown="1"> + +<div data-lang="Java" markdown="1"> +{% highlight java %} +import org.apache.flink.table.functions.ScalarFunction; + +// 有多个重载求值方法的函数 +public static class SumFunction extends ScalarFunction { + + public Integer eval(Integer a, Integer b) { + return a + b; + } + + public Integer eval(String a, String b) { + return Integer.valueOf(a) + Integer.valueOf(); + } + + public Integer eval(Double... d) { + double result = 0; + for (double value : d) + result += value; + return (int) result; + } +} + +{% endhighlight %} +</div> + +<div data-lang="Scala" markdown="1"> {% highlight scala %} -object TimestampModifier extends ScalarFunction { - def eval(t: Long): Long = { - t % 1000 +import org.apache.flink.table.functions.ScalarFunction +import scala.annotation.varargs + +// 有多个重载求值方法的函数 +class SumFunction extends ScalarFunction { + + def eval(a: Integer, b: Integer): Integer = { + a + b } - override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = { - Types.TIMESTAMP + def eval(a: String, b: String): Integer = { + Integer.valueOf(a) + Integer.valueOf(b) + } + + @varargs // generate var-args like Java + def eval(d: Double*): Integer = { + d.sum.toInt } } + {% endhighlight %} </div> -<div data-lang="python" markdown="1"> -要定义一个 Python 标量函数,你可以继承 `pyflink.table.udf` 下的 `ScalarFunction`,并且实现一个求值函数。Python 标量函数的行为取决于你实现的求值函数,它的名字必须是 `eval`。 +</div> -下面的示例展示了如何自定义一个 Python 的求哈希值的函数,并且把它注册到 `TableEnvironment` 里,然后在查询中使用它。你可以在注册函数之前通过构造函数来配置你的标量函数。 +### 类型推导 -{% highlight python %} -class HashCode(ScalarFunction): - def __init__(self): - self.factor = 12 +Table(类似于 SQL 标准)是一种强类型的 API。因此,函数的参数和返回类型都必须映射到[数据类型]({%link dev/table/types.zh.md %})。 + +从逻辑角度看,Planner 需要知道数据类型、精度和小数位数;从 JVM 角度来看,Planner 在调用自定义函数时需要知道如何将内部数据结构表示为 JVM 对象。 + +术语 _类型推导_ 概括了意在验证输入值、派生出参数/返回值数据类型的逻辑。 + +Flink 自定义函数实现了自动的类型推导提取,通过反射从函数的类及其求值方法中派生数据类型。如果这种隐式的反射提取方法不成功,则可以通过使用 `@DataTypeHint` 和 `@FunctionHint` 注解相关参数、类或方法来支持提取过程,下面展示了有关如何注解函数的例子。 - def eval(self, s): - return hash(s) * self.factor +如果需要更高级的类型推导逻辑,实现者可以在每个自定义函数中显式重写 `getTypeInference()` 方法。但是,建议使用注解方式,因为它可使自定义类型推导逻辑保持在受影响位置附近,而在其他位置则保持默认状态。 -table_env = BatchTableEnvironment.create(env) -# 注册 Python 函数 -table_env.register_function("hash_code", udf(HashCode(), DataTypes.BIGINT(), DataTypes.BIGINT())) +#### 自动类型推导 -# 在 Python Table API 中使用函数 -my_table.select("string, bigint, string.hash_code(), hash_code(string)") +自动类型推导会检查函数的类和求值方法,派生出函数参数和结果的数据类型, `@DataTypeHint` 和 `@FunctionHint` 注解支持自动类型推导。 + +有关可以隐式映射到数据类型的类的完整列表,请参阅[数据类型]({%link dev/table/types.zh.md %}#数据类型注解)。 + +**`@DataTypeHint`** + +在许多情况下,需要支持以 _内联_ 方式自动提取出函数参数、返回值的类型。 + +以下例子展示了如何使用 `@DataTypeHint`,详情可参考该注解类的文档。 + +<div class="codetabs" markdown="1"> + +<div data-lang="Java" markdown="1"> +{% highlight java %} +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.InputGroup; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.types.Row; + +// 有多个重载求值方法的函数 +public static class OverloadedFunction extends ScalarFunction { + + // no hint required + public Long eval(long a, long b) { + return a + b; + } + + // 定义 decimal 的精度和小数位 + public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) { + return BigDecimal.valueOf(a + b); + } + + // 定义嵌套数据类型 + @DataTypeHint("ROW<s STRING, t TIMESTAMP(3) WITH LOCAL TIME ZONE>") + public Row eval(int i) { + return Row.of(String.valueOf(i), Instant.ofEpochSecond(i)); + } + + // 允许任意类型的符入,并输出序列化定制后的值 + @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class) + public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) { + return MyUtils.serializeToByteBuffer(o); + } +} -# 在 SQL API 中使用函数 -table_env.sql_query("SELECT string, bigint, hash_code(bigint) FROM MyTable") {% endhighlight %} +</div> + +<div data-lang="Scala" markdown="1"> +{% highlight scala %} +import org.apache.flink.table.annotation.DataTypeHint +import org.apache.flink.table.annotation.InputGroup +import org.apache.flink.table.functions.ScalarFunction +import org.apache.flink.types.Row +import scala.annotation.varargs + +// function with overloaded evaluation methods +class OverloadedFunction extends ScalarFunction { + + // no hint required + def eval(a: Long, b: Long): Long = { + a + b + } + + // 定义 decimal 的精度和小数位 + @DataTypeHint("DECIMAL(12, 3)") + def eval(double a, double b): BigDecimal = { + java.lang.BigDecimal.valueOf(a + b) + } + + // 定义嵌套数据类型 + @DataTypeHint("ROW<s STRING, t TIMESTAMP(3) WITH LOCAL TIME ZONE>") + def eval(Int i): Row = { + Row.of(java.lang.String.valueOf(i), java.time.Instant.ofEpochSecond(i)) + } + + // 允许任意类型的符入,并输出定制序列化后的值 + @DataTypeHint(value = "RAW", bridgedTo = classOf[java.nio.ByteBuffer]) + def eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o): java.nio.ByteBuffer = { + MyUtils.serializeToByteBuffer(o) + } +} -除了继承 `ScalarFunction`,还有很多方法可以定义 Python 标量函数。 -更多细节,可以参考 [Python 标量函数]({{ site.baseurl }}/zh/dev/table/python/python_udfs.html#scalar-functions) 文档。 +{% endhighlight %} </div> + </div> -{% top %} +**`@FunctionHint`** -表值函数 ---------------- +有时我们希望一种求值方法可以同时处理多种数据类型,有时又要求对重载的多个求值方法仅声明一次通用的结果类型。 + +`@FunctionHint` 注解可以提供从入参数据类型到结果数据类型的映射,它可以在整个函数类或求值方法上注解输入、累加器和结果的数据类型。可以在类顶部声明一个或多个注解,也可以为类的所有求值方法分别声明一个或多个注解。所有的 hint 参数都是可选的,如果未定义参数,则使用默认的基于反射的类型提取。在函数类顶部定义的 hint 参数被所有求值方法继承。 -跟自定义标量函数一样,自定义表值函数的输入参数也可以是 0 到多个标量。但是跟标量函数只能返回一个值不同的是,它可以返回任意多行。返回的每一行可以包含 1 到多列。 +以下例子展示了如何使用 `@FunctionHint`,详情可参考该注解类的文档。 <div class="codetabs" markdown="1"> -<div data-lang="java" markdown="1"> -要定义一个表值函数,你需要扩展 `org.apache.flink.table.functions` 下的 `TableFunction`,并且实现(一个或者多个)求值方法。表值函数的行为取决于你实现的求值方法。求值方法必须被声明为 `public`,并且名字必须是 `eval`。可以通过实现多个名为 `eval` 的方法对求值方法进行重载。求值方法的参数类型决定了表值函数的参数类型。表值函数也可以支持变长参数,比如 `eval(String... strs)`。表值函数返回的表的类型取决于 `TableFunction` 的泛型参数。求值方法通过 `collect(T)` 方法来发送要输出的行。 +<div data-lang="Java" markdown="1"> +{% highlight java %} +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.types.Row; + +// 为函数类的所有求值方法指定同一个输出类型 +@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>")) +public static class OverloadedFunction extends TableFunction<Row> { -在 Table API 中,表值函数是通过 `.joinLateral` 或者 `.leftOuterJoinLateral` 来使用的。`joinLateral` 算子会把外表(算子左侧的表)的每一行跟跟表值函数返回的所有行(位于算子右侧)进行 (cross)join。`leftOuterJoinLateral` 算子也是把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join,并且如果表值函数返回 0 行也会保留外表的这一行。在 SQL 里面用 CORSS JOIN 或者 以 ON TRUE 为条件的 LEFT JOIN 来配合 `LATERAL TABLE(<TableFunction>)` 的使用。 + public void eval(int a, int b) { + collect(Row.of("Sum", a + b)); + } -下面的例子展示了如何定义一个表值函数,如何在 TableEnvironment 中注册表值函数,以及如何在查询中使用表值函数。你可以在注册之前通过构造函数来配置你的表值函数: + // overloading of arguments is still possible + public void eval() { + collect(Row.of("Empty args", -1)); + } +} -{% highlight java %} -// 泛型参数的类型 "Tuple2<String, Integer>" 决定了返回的表的 schema 是(String,Integer)。 -public class Split extends TableFunction<Tuple2<String, Integer>> { - private String separator = " "; - - public Split(String separator) { - this.separator = separator; - } - - public void eval(String str) { - for (String s : str.split(separator)) { - // 使用 collect(...) 来输出一行数据 - collect(new Tuple2<String, Integer>(s, s.length())); - } +// 解耦类型推导与求值方法,类型推导完全取决于 FunctionHint +@FunctionHint( + input = [@DataTypeHint("INT"), @DataTypeHint("INT")], + output = @DataTypeHint("INT") +) +@FunctionHint( + input = [@DataTypeHint("LONG"), @DataTypeHint("LONG")], + output = @DataTypeHint("LONG") +) +@FunctionHint( + input = [], + output = @DataTypeHint("BOOLEAN") +) +public static class OverloadedFunction extends TableFunction<Object> { + + // an implementer just needs to make sure that a method exists + // that can be called by the JVM + public void eval(Object... o) { + if (o.length == 0) { + collect(false); } + collect(o[0]); + } } -BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env); -Table myTable = ... // table schema: [a: String] +{% endhighlight %} +</div> + +<div data-lang="Scala" markdown="1"> +{% highlight scala %} + +import org.apache.flink.table.annotation.DataTypeHint +import org.apache.flink.table.annotation.FunctionHint +import org.apache.flink.table.functions.TableFunction +import org.apache.flink.types.Row + +// 为函数类的所有求值方法指定同一个输出类型 +@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>")) +class OverloadedFunction extends TableFunction[Row] { -// 注册表值函数。 -tableEnv.registerFunction("split", new Split("#")); + def eval(a: Int, b: Int): Unit = { + collect(Row.of("Sum", Int.box(a + b))) + } -// 在 Java Table API 中使用表值函数。"as" 指明了表的字段名字 -myTable.joinLateral("split(a) as (word, length)") - .select("a, word, length"); -myTable.leftOuterJoinLateral("split(a) as (word, length)") - .select("a, word, length"); + // overloading of arguments is still possible + def eval(): Unit = { + collect(Row.of("Empty args", Int.box(-1))) + } +} + +// 解耦类型推导与求值方法,类型推导完全取决于 @FunctionHint +@FunctionHint( + input = Array(@DataTypeHint("INT"), @DataTypeHint("INT")), + output = @DataTypeHint("INT") +) +@FunctionHint( + input = Array(@DataTypeHint("LONG"), @DataTypeHint("LONG")), + output = @DataTypeHint("LONG") +) +@FunctionHint( + input = Array(), + output = @DataTypeHint("BOOLEAN") +) +class OverloadedFunction extends TableFunction[AnyRef] { + + // an implementer just needs to make sure that a method exists + // that can be called by the JVM + @varargs + def eval(o: AnyRef*) = { + if (o.length == 0) { + collect(Boolean.box(false)) + } + collect(o(0)) + } +} -// 在 SQL 中用 LATERAL 和 TABLE 关键字来使用表值函数 -// CROSS JOIN a table function (等价于 Table API 中的 "join"). -tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)"); -// LEFT JOIN a table function (等价于 in Table API 中的 "leftOuterJoin"). -tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE"); {% endhighlight %} +</div> + +</div> + +#### 定制类型推导 -需要注意的是 POJO 类型没有确定的字段顺序。所以,你不可以用 `AS` 来重命名返回的 POJO 的字段。 +在大多数情况下,`@DataTypeHint` 和 `@FunctionHint` 足以构建自定义函数,然而通过重写 `getTypeInference()` 定制自动类型推导逻辑,实现者可以创建任意像系统内置函数那样有用的函数。 -`TableFunction` 的返回类型默认是用 Flink 自动类型推导来决定的。对于基础类型和简单的 POJO 类型推导是没有问题的,但是对于更复杂的、自定义的、以及组合的类型可能会推导错误。如果有这种情况,可以通过重写(override) `TableFunction#getResultType()` 并且返回 `TypeInformation` 来指定返回类型。 +以下用 Java 实现的例子展示了定制类型推导的潜力,它根据字符串参数来确定函数的结果类型。该函数带有两个字符串参数:第一个参数表示要分析的字符串,第二个参数表示目标类型。 -下面的例子展示了 `TableFunction` 返回了一个 `Row` 类型,需要显示指定返回类型。我们通过重写 `TableFunction#getResultType` 来指定 `RowTypeInfo(String, Integer)` 作为返回的表的类型。 +<div class="codetabs" markdown="1"> +<div data-lang="Java" markdown="1"> {% highlight java %} -public class CustomTypeSplit extends TableFunction<Row> { - public void eval(String str) { - for (String s : str.split(" ")) { - Row row = new Row(2); - row.setField(0, s); - row.setField(1, s.length()); - collect(row); - } +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.types.inference.TypeInference; +import org.apache.flink.types.Row; + +public static class LiteralFunction extends ScalarFunction { + public Object eval(String s, String type) { + switch (type) { + case "INT": + return Integer.valueOf(s); + case "DOUBLE": + return Double.valueOf(s); + case "STRING": + default: + return s; } + } + + // 禁用自动的反射式类型推导,使用如下逻辑进行类型推导 + @Override + public TypeInference getTypeInference(DataTypeFactory typeFactory) { + return TypeInference.newBuilder() + // 指定输入参数的类型,必要时参数会被隐式转换 + .typedArguments(DataTypes.STRING(), DataTypes.STRING()) + // specify a strategy for the result data type of the function + .outputTypeStrategy(callContext -> { + if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) { + throw callContext.newValidationError("Literal expected for second argument."); + } + // 基于字符串值返回数据类型 + final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING"); + switch (literal) { + case "INT": + return Optional.of(DataTypes.INT().notNull()); + case "DOUBLE": + return Optional.of(DataTypes.DOUBLE().notNull()); + case "STRING": + default: + return Optional.of(DataTypes.STRING()); + } + }) + .build(); + } +} + +{% endhighlight %} +</div> + +</div> + +### 运行时集成 +------------------- + +有时候自定义函数需要获取一些全局信息,或者在真正被调用之前做一些配置(setup)/清理(clean-up)的工作。自定义函数也提供了 `open()` 和 `close()` 方法,你可以重写这两个方法做到类似于 DataStream API 中 `RichFunction` 的功能。 + +open() 方法在求值方法被调用之前先调用。close() 方法在求值方法调用完之后被调用。 + +open() 方法提供了一个 FunctionContext,它包含了一些自定义函数被执行时的上下文信息,比如 metric group、分布式文件缓存,或者是全局的作业参数等。 + +下面的信息可以通过调用 `FunctionContext` 的对应的方法来获得: + +| 方法 | 描述 | +| :------------------------------------ | :----------------------------------------------------- | +| `getMetricGroup()` | 执行该函数的 subtask 的 Metric Group。 | +| `getCachedFile(name)` | 分布式文件缓存的本地临时文件副本。| +| `getJobParameter(name, defaultValue)` | 跟对应的 key 关联的全局参数值。 | + +下面的例子展示了如何在一个标量函数中通过 FunctionContext 来获取一个全局的任务参数: + +<div class="codetabs" markdown="1"> + +<div data-lang="java" markdown="1"> +{% highlight java %} +import org.apache.flink.table.api.*; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.ScalarFunction; + +public static class HashCodeFunction extends ScalarFunction { + + private int factor = 0; @Override - public TypeInformation<Row> getResultType() { - return Types.ROW(Types.STRING(), Types.INT()); + public void open(FunctionContext context) throws Exception { + // 获取参数 "hashcode_factor" + // 如果不存在,则使用默认值 "12" + factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12")); + } + + public int eval(String s) { + return s.hashCode() * factor; } } -{% endhighlight %} +TableEnvironment env = TableEnvironment.create(...); + +// 设置任务参数 +env.getConfig().addJobParameter("hashcode_factor", "31"); + +// 注册函数 +env.createTemporarySystemFunction("hashCode", HashCodeFunction.class); + +// 调用函数 +env.sqlQuery("SELECT myField, hashCode(myField) FROM MyTable"); + +{% endhighlight %} </div> <div data-lang="scala" markdown="1"> +{% highlight scala %} +import org.apache.flink.table.api._ +import org.apache.flink.table.functions.FunctionContext +import org.apache.flink.table.functions.ScalarFunction -要定义一个表值函数,你需要扩展 `org.apache.flink.table.functions` 下的 `TableFunction`,并且实现(一个或者多个)求值方法。表值函数的行为取决于你实现的求值方法。求值方法必须被声明为 `public`,并且名字必须是 `eval`。可以通过实现多个名为 `eval` 的方法对求值方法进行重载。求值方法的参数类型决定了表值函数的参数类型。表值函数也可以支持变长参数,比如 `eval(String... strs)`。表值函数返回的表的类型取决于 `TableFunction` 的泛型参数。求值方法通过 `collect(T)` 方法来发送要输出的行。 +class HashCodeFunction extends ScalarFunction { -在 Table API 中,表值函数是通过 `.joinLateral` 或者 `.leftOuterJoinLateral` 来使用的。`joinLateral` 算子会把外表(算子左侧的表)的每一行跟跟表值函数返回的所有行(位于算子右侧)进行 (cross)join。`leftOuterJoinLateral` 算子也是把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join,并且如果表值函数返回 0 行也会保留外表的这一行。在 SQL 里面用 CORSS JOIN 或者 以 ON TRUE 为条件的 LEFT JOIN 来配合 `LATERAL TABLE(<TableFunction>)` 的使用。 + private var factor: Int = 0 -下面的例子展示了如何定义一个表值函数,如何在 TableEnvironment 中注册表值函数,以及如何在查询中使用表值函数。你可以在注册之前通过构造函数来配置你的表值函数: + override def open(context: FunctionContext): Unit = { + // 获取参数 "hashcode_factor" + // 如果不存在,则使用默认值 "12" + factor = context.getJobParameter("hashcode_factor", "12").toInt + } -{% highlight scala %} -// 泛型参数的类型 "(String, Int)" 决定了返回的表的 schema 是 (String, Integer)。 -class Split(separator: String) extends TableFunction[(String, Int)] { - def eval(str: String): Unit = { - // 使用 collect(...) 来输出一行 - str.split(separator).foreach(x => collect((x, x.length))) + def eval(s: String): Int = { + s.hashCode * factor } } -val tableEnv = BatchTableEnvironment.create(env) -val myTable = ... // table schema: [a: String] +val env = TableEnvironment.create(...) -// 在 Scala Table API 中使用表值函数(注意:在 Scala Table API 中不需要注册函数) -val split = new Split("#") -// "as" 指明了返回表的字段名字 -myTable.joinLateral(split('a) as ('word, 'length)).select('a, 'word, 'length) -myTable.leftOuterJoinLateral(split('a) as ('word, 'length)).select('a, 'word, 'length) +// 设置任务参数 +env.getConfig.addJobParameter("hashcode_factor", "31") + +// 注册函数 +env.createTemporarySystemFunction("hashCode", classOf[HashCodeFunction]) -// 注册表值函数,然后才能在 SQL 查询中使用 -tableEnv.registerFunction("split", new Split("#")) +// 调用函数 +env.sqlQuery("SELECT myField, hashCode(myField) FROM MyTable") -// 在 SQL 中使用 LATERAL 和 TABLE 关键字类使用表值函数 -// CROSS JOIN a table function (equivalent to "join" in Table API) -tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)") -// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API) -tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE") {% endhighlight %} -**重要:**不要把表值函数实现成一个 Scala object。Scala object 是一个单例,会有并发的问题。 +</div> -需要注意的是 POJO 类型没有确定的字段顺序。所以,你不可以用 `AS` 来重命名返回的 POJO 的字段。 +</div> -`TableFunction` 的返回类型默认是用 Flink 自动类型推导来决定的。对于基础类型和简单的 POJO 类型推导是没有问题的,但是对于更复杂的、自定义的、以及组合的类型可能会推导错误。如果有这种情况,可以通过重写(override) `TableFunction#getResultType()` 并且返回 `TypeInformation` 来指定返回类型。 +{% top %} -下面的例子展示了 `TableFunction` 返回了一个 `Row` 类型,需要显示指定返回类型。我们通过重写 `TableFunction#getResultType` 来返回 `RowTypeInfo` 作为返回类型。 +标量函数 +---------------- -{% highlight scala %} -class CustomTypeSplit extends TableFunction[Row] { - def eval(str: String): Unit = { - str.split(" ").foreach({ s => - val row = new Row(2) - row.setField(0, s) - row.setField(1, s.length) - collect(row) - }) +自定义标量函数可以把 0 到多个标量值映射成 1 个标量值,[数据类型]({%link dev/table/types.zh.md %})里列出的任何数据类型都可作为求值方法的参数和返回值类型。 + +想要实现自定义标量函数,你需要扩展 `org.apache.flink.table.functions` 里面的 `ScalarFunction` 并且实现一个或者多个求值方法。标量函数的行为取决于你写的求值方法。求值方法必须是 `public` 的,而且名字必须是 `eval`。 + +下面的例子展示了如何实现一个求哈希值的函数并在查询里调用它,详情可参考[开发指南](#开发指南): + +<div class="codetabs" markdown="1"> + +<div data-lang="Java" markdown="1"> +{% highlight java %} +import org.apache.flink.table.annotation.InputGroup; +import org.apache.flink.table.api.*; +import org.apache.flink.table.functions.ScalarFunction; +import static org.apache.flink.table.api.Expressions.*; + +public static class HashFunction extends ScalarFunction { + + // 接受任意类型输入,返回 INT 型输出 + public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) { + return o.hashCode(); } +} + +TableEnvironment env = TableEnvironment.create(...); + +// 在 Table API 里不经注册直接“内联”调用函数 +env.from("MyTable").select(call(HashFunction.class, $("myField"))); + +// 注册函数 +env.createTemporarySystemFunction("HashFunction", HashFunction.class); + +// 在 Table API 里调用注册好的函数 +env.from("MyTable").select(call("HashFunction", $("myField"))); + +// 在 SQL 里调用注册好的函数 +env.sqlQuery("SELECT HashFunction(myField) FROM MyTable"); + +{% endhighlight %} +</div> - override def getResultType: TypeInformation[Row] = { - Types.ROW(Types.STRING, Types.INT) +<div data-lang="Scala" markdown="1"> +{% highlight scala %} +import org.apache.flink.table.annotation.InputGroup +import org.apache.flink.table.api._ +import org.apache.flink.table.functions.ScalarFunction + +class HashFunction extends ScalarFunction { + + // 接受任意类型输入,返回 INT 型输出 + def eval(@DataTypeHint(inputGroup = InputGroup.ANY) o: AnyRef): Int { + return o.hashCode(); } } + +val env = TableEnvironment.create(...) + +// 在 Table API 里不经注册直接“内联”调用函数 +env.from("MyTable").select(call(classOf[HashFunction], $"myField")) + +// 注册函数 +env.createTemporarySystemFunction("HashFunction", classOf[HashFunction]) + +// 在 Table API 里调用注册好的函数 +env.from("MyTable").select(call("HashFunction", $"myField")) + +// 在 SQL 里调用注册好的函数 +env.sqlQuery("SELECT HashFunction(myField) FROM MyTable") + {% endhighlight %} </div> -<div data-lang="python" markdown="1"> -要实现一个 Python 表值函数,你可以扩展 `pyflink.table.udtf` 下的 `TableFunction`,并且实现一个求值方法。Python 表值函数的行为取决于你实现的求值方法,它的名字必须是 `eval`。 +</div> -在 Python Table API 中,表值函数是通过 `.join_lateral` 或者 `.left_outer_join_lateral` 来使用的。`join_lateral` 算子会把外表(算子左侧的表)的每一行跟跟表值函数返回的所有行(位于算子右侧)进行 (cross)join。`left_outer_join_lateral` 算子也是把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join,并且如果表值函数返回 0 行也会保留外表的这一行。在 SQL 里面用 CORSS JOIN 或者 以 ON TRUE 为条件的 LEFT JOIN 来配合 `LATERAL TABLE(<TableFunction>)` 的使用。 +如果你打算使用 Python 实现或调用标量函数,详情可参考 [Python 标量函数]({% link dev/table/python/python_udfs.zh.md %}#scalar-functions)。 -下面的例子展示了如何定义一个 Python 表值函数,如何在 TableEnvironment 中注册表值函数,以及如何在查询中使用表值函数。你可以在注册之前通过构造函数来配置你的表值函数: +{% top %} -{% highlight python %} -class Split(TableFunction): - def eval(self, string): - for s in string.split(" "): - yield s, len(s) +表值函数 +--------------- -env = StreamExecutionEnvironment.get_execution_environment() -table_env = StreamTableEnvironment.create(env) -my_table = ... # type: Table, table schema: [a: String] +跟自定义标量函数一样,自定义表值函数的输入参数也可以是 0 到多个标量。但是跟标量函数只能返回一个值不同的是,它可以返回任意多行。返回的每一行可以包含 1 到多列,如果输出行只包含 1 列,会省略结构化信息并生成标量值,这个标量值在运行阶段会隐式地包装进行里。 -# 注册 Python 表值函数 -table_env.register_function("split", udtf(Split(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.INT()])) +要定义一个表值函数,你需要扩展 `org.apache.flink.table.functions` 下的 `TableFunction`,可以通过实现多个名为 `eval` 的方法对求值方法进行重载。像其他函数一样,输入和输出类型也可以通过反射自动提取出来。表值函数返回的表的类型取决于 `TableFunction` 类的泛型参数 `T`,不同于标量函数,表值函数的求值方法本身不包含返回类型,而是通过 `collect(T)` 方法来发送要输出的行。 -# 在 Python Table API 中使用 Python 表值函数 -my_table.join_lateral("split(a) as (word, length)") -my_table.left_outer_join_lateral("split(a) as (word, length)") +在 Table API 中,表值函数是通过 `.joinLateral(...)` 或者 `.leftOuterJoinLateral(...)` 来使用的。`joinLateral` 算子会把外表(算子左侧的表)的每一行跟跟表值函数返回的所有行(位于算子右侧)进行 (cross)join。`leftOuterJoinLateral` 算子也是把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join,并且如果表值函数返回 0 行也会保留外表的这一行。 -# 在 SQL API 中使用 Python 表值函数 -table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)") -table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE") +在 SQL 里面用 `JOIN` 或者 以 `ON TRUE` 为条件的 `LEFT JOIN` 来配合 `LATERAL TABLE(<TableFunction>)` 的使用。 + +下面的例子展示了如何实现一个分隔函数并在查询里调用它,详情可参考[开发指南](#开发指南): + +<div class="codetabs" markdown="1"> + +<div data-lang="Java" markdown="1"> +{% highlight java %} +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.api.*; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.types.Row; +import static org.apache.flink.table.api.Expressions.*; + +@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>")) +public static class SplitFunction extends TableFunction<Row> { + + public void eval(String str) { + for (String s : str.split(" ")) { + // use collect(...) to emit a row + collect(Row.of(s, s.length())); + } + } +} + +TableEnvironment env = TableEnvironment.create(...); + +// 在 Table API 里不经注册直接“内联”调用函数 +env + .from("MyTable") + .joinLateral(call(SplitFunction.class, $("myField"))) + .select($("myField"), $("word"), $("length")); +env + .from("MyTable") + .leftOuterJoinLateral(call(SplitFunction.class, $("myField"))) + .select($("myField"), $("word"), $("length")); + +// 在 Table API 里重命名函数字段 +env + .from("MyTable") + .leftOuterJoinLateral(call(SplitFunction.class, $("myField")).as("newWord", "newLength")) + .select($("myField"), $("newWord"), $("newLength")); + +// 注册函数 +env.createTemporarySystemFunction("SplitFunction", SplitFunction.class); + +// 在 Table API 里调用注册好的函数 +env + .from("MyTable") + .joinLateral(call("SplitFunction", $("myField"))) + .select($("myField"), $("word"), $("length")); +env + .from("MyTable") + .leftOuterJoinLateral(call("SplitFunction", $("myField"))) + .select($("myField"), $("word"), $("length")); + +// 在 SQL 里调用注册好的函数 +env.sqlQuery( + "SELECT myField, word, length " + + "FROM MyTable, LATERAL TABLE(SplitFunction(myField))"); +env.sqlQuery( + "SELECT myField, word, length " + + "FROM MyTable " + + "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE"); + +// 在 SQL 里重命名函数字段 +env.sqlQuery( + "SELECT myField, newWord, newLength " + + "FROM MyTable " + + "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE"); {% endhighlight %} +</div> + +<div data-lang="Scala" markdown="1"> +{% highlight scala %} +import org.apache.flink.table.annotation.DataTypeHint +import org.apache.flink.table.annotation.FunctionHint +import org.apache.flink.table.api._ +import org.apache.flink.table.functions.TableFunction +import org.apache.flink.types.Row + +@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>")) +class SplitFunction extends TableFunction[Row] { + + def eval(str: String): Unit = { + // use collect(...) to emit a row + str.split(" ").foreach(s => collect(Row.of(s, s.length))) + } +} + +val env = TableEnvironment.create(...) -除了继承 `TableFunction`,还有很多其它方法可以定义 Python 表值函数。 -更多信息,参考 [Python 表值函数]({{ site.baseurl }}/zh/dev/table/python/python_udfs.html#table-functions)文档。 +// 在 Table API 里不经注册直接“内联”调用函数 +env + .from("MyTable") + .joinLateral(call(classOf[SplitFunction], $"myField") + .select($"myField", $"word", $"length") +env + .from("MyTable") + .leftOuterJoinLateral(call(classOf[SplitFunction], $"myField")) + .select($"myField", $"word", $"length") +// 在 Table API 里重命名函数字段 +env + .from("MyTable") + .leftOuterJoinLateral(call(classOf[SplitFunction], $"myField").as("newWord", "newLength")) + .select($"myField", $"newWord", $"newLength") + +// 注册函数 +env.createTemporarySystemFunction("SplitFunction", classOf[SplitFunction]) + +// 在 Table API 里调用注册好的函数 +env + .from("MyTable") + .joinLateral(call("SplitFunction", $"myField")) + .select($"myField", $"word", $"length") +env + .from("MyTable") + .leftOuterJoinLateral(call("SplitFunction", $"myField")) + .select($"myField", $"word", $"length") + +// 在 SQL 里调用注册好的函数 +env.sqlQuery( + "SELECT myField, word, length " + + "FROM MyTable, LATERAL TABLE(SplitFunction(myField))"); +env.sqlQuery( + "SELECT myField, word, length " + + "FROM MyTable " + + "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE") + +// 在 SQL 里重命名函数字段 +env.sqlQuery( + "SELECT myField, newWord, newLength " + + "FROM MyTable " + + "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE") + +{% endhighlight %} </div> + </div> -{% top %} +如果你打算使用 Scala,不要把表值函数声明为 Scala `object`,Scala `object` 是单例对象,将导致并发问题。 +如果你打算使用 Python 实现或调用表值函数,详情可参考 [Python 表值函数]({% link dev/table/python/python_udfs.zh.md %}#table-functions)。 + +{% top %} 聚合函数 --------------------- @@ -355,11 +887,11 @@ table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE **下面几个方法是每个 `AggregateFunction` 必须要实现的:** - `createAccumulator()` -- `accumulate()` +- `accumulate()` - `getValue()` Flink 的类型推导在遇到复杂类型的时候可能会推导出错误的结果,比如那些非基本类型和普通的 POJO 类型的复杂类型。所以跟 `ScalarFunction` 和 `TableFunction` 一样,`AggregateFunction` 也提供了 `AggregateFunction#getResultType()` 和 `AggregateFunction#getAccumulatorType()` 来分别指定返回值类型和 accumulator 的类型,两个函数的返回值类型也都是 `TypeInformation`。 - + 除了上面的方法,还有几个方法可以选择实现。这些方法有些可以让查询更加高效,而有些是在某些特定场景下必须要实现的。例如,如果聚合函数用在会话窗口(当两个会话窗口合并的时候需要 merge 他们的 accumulator)的话,`merge()` 方法就是必须要实现的。 **`AggregateFunction` 的以下方法在某些场景下是必须实现的:** @@ -409,7 +941,7 @@ public abstract class UserDefinedAggregateFunction<T, ACC> extends UserDefinedFu } /** - * Base class for aggregation functions. + * Base class for aggregation functions. * * @param <T> the type of the aggregation result * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the @@ -518,7 +1050,7 @@ abstract class UserDefinedAggregateFunction[T, ACC] extends UserDefinedFunction } /** - * Base class for aggregation functions. + * Base class for aggregation functions. * * @tparam T the type of the aggregation result * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the @@ -561,7 +1093,7 @@ abstract class AggregateFunction[T, ACC] extends UserDefinedAggregateFunction[T, * merged. */ def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit // OPTIONAL - + /** * Called every time when an aggregation result should be materialized. * The returned value could be either an early and incomplete result @@ -643,7 +1175,7 @@ public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum acc.sum -= iValue * iWeight; acc.count -= iWeight; } - + public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) { Iterator<WeightedAvgAccum> iter = it.iterator(); while (iter.hasNext()) { @@ -652,7 +1184,7 @@ public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum acc.sum += a.sum; } } - + public void resetAccumulator(WeightedAvgAccum acc) { acc.count = 0; acc.sum = 0L; @@ -693,7 +1225,7 @@ class WeightedAvg extends AggregateFunction[JLong, CountAccumulator] { override def createAccumulator(): WeightedAvgAccum = { new WeightedAvgAccum } - + override def getValue(acc: WeightedAvgAccum): JLong = { if (acc.count == 0) { null @@ -701,7 +1233,7 @@ class WeightedAvg extends AggregateFunction[JLong, CountAccumulator] { acc.sum / acc.count } } - + def accumulate(acc: WeightedAvgAccum, iValue: JLong, iWeight: JInteger): Unit = { acc.sum += iValue * iWeight acc.count += iWeight @@ -711,7 +1243,7 @@ class WeightedAvg extends AggregateFunction[JLong, CountAccumulator] { acc.sum -= iValue * iWeight acc.count -= iWeight } - + def merge(acc: WeightedAvgAccum, it: java.lang.Iterable[WeightedAvgAccum]): Unit = { val iter = it.iterator() while (iter.hasNext) { @@ -787,7 +1319,7 @@ public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum acc.sum -= iValue * iWeight; acc.count -= iWeight; } - + public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) { Iterator<WeightedAvgAccum> iter = it.iterator(); while (iter.hasNext()) { @@ -796,7 +1328,7 @@ public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum acc.sum += a.sum; } } - + public void resetAccumulator(WeightedAvgAccum acc) { acc.count = 0; acc.sum = 0L; @@ -815,7 +1347,6 @@ t_env.sql_query("SELECT user, wAvg(points, level) AS avgPoints FROM userScores G </div> </div> - {% top %} 表值聚合函数 @@ -834,10 +1365,10 @@ t_env.sql_query("SELECT user, wAvg(points, level) AS avgPoints FROM userScores G **下面几个 `TableAggregateFunction` 的方法是必须要实现的:** - `createAccumulator()` -- `accumulate()` +- `accumulate()` Flink 的类型推导在遇到复杂类型的时候可能会推导出错误的结果,比如那些非基本类型和普通的 POJO 类型的复杂类型。所以类似于 `ScalarFunction` 和 `TableFunction`,`TableAggregateFunction` 也提供了 `TableAggregateFunction#getResultType()` 和 `TableAggregateFunction#getAccumulatorType()` 方法来指定返回值类型和 accumulator 的类型,这两个方法都需要返回 `TypeInformation`。 - + 除了上面的方法,还有几个其他的方法可以选择性的实现。有些方法可以让查询更加高效,而有些方法对于某些特定场景是必须要实现的。比如,在会话窗口(当两个会话窗口合并时会合并两个 accumulator)中使用聚合函数时,必须要实现`merge()` 方法。 **下面几个 `TableAggregateFunction` 的方法在某些特定场景下是必须要实现的:** @@ -895,7 +1426,7 @@ public abstract class UserDefinedAggregateFunction<T, ACC> extends UserDefinedFu } /** - * Base class for table aggregation functions. + * Base class for table aggregation functions. * * @param <T> the type of the aggregation result * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the @@ -948,7 +1479,7 @@ public abstract class TableAggregateFunction<T, ACC> extends UserDefinedAggregat * @param out the collector used to output data */ public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL - + /** * Called every time when an aggregation result should be materialized. The returned value * could be either an early and incomplete result (periodically emitted as data arrive) or @@ -968,7 +1499,7 @@ public abstract class TableAggregateFunction<T, ACC> extends UserDefinedAggregat * records. */ public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out); // OPTIONAL - + /** * Collects a record and forwards it. The collector can output retract messages with the retract * method. Note: only use it in {@code emitRetractValueIncrementally}. @@ -1022,7 +1553,7 @@ abstract class UserDefinedAggregateFunction[T, ACC] extends UserDefinedFunction } /** - * Base class for table aggregation functions. + * Base class for table aggregation functions. * * @tparam T the type of the aggregation result * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the @@ -1065,7 +1596,7 @@ abstract class TableAggregateFunction[T, ACC] extends UserDefinedAggregateFuncti * merged. */ def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit // OPTIONAL - + /** * Called every time when an aggregation result should be materialized. The returned value * could be either an early and incomplete result (periodically emitted as data arrive) or @@ -1096,13 +1627,13 @@ abstract class TableAggregateFunction[T, ACC] extends UserDefinedAggregateFuncti * records. */ def emitUpdateWithRetract(accumulator: ACC, out: RetractableCollector[T]): Unit // OPTIONAL - + /** * Collects a record and forwards it. The collector can output retract messages with the retract * method. Note: only use it in `emitRetractValueIncrementally`. */ trait RetractableCollector[T] extends Collector[T] { - + /** * Retract a record. * @@ -1412,104 +1943,4 @@ tab </div> </div> - -{% top %} - -实现自定义函数的最佳实践 ------------------------------------- - -在 Table API 和 SQL 的内部,代码生成会尽量的使用基础类型。自定义函数的参数及返回值类型是对象,会有很多的对象创建、转换(cast)、以及自动拆装箱的开销。因此,强烈建议使用基础类型来作为参数以及返回值的类型。`Types.DATE` 和 `Types.TIME` 可以用 `int` 来表示。`Types.TIMESTAMP` 可以用 `long` 来表示。 - -我们建议自定义函数用 Java 来实现,而不是用 Scala 来实现,因为 Flink 的类型推导对 Scala 不是很友好。 - -{% top %} - -自定义函数跟运行时集成 ---------------------------------- - -有时候自定义函数需要获取一些全局信息,或者在真正被调用之前做一些配置(setup)/清理(clean-up)的工作。自定义函数也提供了 `open()` 和 `close()` 方法,你可以重写这两个方法做到类似于 DataSet 或者 DataStream API 中 `RichFunction` 的功能。 - -`open()` 方法在求值方法被调用之前先调用。`close()` 方法在求值方法调用完之后被调用。 - -`open()` 方法提供了一个 `FunctionContext`,它包含了一些自定义函数被执行时的上下文信息,比如 metric group、分布式文件缓存,或者是全局的作业参数等。 - -下面的信息可以通过调用 `FunctionContext` 的对应的方法来获得: - -| 方法 | 描述 | -| :------------------------------------ | :----------------------------------------------------- | -| `getMetricGroup()` | 执行该函数的 subtask 的 Metric Group。 | -| `getCachedFile(name)` | 分布式文件缓存的本地临时文件副本。 | -| `getJobParameter(name, defaultValue)` | 跟对应的 key 关联的全局参数值。 | - -下面的例子展示了如何在一个标量函数中通过 `FunctionContext` 来获取一个全局的任务参数: - -<div class="codetabs" markdown="1"> -<div data-lang="java" markdown="1"> -{% highlight java %} -public class HashCode extends ScalarFunction { - - private int factor = 0; - - @Override - public void open(FunctionContext context) throws Exception { - // 获取参数 "hashcode_factor" - // 如果不存在,则使用默认值 "12" - factor = Integer.valueOf(context.getJobParameter("hashcode_factor", "12")); - } - - public int eval(String s) { - return s.hashCode() * factor; - } -} - -ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); -BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env); - -// 设置任务参数 -Configuration conf = new Configuration(); -conf.setString("hashcode_factor", "31"); -env.getConfig().setGlobalJobParameters(conf); - -// 注册函数 -tableEnv.registerFunction("hashCode", new HashCode()); - -// 在 Java Table API 中使用函数 -myTable.select("string, string.hashCode(), hashCode(string)"); - -// 在 SQL 中使用函数 -tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable"); -{% endhighlight %} -</div> - -<div data-lang="scala" markdown="1"> -{% highlight scala %} -object hashCode extends ScalarFunction { - - var hashcode_factor = 12 - - override def open(context: FunctionContext): Unit = { - // 获取参数 "hashcode_factor" - // 如果不存在,则使用默认值 "12" - hashcode_factor = context.getJobParameter("hashcode_factor", "12").toInt - } - - def eval(s: String): Int = { - s.hashCode() * hashcode_factor - } -} - -val tableEnv = BatchTableEnvironment.create(env) - -// 在 Scala Table API 中使用函数 -myTable.select('string, hashCode('string)) - -// 在 SQL 中注册和使用函数 -tableEnv.registerFunction("hashCode", hashCode) -tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable") -{% endhighlight %} - -</div> -</div> - {% top %} -