This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 13cb456eb30dc50dbd47885b7471cd02cfe56cdf Author: luoyuxia <luoyu...@alumni.sjtu.edu.cn> AuthorDate: Wed Aug 2 21:01:57 2023 +0800 [Flink-32356][doc] Add doc for procedure --- docs/content.zh/docs/dev/table/procedures.md | 541 ++++++++++++++++++++++++++ docs/content/docs/dev/table/procedures.md | 544 +++++++++++++++++++++++++++ 2 files changed, 1085 insertions(+) diff --git a/docs/content.zh/docs/dev/table/procedures.md b/docs/content.zh/docs/dev/table/procedures.md new file mode 100644 index 00000000000..cb70d759ebc --- /dev/null +++ b/docs/content.zh/docs/dev/table/procedures.md @@ -0,0 +1,541 @@ +--- +title: "存储过程" +is_beta: true +weight: 50 +type: docs +aliases: + - /zh/dev/table/procedures.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +# 存储过程 + + +Flink 允许用户在 Table API 和 SQL 中调用存储过程来完成一些特定任务,比如处理数据,数据管理类任务等。存储过程可以通过 `StreamExecutionEnvironment` 来运行 Flink 作业,这使得存储过程更加强大和灵活。 + +## 开发指南 +------------------ + +为了调用一个存储过程,需要确保一个 Catalog 可以提供这个存储过程。为了让一个 Catalog 提供存储过程,你首先需要实现一个存储过程,然后在方法 `Catalog.getProcedure(ObjectPath procedurePath)` 返回这个存储过程。 +下面的步骤将展示如何实现一个存储过程并让一个 Catalog 提供这个存储过程。 + +### 存储过程类 + +存储过程的实现类必须实现接口 `org.apache.flink.table.procedures.Procedure`。 + +该实现类必须声明为 `public`, 而不是 `abstract`, 并且可以被全局访问。不允许使用非静态内部类或匿名类。 + +### Call 方法 + +存储过程的接口不提供任何方法,存储过程的实现类必须有名为 `call` 的方法,在该方法里面可以实现存储过程实际的逻辑。`call` 方法必须被声明为 `public`, 并且带有一组定义明确的参数。 + +请注意: + +* `call` 方法的第一个参数总是应该为 `ProcedureContext`,该参数提供了方法 `getExecutionEnvironment()` 来得到当前的 `StreamExecutionEnvironment`。通过 `StreamExecutionEnvironment` 可以运行一个 Flink 作业; +* `call` 方法的返回类型应该永远都是一个数组类型,比如 `int[]`,`String[]`,等等; + +更多的细节请参考类 `org.apache.flink.table.procedures.Procedure` 的 Java 文档。 + +常规的 JVM 方法调用语义是适用的,因此可以: +- 实现重载的方法,例如 `call(ProcedureContext, Integer)` and `call(ProcedureContext, LocalDateTime)`; +- 使用变长参数,例如 `call(ProcedureContext, Integer...)`; +- 使用对象继承,例如 `call(ProcedureContext, Object)` 可接受 `LocalDateTime` 和 `Integer` 作为参数; +- 也可组合使用,例如 `call(ProcedureContext, Object...)` 可接受所有类型的参数; + +如果你希望用 Scala 来实现一个存储过程,对应可变长参数的情况,请添加 `scala.annotation.varargs`。另外,推荐使用装箱的基本类型(比如,使用 `java.lang.Integer` 而不是 `Int`)来支持 `NULL`。 + +下面的代码片段展示来一个重载存储过程的例子: + +{{< tabs "7c5a5392-30d7-11ee-be56-0242ac120002" >}} +{{< tab "Java" >}} + +```java +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.table.procedures.Procedure; + +// 有多个重载 call 方法的存储过程 +public class GenerateSequenceProcedure implements Procedure { + + public long[] call(ProcedureContext context, int n) { + return generate(context.getExecutionEnvironment(), n); + } + + public long[] call(ProcedureContext context, String n) { + return generate(context.getExecutionEnvironment(), 
Integer.parseInt(n)); + } + + private long[] generate(StreamExecutionEnvironment env, int n) throws Exception { + long[] sequenceN = new long[n]; + int i = 0; + try (CloseableIterator<Long> result = env.fromSequence(0, n - 1).executeAndCollect()) { + while (result.hasNext()) { + sequenceN[i++] = result.next(); + } + } + return sequenceN; + } +} + +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala +import org.apache.flink.table.procedure.ProcedureContext +import org.apache.flink.table.procedures.Procedure +import scala.annotation.varargs + +// 有多个重载 call 方法的存储过程 +class GenerateSequenceProcedure extends Procedure { + + def call(context: ProcedureContext, a: Integer, b: Integer): Array[Integer] = { + Array(a + b) + } + + def call(context: ProcedureContext, a: String, b: String): Array[Integer] = { + Array(Integer.valueOf(a) + Integer.valueOf(b)) + } + + @varargs // 类似 Java 的变长参数 + def call(context: ProcedureContext, d: Double*): Array[Integer] = { + Array(d.sum.toInt) + } +} + +``` +{{< /tab >}} +{{< /tabs >}} + +### 类型推导 +Table(类似于 SQL 标准)是一种强类型的 API。 因此,存储过程的参数和返回类型都必须映射到 [data type]({{< ref "docs/dev/table/types" >}})。 + +从逻辑角度看,Planner 需要知道数据类型、精度和小数位数;从 JVM 角度来看,Planner 在调用存储过程时需要知道如何将内部数据结构表示为 JVM 对象。 + +术语 _类型推导_ 概括了意在验证输入值、推导出参数/返回值数据类型的逻辑。 + +Flink 存储过程实现了自动的类型推导提取,通过反射从存储过程的类及其 `call` 方法中推导数据类型。如果这种隐式的反射提取方法不成功,则可以通过使用 `@DataTypeHint` 和 `@ProcedureHint` 注解相关参数、类或方法来支持提取存储过程的参数和返回类型,下面展示了有关如何注解存储过程的例子。 + +需要注意的是虽然存储过程的 `call` 方法必须返回数组类型 `T[]`,但是如果用 `@DataTypeHint` 来注解返回类型,实际上注解的是该数组的元素的类型,即 `T`。 + +#### 自动类型推导 + +自动类型推导会检查存储过程的类和 `call` 方法,推导出存储过程参数和结果的数据类型, `@DataTypeHint` 和 `@ProcedureHint` 注解支持自动类型推导。 + +有关可以隐式映射到数据类型的类的完整列表, 请参阅[data type extraction section]({{< ref "docs/dev/table/types" >}}#data-type-extraction)。 + +**`@DataTypeHint`** + +在许多情况下,需要支持以 _内联_ 方式自动提取出存储过程参数、返回值的类型。 + +以下例子展示了如何使用 `@DataTypeHint`,详情可参考该注解类的文档。 + +{{< tabs "81b297da-30d9-11ee-be56-0242ac120002" >}} +{{< tab "Java" >}} +```java +import 
org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.InputGroup; +import org.apache.flink.table.procedure.ProcedureContext +import org.apache.flink.table.procedures.Procedure; +import org.apache.flink.types.Row; + +// 有多个重载 call 方法的存储过程 +public static class OverloadedProcedure implements Procedure { + + // 不需要 hint + public Long[] call(ProcedureContext context, long a, long b) { + return new Long[] {a + b}; + } + + // 定义 decimal 的精度和小数位 + public @DataTypeHint("DECIMAL(12, 3)") BigDecimal[] call(ProcedureContext context, double a, double b) { + return new BigDecimal[] {BigDecimal.valueOf(a + b)}; + } + + // 定义嵌套数据类型 + @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>") + public Row[] call(ProcedureContext context, int i) { + return new Row[] {Row.of(String.valueOf(i), Instant.ofEpochSecond(i))}; + } + + // 允许任意类型的输入,并输出序列化定制后的值 + @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class) + public ByteBuffer[] call(ProcedureContext context, @DataTypeHint(inputGroup = InputGroup.ANY) Object o) { + return new ByteBuffer[] {MyUtils.serializeToByteBuffer(o)}; + } +} + +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala +import org.apache.flink.table.annotation.DataTypeHint +import org.apache.flink.table.annotation.InputGroup +import org.apache.flink.table.functions.ScalarFunction +import org.apache.flink.types.Row +import scala.annotation.varargs + +import org.apache.flink.table.annotation.DataTypeHint +import org.apache.flink.table.annotation.InputGroup +import org.apache.flink.table.procedure.ProcedureContext +import org.apache.flink.table.procedures.Procedure +import org.apache.flink.types.Row + +// 有多个重载 call 方法的存储过程 +class OverloadedProcedure extends Procedure { + + // 不需要 hint + def call(context: ProcedureContext, a: Long, b: Long): Array[Long] = { + Array(a + b) + } + + // 定义 decimal 的精度和小数位 + @DataTypeHint("DECIMAL(12, 3)") + def call(context: ProcedureContext, a: Double, b: Double): Array[BigDecimal] = { + 
Array(BigDecimal.valueOf(a + b)) + } + + // 定义嵌套数据类型 + @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>") + def call(context: ProcedureContext, i: Integer): Array[Row] = { + Array(Row.of(java.lang.String.valueOf(i), java.time.Instant.ofEpochSecond(i))) + } + + // 允许任意类型的输入,并输出序列化定制后的值 + @DataTypeHint(value = "RAW", bridgedTo = classOf[java.nio.ByteBuffer]) + def call(context: ProcedureContext, @DataTypeHint(inputGroup = InputGroup.ANY) o: Object): Array[java.nio.ByteBuffer] = { + Array(MyUtils.serializeToByteBuffer(o)) + } +} + +``` +{{< /tab >}} +{{< /tabs >}} + +**`@ProcedureHint`** + +有时我们希望一种 `call` 方法可以同时处理多种数据类型,有时又要求对重载的多个 `call` 方法仅声明一次通用的返回类型。 + +`@ProcedureHint` 注解可以提供从入参数据类型到返回数据类型的映射,它可以在整个存储过程类或 `call` 方法上注解输入和返回的数据类型。可以在类顶部声明一个或多个注解,也可以为类的所有 `call` 方法分别声明一个或多个注解。所有的 hint 参数都是可选的,如果未定义参数,则使用默认的基于反射的类型提取。在函数类顶部定义的 hint 参数被所有 `call` 方法继承。 + +以下例子展示了如何使用 `@ProcedureHint`,详情可参考该注解类的文档。 + +{{< tabs "5d205654-30da-11ee-be56-0242ac120002" >}} +{{< tab "Java" >}} +```java +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.table.procedures.Procedure; +import org.apache.flink.types.Row; + +// 为存储过程类的所有 call 方法指定同一个返回类型 +@ProcedureHint(output = @DataTypeHint("ROW<s STRING, i INT>")) +public static class OverloadedProcedure implements Procedure { + + public Row[] call(ProcedureContext context, int a, int b) { + return new Row[] {Row.of("Sum", a + b)}; + } + + // 仍然可以重载 call 方法 + public Row[] call(ProcedureContext context) { + return new Row[] {Row.of("Empty args", -1)}; + } +} + +// 解耦类型推导与 call 方法,类型推导完全取决于 ProcedureHint +@ProcedureHint( + input = {@DataTypeHint("INT"), @DataTypeHint("INT")}, + output = @DataTypeHint("INT") +) +@ProcedureHint( + input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")}, + output = @DataTypeHint("BIGINT") +) +@ProcedureHint( + input = {}, + output = @DataTypeHint("BOOLEAN") +) +public static 
class OverloadedProcedure implements Procedure { + + // 一个 call 方法的实现,确保 call 方法存在于存储过程类中,可以被 JVM 调用 + public Object[] call(ProcedureContext context, Object... o) { + if (o.length == 0) { + return new Object[] {false}; + } + return new Object[] {o[0]}; + } +} + +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala + +import org.apache.flink.table.annotation.DataTypeHint +import org.apache.flink.table.annotation.ProcedureHint +import org.apache.flink.table.procedure.ProcedureContext +import org.apache.flink.table.procedures.Procedure +import org.apache.flink.types.Row +import scala.annotation.varargs + +// 为存储过程类的所有 call 方法指定同一个返回类型 +@ProcedureHint(output = new DataTypeHint("ROW<s STRING, i INT>")) +class OverloadedFunction extends Procedure { + + def call(context: ProcedureContext, a: Int, b: Int): Array[Row] = { + Array(Row.of("Sum", Int.box(a + b))) + } + + // 仍然可以重载 call 方法 + def call(context: ProcedureContext): Array[Row] = { + Array(Row.of("Empty args", Int.box(-1))) + } +} + +// 解耦类型推导与 call 方法,类型推导完全取决于 ProcedureHint +@ProcedureHint( + input = Array(new DataTypeHint("INT"), new DataTypeHint("INT")), + output = new DataTypeHint("INT") +) +@ProcedureHint( + input = Array(new DataTypeHint("BIGINT"), new DataTypeHint("BIGINT")), + output = new DataTypeHint("BIGINT") +) +@ProcedureHint( + input = Array(), + output = new DataTypeHint("BOOLEAN") +) +class OverloadedProcedure extends Procedure { + + // 一个 call 方法的实现,确保 call 方法存在于存储过程类中,可以被 JVM 调用 + @varargs + def call(context: ProcedureContext, o: AnyRef*): Array[AnyRef]= { + if (o.length == 0) { + Array(Boolean.box(false)) + } + Array(o(0)) + } +} + +``` +{{< /tab >}} +{{< /tabs >}} + + +### 在 Catalog 中返回存储过程 +在实现了一个存储过程后,Catalog 可以通过方法 `Catalog.getProcedure(ObjectPath procedurePath)` 来返回该存储过程,下面的例子展示了如何在 Catalog 中返回存储过程。 +另外也可以在 `Catalog.listProcedures(String dbName)` 方法中列出所有的存储过程。 + +{{< tabs "2ee21ac4-30db-11ee-be56-0242ac120002" >}} +{{< tab "Java" >}} +```java + +import org.apache.flink.table.catalog.Catalog; 
+import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.table.procedures.Procedure; + +import java.util.HashMap; +import java.util.Map; + +// 有内置 procedure 的 Catalog +public class CatalogWithBuiltInProcedure extends GenericInMemoryCatalog { + + static { + PROCEDURE_MAP.put(ObjectPath.fromString("system.generate_n"), new GenerateSequenceProcedure()); + } + + public CatalogWithBuiltInProcedure(String name) { + super(name); + } + + @Override + public List<String> listProcedures(String dbName) throws DatabaseNotExistException, CatalogException { + return PROCEDURE_MAP.keySet().stream().filter(procedurePath -> procedurePath.getDatabaseName().equals(dbName)) + .map(ObjectPath::getObjectName).collect(Collectors.toList()); + } + + @Override + public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException { + if (PROCEDURE_MAP.containsKey(procedurePath)) { + return PROCEDURE_MAP.get(procedurePath); + } else { + throw new ProcedureNotExistException(getName(), procedurePath); + } + } +} + +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala + +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException; +import org.apache.flink.table.procedures.Procedure; + +// 有内置 procedure 的 Catalog +class CatalogWithBuiltInProcedure(name: String) extends GenericInMemoryCatalog(name) { + + val PROCEDURE_MAP = 
collection.immutable.HashMap[ObjectPath, Procedure]( + ObjectPath.fromString("system.generate_n"), new GenerateSequenceProcedure()); + + @throws(classOf[DatabaseNotExistException]) + @throws(classOf[CatalogException]) + override def listProcedures(dbName: String): List[String] = { + if (!databaseExists(dbName)) { + throw new DatabaseNotExistException(getName, dbName); + } + PROCEDURE_MAP.keySet.filter(procedurePath => procedurePath.getDatabaseName.equals(dbName)) + .map(procedurePath => procedurePath.getObjectName).toList + } + + @throws(classOf[ProcedureNotExistException]) + override def getProcedure(procedurePath: ObjectPath): Procedure = { + if (PROCEDURE_MAP.contains(procedurePath)) { + PROCEDURE_MAP(procedurePath); + } else { + throw new ProcedureNotExistException(getName, procedurePath) + } + } +} + +``` +{{< /tab >}} +{{< /tabs >}} + + +## 例子 + +下面的例子展示了如何在一个 Catalog 中提供一个存储过程并且通过 `CALL` 语句来调用这个存储过程。详情可参考[开发指南](#implementation-guide)。 + +{{< tabs "c3edd888-30db-11ee-be56-0242ac120002" >}} +{{< tab "Java" >}} +```java + +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.table.procedures.Procedure; + +// 首先实现一个存储过程 +public static class GenerateSequenceProcedure implements Procedure { + + public long[] call(ProcedureContext context, int n) { + long[] sequenceN = new long[n]; + int i = 0; + try (CloseableIterator<Long> result = env.fromSequence(0, n - 1).executeAndCollect()) { + while (result.hasNext()) { + sequenceN[i++] = result.next(); + } + } + return sequenceN; + } +} + +// 自定义一个 Catalog,并返回该存储过程 +public static class CatalogWithBuiltInProcedure extends GenericInMemoryCatalog { + + static { + PROCEDURE_MAP.put(ObjectPath.fromString("system.generate_n"), new 
GenerateSequenceProcedure()); + } + // 省略一些方法 + // ... + @Override + public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException { + if (PROCEDURE_MAP.containsKey(procedurePath)) { + return PROCEDURE_MAP.get(procedurePath); + } else { + throw new ProcedureNotExistException(getName(), procedurePath); + } + } +} + +TableEnvironment tEnv = TableEnvironment.create(...); +// 注册这个 Catalog +tEnv.registerCatalog("my_catalog", new CatalogWithBuiltInProcedure("my_catalog")); +// 通过 Call 语句调用该存储过程 +tEnv.executeSql("call my_catalog.`system`.generate_n(5)"); + +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala + +import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath} +import org.apache.flink.table.catalog.exceptions.{CatalogException, ProcedureNotExistException} +import org.apache.flink.table.procedure.ProcedureContext +import org.apache.flink.table.procedures.Procedure + +// 首先实现一个存储过程 +class GenerateSequenceProcedure extends Procedure { + + def call(context: ProcedureContext, n: Integer): Array[Long] = { + val env = context.getExecutionEnvironment + val sequenceN = new Array[Long](n) + var i = 0; + env.fromSequence(0, n - 1).executeAndCollect() + .forEachRemaining(r => { + sequenceN(i) = r + i = i + 1 + }) + sequenceN; + } +} + +// 然后在一个自定义的 catalog 返回该 procedure +class CatalogWithBuiltInProcedure(name: String) extends GenericInMemoryCatalog(name) { + + val PROCEDURE_MAP = collection.immutable.HashMap[ObjectPath, Procedure](ObjectPath.fromString("system.generate_n"), + new GenerateSequenceProcedure()); + + // 省略一些方法 + // ... + + @throws(classOf[ProcedureNotExistException]) + override def getProcedure(procedurePath: ObjectPath): Procedure = { + if (PROCEDURE_MAP.contains(procedurePath)) { + PROCEDURE_MAP(procedurePath); + } else { + throw new ProcedureNotExistException(getName, procedurePath) + } + } +} + +val tEnv = TableEnvironment.create(...) 
+// 注册该 catalog +tEnv.registerCatalog("my_catalog", new CatalogWithBuiltInProcedure("my_catalog")) +// 通过 Call 语句调用该存储过程 +tEnv.executeSql("call my_catalog.`system`.generate_n(5)") + +``` +{{< /tab >}} +{{< /tabs >}} + + diff --git a/docs/content/docs/dev/table/procedures.md b/docs/content/docs/dev/table/procedures.md new file mode 100644 index 00000000000..0122a0de191 --- /dev/null +++ b/docs/content/docs/dev/table/procedures.md @@ -0,0 +1,544 @@ +--- +title: "Procedures" +is_beta: true +weight: 50 +type: docs +aliases: + - /dev/table/procedures.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Procedures + +Flink Table API & SQL empowers users to perform data manipulation and administrative tasks with procedures. Procedures can run Flink jobs with the provided `StreamExecutionEnvironment`, making them more powerful and flexible. + +## Implementation Guide + +To call a procedure, it must be available in a catalog. To provide procedures in a catalog, you need to implement the procedure and then return it using the `Catalog.getProcedure(ObjectPath procedurePath)` method. +The following steps will guide you on how to implement and provide a procedure in a catalog. 
+ +### Procedure Class + +An implementation class must implement the interface `org.apache.flink.table.procedures.Procedure`. + +The class must be declared `public`, not `abstract`, and should be globally accessible. Thus, non-static inner or anonymous classes are not allowed. + +### Call Methods + +The interface doesn't provide any method,you have to define a method named `call` in which you can implement the logic of the procedure. +The methods must be declared `public` and take a well-defined set of arguments. + +Please note: + +* The first parameter of the method `call` should always be `ProcedureContext` which provides the method `getExecutionEnvironment` to get a `StreamExecutionEnvironment` for running a Flink Job +* The return type should always be an array, like `int[]`, `String[]`, etc + +More detail can be found in the Java doc of the class `org.apache.flink.table.procedures.Procedure`. + +Regular JVM method calling semantics apply. Therefore, it is possible to: +- implement overloaded methods such as `call(ProcedureContext, Integer)` and `call(ProcedureContext, LocalDateTime)` +- use var-args such as `call(ProcedureContext, Integer...)` +- use object inheritance such as `call(ProcedureContext, Object)` that takes both `LocalDateTime` and `Integer` +- and combinations of the above such as `call(ProcedureContext, Object...)` that takes all kinds of arguments + +If you intend to implement procedures in Scala, please add the `scala.annotation.varargs` annotation in +case of variable arguments. Furthermore, it is recommended to use boxed primitives (e.g. `java.lang.Integer` +instead of `Int`) to support `NULL`. 
+ +The following snippets shows an example of an overloaded procedure: + +{{< tabs "0819d780-3052-11ee-be56-0242ac120002" >}} +{{< tab "Java" >}} + +```java +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.table.procedures.Procedure; + +// procedure with overloaded call methods +public class GenerateSequenceProcedure implements Procedure { + + public long[] call(ProcedureContext context, int n) { + return generate(context.getExecutionEnvironment(), n); + } + + public long[] call(ProcedureContext context, String n) { + return generate(context.getExecutionEnvironment(), Integer.parseInt(n)); + } + + private long[] generate(StreamExecutionEnvironment env, int n) throws Exception { + long[] sequenceN = new long[n]; + int i = 0; + try (CloseableIterator<Long> result = env.fromSequence(0, n - 1).executeAndCollect()) { + while (result.hasNext()) { + sequenceN[i++] = result.next(); + } + } + return sequenceN; + } +} + +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala +import org.apache.flink.table.procedure.ProcedureContext +import org.apache.flink.table.procedures.Procedure +import scala.annotation.varargs + +// procedures with overloaded call methods +class GenerateSequenceProcedure extends Procedure { + + def call(context: ProcedureContext, a: Integer, b: Integer): Array[Integer] = { + Array(a + b) + } + + def call(context: ProcedureContext, a: String, b: String): Array[Integer] = { + Array(Integer.valueOf(a) + Integer.valueOf(b)) + } + + @varargs // generate var-args like Java + def call(context: ProcedureContext, d: Double*): Array[Integer] = { + Array(d.sum.toInt) + } +} + +``` +{{< /tab >}} +{{< /tabs >}} + +### Type Inference +The table ecosystem (similar to the SQL standard) is a strongly typed API. Therefore, both procedure parameters and return types must be mapped to a [data type]({{< ref "docs/dev/table/types" >}}). 
+ +From a logical perspective, the planner needs information about expected types, precision, and scale. From a JVM perspective, the planner needs information about how internal data structures are represented as JVM objects when calling a procedure. + +The logic for validating input arguments and deriving data types for both the parameters and the result of a procedure is summarized under the term _type inference_. + +Flink's procedures implement an automatic type inference extraction that derives data types from the procedure's class and its `call` methods via reflection. If this implicit reflective extraction approach is not successful, the extraction process can be supported by annotating affected parameters, classes, or methods with `@DataTypeHint` and `@ProcedureHint`. More examples on how to annotate procedures are shown below. + +Note: although the return type in `call` method must be array type `T[]`, if use `@DataTypeHint` to annotate the return type, it's actually expected to annotate the component type of the array type, which is actually `T`. + +#### Automatic Type Inference + +The automatic type inference inspects the procedure's class and `call` methods to derive data types for the arguments and result of a procedure. `@DataTypeHint` and `@ProcedureHint` annotations support the automatic extraction. + +For a full list of classes that can be implicitly mapped to a data type, please refer to the [data type extraction section]({{< ref "docs/dev/table/types" >}}#data-type-extraction). + +**`@DataTypeHint`** + +In many scenarios, it is required to support the automatic extraction _inline_ for parameters and return types of a procedure + +The following example shows how to use data type hints. More information can be found in the documentation of the annotation class. 
+ +{{< tabs "9414057c-3051-11ee-be56-0242ac120002" >}} +{{< tab "Java" >}} +```java +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.InputGroup; +import org.apache.flink.table.procedure.ProcedureContext +import org.apache.flink.table.procedures.Procedure; +import org.apache.flink.types.Row; + +// procedure with overloaded call methods +public static class OverloadedProcedure implements Procedure { + + // no hint required + public Long[] call(ProcedureContext context, long a, long b) { + return new Long[] {a + b}; + } + + // define the precision and scale of a decimal + public @DataTypeHint("DECIMAL(12, 3)") BigDecimal[] call(ProcedureContext context, double a, double b) { + return new BigDecimal[] {BigDecimal.valueOf(a + b)}; + } + + // define a nested data type + @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>") + public Row[] call(ProcedureContext context, int i) { + return new Row[] {Row.of(String.valueOf(i), Instant.ofEpochSecond(i))}; + } + + // allow wildcard input and custom serialized output + @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class) + public ByteBuffer[] call(ProcedureContext context, @DataTypeHint(inputGroup = InputGroup.ANY) Object o) { + return new ByteBuffer[] {MyUtils.serializeToByteBuffer(o)}; + } +} + +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala +import org.apache.flink.table.annotation.DataTypeHint +import org.apache.flink.table.annotation.InputGroup +import org.apache.flink.table.procedure.ProcedureContext +import org.apache.flink.table.procedures.Procedure +import org.apache.flink.types.Row + +// procedure with overloaded call methods +class OverloadedProcedure extends Procedure { + + // no hint required + def call(context: ProcedureContext, a: Long, b: Long): Array[Long] = { + Array(a + b) + } + + // define the precision and scale of a decimal + @DataTypeHint("DECIMAL(12, 3)") + def call(context: ProcedureContext, a: Double, b: Double): Array[BigDecimal] = { + 
Array(BigDecimal.valueOf(a + b)) + } + + // define a nested data type + @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>") + def call(context: ProcedureContext, i: Integer): Array[Row] = { + Array(Row.of(java.lang.String.valueOf(i), java.time.Instant.ofEpochSecond(i))) + } + + // allow wildcard input and custom serialized output + @DataTypeHint(value = "RAW", bridgedTo = classOf[java.nio.ByteBuffer]) + def call(context: ProcedureContext, @DataTypeHint(inputGroup = InputGroup.ANY) o: Object): Array[java.nio.ByteBuffer] = { + Array(MyUtils.serializeToByteBuffer(o)) + } +} + +``` +{{< /tab >}} +{{< /tabs >}} + +**`@ProcedureHint`** + +In some scenarios, it is desirable that one `call` method handles multiple different data types at the same time. Furthermore, in some scenarios, overloaded `call` methods have a common result type that should be declared only once. + +The `@ProcedureHint` annotation can provide a mapping from argument data types to a result data type. It enables annotating entire procedure classes or `call` methods for input and result data types. One or more annotations can be declared on top of a class or individually for each `call` method for overloading procedure signatures. All hint parameters are optional. If a parameter is not defined, the default reflection-based extraction is used. Hint parameters defined on top of a procedure class are inherited by all `call` methods of the class. + +The following example shows how to use procedure hints. More information can be found in the documentation of the annotation class. 
+ +{{< tabs "16d50628-3052-11ee-be56-0242ac120002" >}} +{{< tab "Java" >}} +```java +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.table.procedures.Procedure; +import org.apache.flink.types.Row; + +// procedure with overloaded call methods +// but globally defined output type +@ProcedureHint(output = @DataTypeHint("ROW<s STRING, i INT>")) +public static class OverloadedProcedure implements Procedure { + + public Row[] call(ProcedureContext context, int a, int b) { + return new Row[] {Row.of("Sum", a + b)}; + } + + // overloading of arguments is still possible + public Row[] call(ProcedureContext context) { + return new Row[] {Row.of("Empty args", -1)}; + } +} + +// decouples the type inference from call methods, +// the type inference is entirely determined by the procedure hints +@ProcedureHint( + input = {@DataTypeHint("INT"), @DataTypeHint("INT")}, + output = @DataTypeHint("INT") +) +@ProcedureHint( + input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")}, + output = @DataTypeHint("BIGINT") +) +@ProcedureHint( + input = {}, + output = @DataTypeHint("BOOLEAN") +) +public static class OverloadedProcedure implements Procedure { + + // an implementer just needs to make sure that a method exists + // that can be called by the JVM + public Object[] call(ProcedureContext context, Object... 
o) { + if (o.length == 0) { + return new Object[] {false}; + } + return new Object[] {o[0]}; + } +} + +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala + +import org.apache.flink.table.annotation.DataTypeHint +import org.apache.flink.table.annotation.ProcedureHint +import org.apache.flink.table.procedure.ProcedureContext +import org.apache.flink.table.procedures.Procedure +import org.apache.flink.types.Row +import scala.annotation.varargs + +// procedure with overloaded call methods +// but globally defined output type +@ProcedureHint(output = new DataTypeHint("ROW<s STRING, i INT>")) +class OverloadedFunction extends Procedure { + + def call(context: ProcedureContext, a: Int, b: Int): Array[Row] = { + Array(Row.of("Sum", Int.box(a + b))) + } + + // overloading of arguments is still possible + def call(context: ProcedureContext): Array[Row] = { + Array(Row.of("Empty args", Int.box(-1))) + } +} + +// decouples the type inference from call methods, +// the type inference is entirely determined by the function hints +@ProcedureHint( + input = Array(new DataTypeHint("INT"), new DataTypeHint("INT")), + output = new DataTypeHint("INT") +) +@ProcedureHint( + input = Array(new DataTypeHint("BIGINT"), new DataTypeHint("BIGINT")), + output = new DataTypeHint("BIGINT") +) +@ProcedureHint( + input = Array(), + output = new DataTypeHint("BOOLEAN") +) +class OverloadedProcedure extends Procedure { + + // an implementer just needs to make sure that a method exists + // that can be called by the JVM + @varargs + def call(context: ProcedureContext, o: AnyRef*): Array[AnyRef]= { + if (o.length == 0) { + Array(Boolean.box(false)) + } + Array(o(0)) + } +} + +``` +{{< /tab >}} +{{< /tabs >}} + + +### Return Procedure in Catalog +After implementing a procedure, the catalog can then return the procedure in method `Catalog.getProcedure(ObjectPath procedurePath)`. The following example shows how to return it in a catalog. 
+Also, it's expected to list all the procedures in method `Catalog.listProcedures(String dbName)`. + +{{< tabs "2e7d4b78-3052-11ee-be56-0242ac120002" >}} +{{< tab "Java" >}} +```java + +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.table.procedures.Procedure; + +import java.util.HashMap; +import java.util.Map; + +// catalog with built-in procedures +public static class CatalogWithBuiltInProcedure extends GenericInMemoryCatalog { + static { + PROCEDURE_MAP.put(ObjectPath.fromString("system.generate_n"), new GenerateSequenceProcedure()); + } + + public CatalogWithBuiltInProcedure(String name) { + super(name); + } + + @Override + public List<String> listProcedures(String dbName) throws DatabaseNotExistException, CatalogException { + if (!databaseExists(dbName)) { + throw new DatabaseNotExistException(getName(), dbName); + } + return PROCEDURE_MAP.keySet().stream().filter(procedurePath -> procedurePath.getDatabaseName().equals(dbName)) + .map(ObjectPath::getObjectName).collect(Collectors.toList()); + } + + @Override + public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException { + if (PROCEDURE_MAP.containsKey(procedurePath)) { + return PROCEDURE_MAP.get(procedurePath); + } else { + throw new ProcedureNotExistException(getName(), procedurePath); + } + } +} + +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala + +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import 
org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException; +import org.apache.flink.table.procedures.Procedure; + +// catalog with built-in procedures +class CatalogWithBuiltInProcedure(name: String) extends GenericInMemoryCatalog(name) { + + val PROCEDURE_MAP = collection.immutable.HashMap[ObjectPath, Procedure]( + ObjectPath.fromString("system.generate_n") -> new GenerateSequenceProcedure()); + + @throws(classOf[DatabaseNotExistException]) + @throws(classOf[CatalogException]) + override def listProcedures(dbName: String): List[String] = { + if (!databaseExists(dbName)) { + throw new DatabaseNotExistException(getName, dbName); + } + PROCEDURE_MAP.keySet.filter(procedurePath => procedurePath.getDatabaseName.equals(dbName)) + .map(procedurePath => procedurePath.getObjectName).toList + } + + @throws(classOf[ProcedureNotExistException]) + override def getProcedure(procedurePath: ObjectPath): Procedure = { + if (PROCEDURE_MAP.contains(procedurePath)) { + PROCEDURE_MAP(procedurePath); + } else { + throw new ProcedureNotExistException(getName, procedurePath) + } + } +} + +``` +{{< /tab >}} +{{< /tabs >}} + + +## Examples + +The following example shows how to provide a procedure in a `Catalog` and call it with a `CALL` statement. See the [Implementation Guide](#implementation-guide) for more details.
 + +{{< tabs "1ed81b8a-3052-11ee-be56-0242ac120002" >}} +{{< tab "Java" >}} +```java + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.table.procedures.Procedure; +import org.apache.flink.util.CloseableIterator; + +import java.util.HashMap; +import java.util.Map; + +// first implement a procedure +public class GenerateSequenceProcedure implements Procedure { + + public long[] call(ProcedureContext context, int n) throws Exception { + StreamExecutionEnvironment env = context.getExecutionEnvironment(); + long[] sequenceN = new long[n]; + int i = 0; + try (CloseableIterator<Long> result = env.fromSequence(0, n - 1).executeAndCollect()) { + while (result.hasNext()) { + sequenceN[i++] = result.next(); + } + } + return sequenceN; + } +} + +// then provide the procedure in a custom catalog +public static class CatalogWithBuiltInProcedure extends GenericInMemoryCatalog { + + static final Map<ObjectPath, Procedure> PROCEDURE_MAP = new HashMap<>(); + + static { + PROCEDURE_MAP.put(ObjectPath.fromString("system.generate_n"), new GenerateSequenceProcedure()); + } + // omit some methods + // ... 
 + @Override + public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException { + if (PROCEDURE_MAP.containsKey(procedurePath)) { + return PROCEDURE_MAP.get(procedurePath); + } else { + throw new ProcedureNotExistException(getName(), procedurePath); + } + } +} + +TableEnvironment tEnv = TableEnvironment.create(...); +// register the catalog +tEnv.registerCatalog("my_catalog", new CatalogWithBuiltInProcedure("my_catalog")); +// call the procedure with a CALL statement +tEnv.executeSql("call my_catalog.`system`.generate_n(5)"); + +``` +{{< /tab >}} +{{< tab "Scala" >}} +```scala + +import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath} +import org.apache.flink.table.catalog.exceptions.{CatalogException, ProcedureNotExistException} +import org.apache.flink.table.procedure.ProcedureContext +import org.apache.flink.table.procedures.Procedure + +// first implement a procedure +class GenerateSequenceProcedure extends Procedure { + + def call(context: ProcedureContext, n: Integer): Array[Long] = { + val env = context.getExecutionEnvironment + val sequenceN = new Array[Long](n) + var i = 0; + env.fromSequence(0, n - 1).executeAndCollect() + .forEachRemaining(r => { + sequenceN(i) = r + i = i + 1 + }) + sequenceN; + } +} + +// then provide the procedure in a custom catalog +class CatalogWithBuiltInProcedure(name: String) extends GenericInMemoryCatalog(name) { + + val PROCEDURE_MAP = collection.immutable.HashMap[ObjectPath, Procedure](ObjectPath.fromString("system.generate_n") -> + new GenerateSequenceProcedure()); + + // omit some methods + // ... + + @throws(classOf[ProcedureNotExistException]) + override def getProcedure(procedurePath: ObjectPath): Procedure = { + if (PROCEDURE_MAP.contains(procedurePath)) { + PROCEDURE_MAP(procedurePath); + } else { + throw new ProcedureNotExistException(getName, procedurePath) + } + } +} + +val tEnv = TableEnvironment.create(...) 
 +// register the catalog +tEnv.registerCatalog("my_catalog", new CatalogWithBuiltInProcedure("my_catalog")) +// call the procedure with a CALL statement +tEnv.executeSql("call my_catalog.`system`.generate_n(5)") + +``` +{{< /tab >}} +{{< /tabs >}} + +