This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 13cb456eb30dc50dbd47885b7471cd02cfe56cdf
Author: luoyuxia <luoyu...@alumni.sjtu.edu.cn>
AuthorDate: Wed Aug 2 21:01:57 2023 +0800

    [FLINK-32356][doc] Add doc for procedure
---
 docs/content.zh/docs/dev/table/procedures.md | 541 ++++++++++++++++++++++++++
 docs/content/docs/dev/table/procedures.md    | 544 +++++++++++++++++++++++++++
 2 files changed, 1085 insertions(+)

diff --git a/docs/content.zh/docs/dev/table/procedures.md b/docs/content.zh/docs/dev/table/procedures.md
new file mode 100644
index 00000000000..cb70d759ebc
--- /dev/null
+++ b/docs/content.zh/docs/dev/table/procedures.md
@@ -0,0 +1,541 @@
+---
+title: "Procedures"
+is_beta: true
+weight: 50
+type: docs
+aliases:
+  - /zh/dev/table/procedures.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Procedures
+
+
+Flink Table API & SQL empowers users to perform data manipulation and administrative tasks with procedures. Procedures can run Flink jobs with the provided `StreamExecutionEnvironment`, making them more powerful and flexible.
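+
+As a quick preview before the implementation guide, the following minimal sketch shows what calling a procedure looks like from the Table API; the catalog name `my_catalog` and the procedure `system.my_procedure` are hypothetical placeholders rather than built-in names:
+
+```java
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+public class CallProcedureExample {
+  public static void main(String[] args) {
+    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+    // assumes a catalog "my_catalog" providing system.my_procedure has been registered beforehand
+    tEnv.executeSql("CALL my_catalog.`system`.my_procedure('some-arg')");
+  }
+}
+```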
+
+## Implementation Guide
+
+To call a procedure, it must be available in a catalog. To provide procedures in a catalog, you first need to implement the procedure and then return it from the method `Catalog.getProcedure(ObjectPath procedurePath)`.
+The following steps will guide you through implementing a procedure and providing it in a catalog.
+
+### Procedure Class
+
+An implementation class of a procedure must implement the interface `org.apache.flink.table.procedures.Procedure`.
+
+The class must be declared `public`, not `abstract`, and should be globally accessible. Thus, non-static inner or anonymous classes are not allowed.
+
+### Call Methods
+
+The interface does not provide any method; you have to define a method named `call` in which you implement the actual logic of the procedure. The `call` methods must be declared `public` and take a well-defined set of arguments.
+
+Please note:
+
+* The first parameter of a `call` method should always be `ProcedureContext`, which provides the method `getExecutionEnvironment()` to get the current `StreamExecutionEnvironment` for running a Flink job;
+* The return type of a `call` method should always be an array type, like `int[]`, `String[]`, etc.
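+
+A minimal sketch of this contract is shown below; the procedure is a hypothetical no-op and only illustrates the required shape of a `call` method:
+
+```java
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+
+// hypothetical no-op procedure: ProcedureContext comes first, the result is an array
+public class NoOpProcedure implements Procedure {
+
+  public String[] call(ProcedureContext context) {
+    return new String[] {"OK"};
+  }
+}
+```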
+
+More details can be found in the Java documentation of the class `org.apache.flink.table.procedures.Procedure`.
+
+Regular JVM method calling semantics apply. Therefore, it is possible to:
+- implement overloaded methods such as `call(ProcedureContext, Integer)` and `call(ProcedureContext, LocalDateTime)`;
+- use var-args such as `call(ProcedureContext, Integer...)`;
+- use object inheritance such as `call(ProcedureContext, Object)` that takes both `LocalDateTime` and `Integer`;
+- and combinations of the above such as `call(ProcedureContext, Object...)` that takes all kinds of arguments.
+
+If you intend to implement procedures in Scala, please add the `scala.annotation.varargs` annotation in case of variable arguments. Furthermore, it is recommended to use boxed primitives (e.g. `java.lang.Integer` instead of `Int`) to support `NULL`.
+
+The following snippet shows an example of an overloaded procedure:
+
+{{< tabs "7c5a5392-30d7-11ee-be56-0242ac120002" >}}
+{{< tab "Java" >}}
+
+```java
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.util.CloseableIterator;
+
+// procedure with overloaded call methods
+public class GenerateSequenceProcedure implements Procedure {
+
+  public long[] call(ProcedureContext context, int n) throws Exception {
+    return generate(context.getExecutionEnvironment(), n);
+  }
+
+  public long[] call(ProcedureContext context, String n) throws Exception {
+    return generate(context.getExecutionEnvironment(), Integer.parseInt(n));
+  }
+
+  private long[] generate(StreamExecutionEnvironment env, int n) throws Exception {
+    long[] sequenceN = new long[n];
+    int i = 0;
+    try (CloseableIterator<Long> result = env.fromSequence(0, n - 1).executeAndCollect()) {
+      while (result.hasNext()) {
+        sequenceN[i++] = result.next();
+      }
+    }
+    return sequenceN;
+  }
+}
+
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.table.procedure.ProcedureContext
+import org.apache.flink.table.procedures.Procedure
+import scala.annotation.varargs
+
+// procedure with overloaded call methods
+class GenerateSequenceProcedure extends Procedure {
+
+  def call(context: ProcedureContext, a: Integer, b: Integer): Array[Integer] = {
+    Array(Int.box(a + b))
+  }
+
+  def call(context: ProcedureContext, a: String, b: String): Array[Integer] = {
+    Array(Int.box(Integer.parseInt(a) + Integer.parseInt(b)))
+  }
+
+  @varargs // generate var-args like Java
+  def call(context: ProcedureContext, d: Double*): Array[Integer] = {
+    Array(Int.box(d.sum.toInt))
+  }
+}
+
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Type Inference
+The table ecosystem (similar to the SQL standard) is a strongly typed API. Therefore, both procedure parameters and return types must be mapped to a [data type]({{< ref "docs/dev/table/types" >}}).
+
+From a logical perspective, the planner needs information about expected types, precision, and scale. From a JVM perspective, the planner needs information about how internal data structures are represented as JVM objects when calling a procedure.
+
+The logic for validating input arguments and deriving data types for both the parameters and the result of a procedure is summarized under the term _type inference_.
+
+Flink's procedures implement an automatic type inference extraction that derives data types from the procedure's class and its `call` methods via reflection. If this implicit reflective extraction approach is not successful, the extraction process can be supported by annotating affected parameters, classes, or methods with `@DataTypeHint` and `@ProcedureHint`. More examples on how to annotate procedures are shown below.
+
+Note that although the `call` method of a procedure must return an array type `T[]`, a `@DataTypeHint` on the return type actually annotates the component type of that array, i.e. `T`.
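+
+For example, in the sketch below (the class name `RoundProcedure` is a hypothetical illustration), the hint `"DECIMAL(12, 3)"` describes each element of the returned `BigDecimal[]` rather than the array itself:
+
+```java
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+
+import java.math.BigDecimal;
+
+public class RoundProcedure implements Procedure {
+
+  // the hint annotates the component type T of the returned T[], i.e. each BigDecimal
+  public @DataTypeHint("DECIMAL(12, 3)") BigDecimal[] call(ProcedureContext context, double a) {
+    return new BigDecimal[] {BigDecimal.valueOf(a)};
+  }
+}
+```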
+
+#### Automatic Type Inference
+
+The automatic type inference inspects the procedure's class and `call` methods to derive data types for the arguments and result of a procedure. `@DataTypeHint` and `@ProcedureHint` annotations support the automatic extraction.
+
+For a full list of classes that can be implicitly mapped to a data type, please refer to the [data type extraction section]({{< ref "docs/dev/table/types" >}}#data-type-extraction).
+
+**`@DataTypeHint`**
+
+In many scenarios, it is required to support the automatic extraction _inline_ for parameters and return types of a procedure.
+
+The following example shows how to use `@DataTypeHint`. More information can be found in the documentation of the annotation class.
+
+{{< tabs "81b297da-30d9-11ee-be56-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.InputGroup;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+
+// procedure with overloaded call methods
+public static class OverloadedProcedure implements Procedure {
+
+  // no hint required
+  public Long[] call(ProcedureContext context, long a, long b) {
+    return new Long[] {a + b};
+  }
+
+  // define the precision and scale of a decimal
+  public @DataTypeHint("DECIMAL(12, 3)") BigDecimal[] call(ProcedureContext context, double a, double b) {
+    return new BigDecimal[] {BigDecimal.valueOf(a + b)};
+  }
+
+  // define a nested data type
+  @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
+  public Row[] call(ProcedureContext context, int i) {
+    return new Row[] {Row.of(String.valueOf(i), Instant.ofEpochSecond(i))};
+  }
+
+  // allow wildcard input and custom serialized output
+  @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
+  public ByteBuffer[] call(ProcedureContext context, @DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
+    return new ByteBuffer[] {MyUtils.serializeToByteBuffer(o)};
+  }
+}
+
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.InputGroup
+import org.apache.flink.table.procedure.ProcedureContext
+import org.apache.flink.table.procedures.Procedure
+import org.apache.flink.types.Row
+
+// procedure with overloaded call methods
+class OverloadedProcedure extends Procedure {
+
+  // no hint required
+  def call(context: ProcedureContext, a: Long, b: Long): Array[Long] = {
+    Array(a + b)
+  }
+
+  // define the precision and scale of a decimal
+  @DataTypeHint("DECIMAL(12, 3)")
+  def call(context: ProcedureContext, a: Double, b: Double): Array[BigDecimal] = {
+    Array(BigDecimal.valueOf(a + b))
+  }
+
+  // define a nested data type
+  @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
+  def call(context: ProcedureContext, i: Integer): Array[Row] = {
+    Array(Row.of(java.lang.String.valueOf(i), java.time.Instant.ofEpochSecond(i)))
+  }
+
+  // allow wildcard input and custom serialized output
+  @DataTypeHint(value = "RAW", bridgedTo = classOf[java.nio.ByteBuffer])
+  def call(context: ProcedureContext, @DataTypeHint(inputGroup = InputGroup.ANY) o: Object): Array[java.nio.ByteBuffer] = {
+    Array(MyUtils.serializeToByteBuffer(o))
+  }
+}
+
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**`@ProcedureHint`**
+
+In some scenarios, it is desirable that one `call` method handles multiple different data types at the same time. In other scenarios, overloaded `call` methods have a common result type that should be declared only once.
+
+The `@ProcedureHint` annotation can provide a mapping from argument data types to a result data type. It enables annotating entire procedure classes or `call` methods for input and result data types. One or more annotations can be declared on top of a class or individually for each `call` method. All hint parameters are optional; if a parameter is not defined, the default reflection-based extraction is used. Hint parameters defined on top of a procedure class are inherited by all `call` methods.
+
+The following example shows how to use `@ProcedureHint`. More information can be found in the documentation of the annotation class.
+
+{{< tabs "5d205654-30da-11ee-be56-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.types.Row;
+
+// specify the same output type for all call methods of the procedure class
+@ProcedureHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
+public static class OverloadedProcedure implements Procedure {
+
+  public Row[] call(ProcedureContext context, int a, int b) {
+    return new Row[] {Row.of("Sum", a + b)};
+  }
+
+  // overloading of arguments is still possible
+  public Row[] call(ProcedureContext context) {
+    return new Row[] {Row.of("Empty args", -1)};
+  }
+}
+
+// decouples the type inference from call methods; the type inference is entirely determined by the ProcedureHint annotations
+@ProcedureHint(
+  input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
+  output = @DataTypeHint("INT")
+)
+@ProcedureHint(
+  input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
+  output = @DataTypeHint("BIGINT")
+)
+@ProcedureHint(
+  input = {},
+  output = @DataTypeHint("BOOLEAN")
+)
+public static class OverloadedProcedure implements Procedure {
+
+  // an implementation of the call method just needs to exist so that it can be called by the JVM
+  public Object[] call(ProcedureContext context, Object... o) {
+    if (o.length == 0) {
+      return new Object[] {false};
+    }
+    return new Object[] {o[0]};
+  }
+}
+
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.ProcedureHint
+import org.apache.flink.table.procedure.ProcedureContext
+import org.apache.flink.table.procedures.Procedure
+import org.apache.flink.types.Row
+import scala.annotation.varargs
+
+// specify the same output type for all call methods of the procedure class
+@ProcedureHint(output = new DataTypeHint("ROW<s STRING, i INT>"))
+class OverloadedProcedure extends Procedure {
+
+  def call(context: ProcedureContext, a: Int, b: Int): Array[Row] = {
+    Array(Row.of("Sum", Int.box(a + b)))
+  }
+
+  // overloading of arguments is still possible
+  def call(context: ProcedureContext): Array[Row] = {
+    Array(Row.of("Empty args", Int.box(-1)))
+  }
+}
+
+// decouples the type inference from call methods; the type inference is entirely determined by the ProcedureHint annotations
+@ProcedureHint(
+  input = Array(new DataTypeHint("INT"), new DataTypeHint("INT")),
+  output = new DataTypeHint("INT")
+)
+@ProcedureHint(
+  input = Array(new DataTypeHint("BIGINT"), new DataTypeHint("BIGINT")),
+  output = new DataTypeHint("BIGINT")
+)
+@ProcedureHint(
+  input = Array(),
+  output = new DataTypeHint("BOOLEAN")
+)
+class OverloadedProcedure extends Procedure {
+
+  // an implementation of the call method just needs to exist so that it can be called by the JVM
+  @varargs
+  def call(context: ProcedureContext, o: AnyRef*): Array[AnyRef] = {
+    if (o.length == 0) {
+      return Array(Boolean.box(false))
+    }
+    Array(o(0))
+  }
+}
+
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+
+### Return a Procedure in a Catalog
+After implementing a procedure, a catalog can return it from the method `Catalog.getProcedure(ObjectPath procedurePath)`. The following example shows how to return a procedure in a catalog.
+A catalog is also expected to list all of its procedures in the method `Catalog.listProcedures(String dbName)`.
+
+{{< tabs "2ee21ac4-30db-11ee-be56-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+// catalog with built-in procedures
+public class CatalogWithBuiltInProcedure extends GenericInMemoryCatalog {
+
+  // map holding the built-in procedures
+  private static final Map<ObjectPath, Procedure> PROCEDURE_MAP = new HashMap<>();
+
+  static {
+    PROCEDURE_MAP.put(ObjectPath.fromString("system.generate_n"), new GenerateSequenceProcedure());
+  }
+
+  public CatalogWithBuiltInProcedure(String name) {
+    super(name);
+  }
+  
+  @Override
+  public List<String> listProcedures(String dbName) throws DatabaseNotExistException, CatalogException {
+    if (!databaseExists(dbName)) {
+      throw new DatabaseNotExistException(getName(), dbName);
+    }
+    return PROCEDURE_MAP.keySet().stream().filter(procedurePath -> procedurePath.getDatabaseName().equals(dbName))
+            .map(ObjectPath::getObjectName).collect(Collectors.toList());
+  }
+
+  @Override
+  public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException {
+    if (PROCEDURE_MAP.containsKey(procedurePath)) {
+      return PROCEDURE_MAP.get(procedurePath);
+    } else {
+      throw new ProcedureNotExistException(getName(), procedurePath);
+    }
+  }
+}
+
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+
+import org.apache.flink.table.catalog.GenericInMemoryCatalog
+import org.apache.flink.table.catalog.ObjectPath
+import org.apache.flink.table.catalog.exceptions.CatalogException
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException
+import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException
+import org.apache.flink.table.procedures.Procedure
+
+import scala.collection.JavaConverters._
+
+// catalog with built-in procedures
+class CatalogWithBuiltInProcedure(name: String) extends GenericInMemoryCatalog(name) {
+
+  val PROCEDURE_MAP = collection.immutable.HashMap[ObjectPath, Procedure](
+    ObjectPath.fromString("system.generate_n") -> new GenerateSequenceProcedure())
+
+  @throws(classOf[DatabaseNotExistException])
+  @throws(classOf[CatalogException])
+  override def listProcedures(dbName: String): java.util.List[String] = {
+    if (!databaseExists(dbName)) {
+      throw new DatabaseNotExistException(getName, dbName)
+    }
+    PROCEDURE_MAP.keySet.filter(procedurePath => procedurePath.getDatabaseName.equals(dbName))
+      .map(procedurePath => procedurePath.getObjectName).toList.asJava
+  }
+
+  @throws(classOf[ProcedureNotExistException])
+  override def getProcedure(procedurePath: ObjectPath): Procedure = {
+    if (PROCEDURE_MAP.contains(procedurePath)) {
+      PROCEDURE_MAP(procedurePath);
+    } else {
+      throw new ProcedureNotExistException(getName, procedurePath)
+    }
+  }
+}
+
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+
+## Examples
+
+The following example shows how to provide a procedure in a catalog and call it with a `CALL` statement. See the [Implementation Guide](#implementation-guide) for more details.
+
+{{< tabs "c3edd888-30db-11ee-be56-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+// first implement a procedure
+public class GenerateSequenceProcedure implements Procedure {
+
+  public long[] call(ProcedureContext context, int n) throws Exception {
+    StreamExecutionEnvironment env = context.getExecutionEnvironment();
+    long[] sequenceN = new long[n];
+    int i = 0;
+    try (CloseableIterator<Long> result = env.fromSequence(0, n - 1).executeAndCollect()) {
+      while (result.hasNext()) {
+        sequenceN[i++] = result.next();
+      }
+    }
+    return sequenceN;
+  }
+}
+
+// then provide the procedure in a custom catalog
+public class CatalogWithBuiltInProcedure extends GenericInMemoryCatalog {
+
+  private static final Map<ObjectPath, Procedure> PROCEDURE_MAP = new HashMap<>();
+
+  static {
+    PROCEDURE_MAP.put(ObjectPath.fromString("system.generate_n"), new GenerateSequenceProcedure());
+  }
+
+  public CatalogWithBuiltInProcedure(String name) {
+    super(name);
+  }
+
+  // some methods are omitted
+  // ...
+
+  @Override
+  public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException {
+    if (PROCEDURE_MAP.containsKey(procedurePath)) {
+      return PROCEDURE_MAP.get(procedurePath);
+    } else {
+      throw new ProcedureNotExistException(getName(), procedurePath);
+    }
+  }
+}
+
+TableEnvironment tEnv = TableEnvironment.create(...);
+// register the catalog
+tEnv.registerCatalog("my_catalog", new CatalogWithBuiltInProcedure("my_catalog"));
+// call the procedure with a CALL statement
+tEnv.executeSql("call my_catalog.`system`.generate_n(5)");
+
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
+import org.apache.flink.table.catalog.exceptions.{CatalogException, ProcedureNotExistException}
+import org.apache.flink.table.procedure.ProcedureContext
+import org.apache.flink.table.procedures.Procedure
+
+// first implement a procedure
+class GenerateSequenceProcedure extends Procedure {
+
+  def call(context: ProcedureContext, n: Integer): Array[Long] = {
+    val env = context.getExecutionEnvironment
+    val sequenceN = new Array[Long](n)
+    var i = 0
+    env.fromSequence(0, n - 1).executeAndCollect()
+      .forEachRemaining(r => {
+        sequenceN(i) = r
+        i = i + 1
+      })
+    sequenceN
+  }
+}
+
+// then provide the procedure in a custom catalog
+class CatalogWithBuiltInProcedure(name: String) extends GenericInMemoryCatalog(name) {
+
+  val PROCEDURE_MAP = collection.immutable.HashMap[ObjectPath, Procedure](
+    ObjectPath.fromString("system.generate_n") -> new GenerateSequenceProcedure())
+
+  // some methods are omitted
+  // ...
+
+  @throws(classOf[ProcedureNotExistException])
+  override def getProcedure(procedurePath: ObjectPath): Procedure = {
+    if (PROCEDURE_MAP.contains(procedurePath)) {
+      PROCEDURE_MAP(procedurePath);
+    } else {
+      throw new ProcedureNotExistException(getName, procedurePath)
+    }
+  }
+}
+
+val tEnv = TableEnvironment.create(...)
+// register the catalog
+tEnv.registerCatalog("my_catalog", new CatalogWithBuiltInProcedure("my_catalog"))
+// call the procedure with a CALL statement
+tEnv.executeSql("call my_catalog.`system`.generate_n(5)")
+
+```
+{{< /tab >}}
+{{< /tabs >}}
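+
+To inspect what the `CALL` statement returns, the `TableResult` handed back by `executeSql` can be printed; a small sketch, assuming the catalog and procedure above have been registered:
+
+```java
+// prints the rows produced by the procedure call
+tEnv.executeSql("call my_catalog.`system`.generate_n(5)").print();
+```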
+
+
diff --git a/docs/content/docs/dev/table/procedures.md b/docs/content/docs/dev/table/procedures.md
new file mode 100644
index 00000000000..0122a0de191
--- /dev/null
+++ b/docs/content/docs/dev/table/procedures.md
@@ -0,0 +1,544 @@
+---
+title: "Procedures"
+is_beta: true
+weight: 50
+type: docs
+aliases:
+  - /dev/table/procedures.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Procedures
+
+Flink Table API & SQL empowers users to perform data manipulation and administrative tasks with procedures. Procedures can run Flink jobs with the provided `StreamExecutionEnvironment`, making them more powerful and flexible.
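+
+As a quick preview, the sketch below shows how a procedure is invoked from the Table API; the catalog `my_catalog` and the procedure `system.my_procedure` are hypothetical placeholders, not built-in names:
+
+```java
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+public class CallProcedureExample {
+  public static void main(String[] args) {
+    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+    // assumes a catalog "my_catalog" providing system.my_procedure has been registered beforehand
+    tEnv.executeSql("CALL my_catalog.`system`.my_procedure('some-arg')");
+  }
+}
+```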
+
+## Implementation Guide
+
+To call a procedure, it must be available in a catalog. To provide procedures in a catalog, you need to implement the procedure and then return it from the `Catalog.getProcedure(ObjectPath procedurePath)` method.
+The following steps will guide you through implementing and providing a procedure in a catalog.
+
+### Procedure Class
+
+An implementation class must implement the interface `org.apache.flink.table.procedures.Procedure`.
+
+The class must be declared `public`, not `abstract`, and should be globally accessible. Thus, non-static inner or anonymous classes are not allowed.
+
+### Call Methods
+
+The interface doesn't provide any method; you have to define a method named `call` in which you can implement the logic of the procedure.
+The methods must be declared `public` and take a well-defined set of arguments.
+
+Please note:
+
+* The first parameter of the method `call` should always be `ProcedureContext`, which provides the method `getExecutionEnvironment` to get a `StreamExecutionEnvironment` for running a Flink job
+* The return type should always be an array, like `int[]`, `String[]`, etc.
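+
+A minimal sketch of this contract (with a hypothetical no-op procedure) looks as follows:
+
+```java
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+
+// hypothetical no-op procedure: ProcedureContext comes first, the result is an array
+public class NoOpProcedure implements Procedure {
+
+  public String[] call(ProcedureContext context) {
+    return new String[] {"OK"};
+  }
+}
+```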
+
+More detail can be found in the Java doc of the class `org.apache.flink.table.procedures.Procedure`.
+
+Regular JVM method calling semantics apply. Therefore, it is possible to:
+- implement overloaded methods such as `call(ProcedureContext, Integer)` and `call(ProcedureContext, LocalDateTime)`
+- use var-args such as `call(ProcedureContext, Integer...)`
+- use object inheritance such as `call(ProcedureContext, Object)` that takes both `LocalDateTime` and `Integer`
+- and combinations of the above such as `call(ProcedureContext, Object...)` that takes all kinds of arguments
+
+If you intend to implement procedures in Scala, please add the `scala.annotation.varargs` annotation in case of variable arguments. Furthermore, it is recommended to use boxed primitives (e.g. `java.lang.Integer` instead of `Int`) to support `NULL`.
+
+The following snippet shows an example of an overloaded procedure:
+
+{{< tabs "0819d780-3052-11ee-be56-0242ac120002" >}}
+{{< tab "Java" >}}
+
+```java
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.util.CloseableIterator;
+
+// procedure with overloaded call methods
+public class GenerateSequenceProcedure implements Procedure {
+
+  public long[] call(ProcedureContext context, int n) throws Exception {
+    return generate(context.getExecutionEnvironment(), n);
+  }
+
+  public long[] call(ProcedureContext context, String n) throws Exception {
+    return generate(context.getExecutionEnvironment(), Integer.parseInt(n));
+  }
+
+  private long[] generate(StreamExecutionEnvironment env, int n) throws Exception {
+    long[] sequenceN = new long[n];
+    int i = 0;
+    try (CloseableIterator<Long> result = env.fromSequence(0, n - 1).executeAndCollect()) {
+      while (result.hasNext()) {
+        sequenceN[i++] = result.next();
+      }
+    }
+    return sequenceN;
+  }
+}
+
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.table.procedure.ProcedureContext
+import org.apache.flink.table.procedures.Procedure
+import scala.annotation.varargs
+
+// procedure with overloaded call methods
+class GenerateSequenceProcedure extends Procedure {
+
+  def call(context: ProcedureContext, a: Integer, b: Integer): Array[Integer] = {
+    Array(Int.box(a + b))
+  }
+
+  def call(context: ProcedureContext, a: String, b: String): Array[Integer] = {
+    Array(Int.box(Integer.parseInt(a) + Integer.parseInt(b)))
+  }
+
+  @varargs // generate var-args like Java
+  def call(context: ProcedureContext, d: Double*): Array[Integer] = {
+    Array(Int.box(d.sum.toInt))
+  }
+}
+
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Type Inference
+The table ecosystem (similar to the SQL standard) is a strongly typed API. Therefore, both procedure parameters and return types must be mapped to a [data type]({{< ref "docs/dev/table/types" >}}).
+
+From a logical perspective, the planner needs information about expected types, precision, and scale. From a JVM perspective, the planner needs information about how internal data structures are represented as JVM objects when calling a procedure.
+
+The logic for validating input arguments and deriving data types for both the parameters and the result of a procedure is summarized under the term _type inference_.
+
+Flink's procedures implement an automatic type inference extraction that derives data types from the procedure's class and its `call` methods via reflection. If this implicit reflective extraction approach is not successful, the extraction process can be supported by annotating affected parameters, classes, or methods with `@DataTypeHint` and `@ProcedureHint`. More examples on how to annotate procedures are shown below.
+
+Note that although the `call` method of a procedure must return an array type `T[]`, a `@DataTypeHint` on the return type actually annotates the component type of that array, i.e. `T`.
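+
+For instance, in the sketch below (`RoundProcedure` is a hypothetical name), the hint `"DECIMAL(12, 3)"` describes each element of the returned `BigDecimal[]`, not the array type:
+
+```java
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+
+import java.math.BigDecimal;
+
+public class RoundProcedure implements Procedure {
+
+  // the hint annotates the component type T of the returned T[], i.e. each BigDecimal
+  public @DataTypeHint("DECIMAL(12, 3)") BigDecimal[] call(ProcedureContext context, double a) {
+    return new BigDecimal[] {BigDecimal.valueOf(a)};
+  }
+}
+```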
+
+#### Automatic Type Inference
+
+The automatic type inference inspects the procedure's class and `call` methods to derive data types for the arguments and result of a procedure. `@DataTypeHint` and `@ProcedureHint` annotations support the automatic extraction.
+
+For a full list of classes that can be implicitly mapped to a data type, please refer to the [data type extraction section]({{< ref "docs/dev/table/types" >}}#data-type-extraction).
+
+**`@DataTypeHint`**
+
+In many scenarios, it is required to support the automatic extraction _inline_ for parameters and return types of a procedure.
+
+The following example shows how to use data type hints. More information can be found in the documentation of the annotation class.
+
+{{< tabs "9414057c-3051-11ee-be56-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.InputGroup;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+
+// procedure with overloaded call methods
+public static class OverloadedProcedure implements Procedure {
+
+  // no hint required
+  public Long[] call(ProcedureContext context, long a, long b) {
+    return new Long[] {a + b};
+  }
+
+  // define the precision and scale of a decimal
+  public @DataTypeHint("DECIMAL(12, 3)") BigDecimal[] call(ProcedureContext context, double a, double b) {
+    return new BigDecimal[] {BigDecimal.valueOf(a + b)};
+  }
+
+  // define a nested data type
+  @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
+  public Row[] call(ProcedureContext context, int i) {
+    return new Row[] {Row.of(String.valueOf(i), Instant.ofEpochSecond(i))};
+  }
+
+  // allow wildcard input and custom serialized output
+  @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
+  public ByteBuffer[] call(ProcedureContext context, @DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
+    return new ByteBuffer[] {MyUtils.serializeToByteBuffer(o)};
+  }
+}
+
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.InputGroup
+import org.apache.flink.table.procedure.ProcedureContext
+import org.apache.flink.table.procedures.Procedure
+import org.apache.flink.types.Row
+
+// procedure with overloaded call methods
+class OverloadedProcedure extends Procedure {
+
+  // no hint required
+  def call(context: ProcedureContext, a: Long, b: Long): Array[Long] = {
+    Array(a + b)
+  }
+
+  // define the precision and scale of a decimal
+  @DataTypeHint("DECIMAL(12, 3)")
+  def call(context: ProcedureContext, a: Double, b: Double): Array[BigDecimal] = {
+    Array(BigDecimal.valueOf(a + b))
+  }
+
+  // define a nested data type
+  @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
+  def call(context: ProcedureContext, i: Integer): Array[Row] = {
+    Array(Row.of(java.lang.String.valueOf(i), java.time.Instant.ofEpochSecond(i)))
+  }
+
+  // allow wildcard input and custom serialized output
+  @DataTypeHint(value = "RAW", bridgedTo = classOf[java.nio.ByteBuffer])
+  def call(context: ProcedureContext, @DataTypeHint(inputGroup = InputGroup.ANY) o: Object): Array[java.nio.ByteBuffer] = {
+    Array(MyUtils.serializeToByteBuffer(o))
+  }
+}
+
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**`@ProcedureHint`**
+
+In some scenarios, it is desirable that one `call` method handles multiple different data types at the same time. Furthermore, in some scenarios, overloaded `call` methods have a common result type that should be declared only once.
+
+The `@ProcedureHint` annotation can provide a mapping from argument data types to a result data type. It enables annotating entire procedure classes or `call` methods for input and result data types. One or more annotations can be declared on top of a class or individually for each `call` method for overloading procedure signatures. All hint parameters are optional. If a parameter is not defined, the default reflection-based extraction is used. Hint parameters defined on top of a procedure class are inherited by all `call` methods.
+
+The following example shows how to use procedure hints. More information can be found in the documentation of the annotation class.
+
+{{< tabs "16d50628-3052-11ee-be56-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.types.Row;
+
+// procedure with overloaded call methods
+// but globally defined output type
+@ProcedureHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
+public static class OverloadedProcedure implements Procedure {
+
+  public Row[] call(ProcedureContext context, int a, int b) {
+    return new Row[] {Row.of("Sum", a + b)};
+  }
+
+  // overloading of arguments is still possible
+  public Row[] call(ProcedureContext context) {
+    return new Row[] {Row.of("Empty args", -1)};
+  }
+}
+
+// decouples the type inference from call methods,
+// the type inference is entirely determined by the procedure hints
+@ProcedureHint(
+  input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
+  output = @DataTypeHint("INT")
+)
+@ProcedureHint(
+  input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
+  output = @DataTypeHint("BIGINT")
+)
+@ProcedureHint(
+  input = {},
+  output = @DataTypeHint("BOOLEAN")
+)
+public static class OverloadedProcedure implements Procedure {
+
+  // an implementer just needs to make sure that a method exists
+  // that can be called by the JVM
+  public Object[] call(ProcedureContext context, Object... o) {
+    if (o.length == 0) {
+      return new Object[] {false};
+    }
+    return new Object[] {o[0]};
+  }
+}
+
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.ProcedureHint
+import org.apache.flink.table.procedure.ProcedureContext
+import org.apache.flink.table.procedures.Procedure
+import org.apache.flink.types.Row
+import scala.annotation.varargs
+
+// procedure with overloaded call methods
+// but globally defined output type
+@ProcedureHint(output = new DataTypeHint("ROW<s STRING, i INT>"))
+class OverloadedProcedure extends Procedure {
+
+  def call(context: ProcedureContext, a: Int, b: Int): Array[Row] = {
+    Array(Row.of("Sum", Int.box(a + b)))
+  }
+
+  // overloading of arguments is still possible
+  def call(context: ProcedureContext): Array[Row] = {
+    Array(Row.of("Empty args", Int.box(-1)))
+  }
+}
+
+// decouples the type inference from call methods,
+// the type inference is entirely determined by the procedure hints
+@ProcedureHint(
+  input = Array(new DataTypeHint("INT"), new DataTypeHint("INT")),
+  output = new DataTypeHint("INT")
+)
+@ProcedureHint(
+  input = Array(new DataTypeHint("BIGINT"), new DataTypeHint("BIGINT")),
+  output = new DataTypeHint("BIGINT")
+)
+@ProcedureHint(
+  input = Array(),
+  output = new DataTypeHint("BOOLEAN")
+)
+class OverloadedProcedure extends Procedure {
+
+  // an implementer just needs to make sure that a method exists
+  // that can be called by the JVM
+  @varargs
+  def call(context: ProcedureContext, o: AnyRef*): Array[AnyRef] = {
+    if (o.length == 0) {
+      return Array(Boolean.box(false))
+    }
+    Array(o(0))
+  }
+}
+
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+
+### Return a Procedure in a Catalog
+After implementing a procedure, the catalog can return the procedure from the method `Catalog.getProcedure(ObjectPath procedurePath)`. The following example shows how to return it in a catalog.
+The catalog is also expected to list all of its procedures in the method `Catalog.listProcedures(String dbName)`.
+
+{{< tabs "2e7d4b78-3052-11ee-be56-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+// catalog with built-in procedures
+public class CatalogWithBuiltInProcedure extends GenericInMemoryCatalog {
+
+  // map holding the built-in procedures
+  private static final Map<ObjectPath, Procedure> PROCEDURE_MAP = new HashMap<>();
+
+  static {
+    PROCEDURE_MAP.put(ObjectPath.fromString("system.generate_n"), new GenerateSequenceProcedure());
+  }
+
+  public CatalogWithBuiltInProcedure(String name) {
+    super(name);
+  }
+  
+  @Override
+  public List<String> listProcedures(String dbName) throws DatabaseNotExistException, CatalogException {
+    if (!databaseExists(dbName)) {
+      throw new DatabaseNotExistException(getName(), dbName);
+    }
+    return PROCEDURE_MAP.keySet().stream().filter(procedurePath -> procedurePath.getDatabaseName().equals(dbName))
+            .map(ObjectPath::getObjectName).collect(Collectors.toList());
+  }
+
+  @Override
+  public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException {
+    if (PROCEDURE_MAP.containsKey(procedurePath)) {
+      return PROCEDURE_MAP.get(procedurePath);
+    } else {
+      throw new ProcedureNotExistException(getName(), procedurePath);
+    }
+  }
+}
+
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+
+import org.apache.flink.table.catalog.GenericInMemoryCatalog
+import org.apache.flink.table.catalog.ObjectPath
+import org.apache.flink.table.catalog.exceptions.CatalogException
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException
+import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException
+import org.apache.flink.table.procedures.Procedure
+
+import scala.collection.JavaConverters._
+
+// catalog with built-in procedures
+class CatalogWithBuiltInProcedure(name: String) extends GenericInMemoryCatalog(name) {
+
+  val PROCEDURE_MAP = collection.immutable.HashMap[ObjectPath, Procedure](
+    ObjectPath.fromString("system.generate_n") -> new GenerateSequenceProcedure())
+
+  @throws(classOf[DatabaseNotExistException])
+  @throws(classOf[CatalogException])
+  override def listProcedures(dbName: String): java.util.List[String] = {
+    if (!databaseExists(dbName)) {
+      throw new DatabaseNotExistException(getName, dbName)
+    }
+    PROCEDURE_MAP.keySet.filter(procedurePath => procedurePath.getDatabaseName.equals(dbName))
+      .map(procedurePath => procedurePath.getObjectName).toList.asJava
+  }
+
+  @throws(classOf[ProcedureNotExistException])
+  override def getProcedure(procedurePath: ObjectPath): Procedure = {
+    if (PROCEDURE_MAP.contains(procedurePath)) {
+      PROCEDURE_MAP(procedurePath);
+    } else {
+      throw new ProcedureNotExistException(getName, procedurePath)
+    }
+  }
+}
+
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+
+## Examples
+
+The following example shows how to provide a procedure in a `Catalog` and call it with a `CALL` statement. See the [Implementation Guide](#implementation-guide) for more details.
+
+{{< tabs "1ed81b8a-3052-11ee-be56-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+// first implement a procedure
+public class GenerateSequenceProcedure implements Procedure {
+
+  public long[] call(ProcedureContext context, int n) throws Exception {
+    StreamExecutionEnvironment env = context.getExecutionEnvironment();
+    long[] sequenceN = new long[n];
+    int i = 0;
+    try (CloseableIterator<Long> result = env.fromSequence(0, n - 1).executeAndCollect()) {
+      while (result.hasNext()) {
+        sequenceN[i++] = result.next();
+      }
+    }
+    return sequenceN;
+  }
+}
+
+// then provide the procedure in a custom catalog
+public class CatalogWithBuiltInProcedure extends GenericInMemoryCatalog {
+
+  private static final Map<ObjectPath, Procedure> PROCEDURE_MAP = new HashMap<>();
+
+  static {
+    PROCEDURE_MAP.put(ObjectPath.fromString("system.generate_n"), new GenerateSequenceProcedure());
+  }
+
+  public CatalogWithBuiltInProcedure(String name) {
+    super(name);
+  }
+
+  // some methods are omitted
+  // ...
+
+  @Override
+  public Procedure getProcedure(ObjectPath procedurePath) throws ProcedureNotExistException, CatalogException {
+    if (PROCEDURE_MAP.containsKey(procedurePath)) {
+      return PROCEDURE_MAP.get(procedurePath);
+    } else {
+      throw new ProcedureNotExistException(getName(), procedurePath);
+    }
+  }
+}
+
+TableEnvironment tEnv = TableEnvironment.create(...);
+// register the catalog
+tEnv.registerCatalog("my_catalog", new CatalogWithBuiltInProcedure("my_catalog"));
+// call the procedure with a CALL statement
+tEnv.executeSql("call my_catalog.`system`.generate_n(5)");
+
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
+import org.apache.flink.table.catalog.exceptions.{CatalogException, ProcedureNotExistException}
+import org.apache.flink.table.procedure.ProcedureContext
+import org.apache.flink.table.procedures.Procedure
+
+// first implement a procedure
+class GenerateSequenceProcedure extends Procedure {
+
+  def call(context: ProcedureContext, n: Integer): Array[Long] = {
+    val env = context.getExecutionEnvironment
+    val sequenceN = new Array[Long](n)
+    var i = 0
+    env.fromSequence(0, n - 1).executeAndCollect()
+      .forEachRemaining(r => {
+        sequenceN(i) = r
+        i = i + 1
+      })
+    sequenceN
+  }
+}
+
+// then provide the procedure in a custom catalog
+class CatalogWithBuiltInProcedure(name: String) extends GenericInMemoryCatalog(name) {
+
+  val PROCEDURE_MAP = collection.immutable.HashMap[ObjectPath, Procedure](
+    ObjectPath.fromString("system.generate_n") -> new GenerateSequenceProcedure())
+
+  // some methods are omitted
+  // ...
+
+  @throws(classOf[ProcedureNotExistException])
+  override def getProcedure(procedurePath: ObjectPath): Procedure = {
+    if (PROCEDURE_MAP.contains(procedurePath)) {
+      PROCEDURE_MAP(procedurePath);
+    } else {
+      throw new ProcedureNotExistException(getName, procedurePath)
+    }
+  }
+}
+
+val tEnv = TableEnvironment.create(...)
+// register the catalog
+tEnv.registerCatalog("my_catalog", new CatalogWithBuiltInProcedure("my_catalog"))
+// call the procedure with a CALL statement
+tEnv.executeSql("call my_catalog.`system`.generate_n(5)")
+
+```
+{{< /tab >}}
+{{< /tabs >}}
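+
+To inspect the result of the `CALL` statement, the `TableResult` returned by `executeSql` can be printed; a small sketch, assuming the setup above:
+
+```java
+// prints the rows produced by the procedure call
+tEnv.executeSql("call my_catalog.`system`.generate_n(5)").print();
+```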
+
+

