twalthr commented on code in PR #26610:
URL: https://github.com/apache/flink/pull/26610#discussion_r2236832505
##########
docs/content/docs/dev/table/functions/udfs.md:
##########
@@ -1017,6 +1018,109 @@ If you intend to implement or call functions in Python,
please refer to the [Pyt
{{< top >}}
+Asynchronous Scalar Functions
+----------------
+
+When interacting with external systems (for example when enriching stream
events with data stored in a database), one needs to take care that network or
other latency does not dominate the streaming application’s running time.
+
+Naively accessing data in the external database, for example using a
`ScalarFunction`, typically means **synchronous** interaction: A request is
sent to the database and the `ScalarFunction` waits until the response has been
received. In many cases, this waiting makes up the vast majority of the
function’s time.
+
+To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous
interaction with the database means that a single function instance can handle
many requests concurrently and receive the responses concurrently. That way,
the waiting time can be overlaid with sending other requests and receiving
responses. At the very least, the waiting time is amortized over multiple
requests. This leads in most cases to much higher streaming throughput.
+
+{{< img src="/fig/async_io.svg" width="50%" >}}
+
+#### Defining an AsyncScalarFunction
+
+A user-defined asynchronous scalar function maps zero, one, or multiple scalar
values to a new scalar value. Any data type listed in the [data types
section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or
return type of an evaluation method.
+
+In order to define an asynchronous scalar function, extend the base class
`AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one
or more evaluation methods named `eval(...)`. The first argument must be a
`CompletableFuture<...>` which is used to return the result, with subsequent
arguments being the parameters passed to the function.
+
+The number of outstanding calls to `eval` may be configured by
[`table.exec.async-scalar.max-concurrent-operations`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}).
+
+The following example shows how to do work on a thread pool in the background,
though any libraries exposing an async interface may be directly used to
complete the `CompletableFuture` from a callback. See the [Implementation
Guide](#implementation-guide) for more details.
+
+```java
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.table.api.Expressions.*;
+
+/**
+ * A function which simulates looking up a beverage name from a database.
+ * Since such lookups are often slow, we use an AsyncScalarFunction.
+ */
+public static class BeverageNameLookupFunction extends AsyncScalarFunction {
+ private Executor executor;
+
+ @Override
+ public void open(FunctionContext context) {
+ // Create a thread pool for executing the background lookup.
+ executor = Executors.newFixedThreadPool(10);
+ }
+
+ // The eval method takes a future for the result and the beverage ID to
lookup.
+ public void eval(CompletableFuture<String> future, Integer beverageId) {
+ // Submit a task to the thread pool. We don't want to block this main
+ // thread since would prevent concurrent execution. The future can be
Review Comment:
```suggestion
// thread since that would prevent concurrent execution. The future can be
```
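For context, the non-blocking pattern this code comment describes can be sketched in plain Java without any Flink dependency. The class `NonBlockingEvalSketch`, the `lookup` helper, and the beverage names are hypothetical stand-ins, not part of the PR. Only the `eval(CompletableFuture<String>, Integer)` shape is taken from the diff above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: mirrors the eval(...) shape from the diff, but is plain Java.
public class NonBlockingEvalSketch {
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    // Returns immediately; the future is completed later from a pool thread.
    public void eval(CompletableFuture<String> future, Integer beverageId) {
        executor.execute(() -> {
            try {
                future.complete(lookup(beverageId));
            } catch (Exception e) {
                // Propagate failures instead of leaving the future pending forever.
                future.completeExceptionally(e);
            }
        });
    }

    // Assumed stand-in for a slow database call.
    static String lookup(int beverageId) throws InterruptedException {
        Thread.sleep(50);
        String[] names = {"coffee", "tea", "water"};
        return names[Math.floorMod(beverageId, names.length)];
    }

    public void close() {
        executor.shutdown();
    }

    // Convenience wrapper for trying the sketch: waits for a single result.
    public static String demo(int beverageId) {
        NonBlockingEvalSketch fn = new NonBlockingEvalSketch();
        try {
            CompletableFuture<String> future = new CompletableFuture<>();
            fn.eval(future, beverageId);
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            fn.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(1));
    }
}
```

The calling thread only schedules the work, so many lookups can be in flight at once. Blocking inside `eval` itself would serialize them again.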
##########
docs/content/docs/dev/table/functions/udfs.md:
##########
@@ -1017,6 +1018,109 @@ If you intend to implement or call functions in Python,
please refer to the [Pyt
{{< top >}}
+Asynchronous Scalar Functions
+----------------
+
+When interacting with external systems (for example when enriching stream
events with data stored in a database), one needs to take care that network or
other latency does not dominate the streaming application’s running time.
+
+Naively accessing data in the external database, for example using a
`ScalarFunction`, typically means **synchronous** interaction: A request is
sent to the database and the `ScalarFunction` waits until the response has been
received. In many cases, this waiting makes up the vast majority of the
function’s time.
+
+To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous
interaction with the database means that a single function instance can handle
many requests concurrently and receive the responses concurrently. That way,
the waiting time can be overlaid with sending other requests and receiving
responses. At the very least, the waiting time is amortized over multiple
requests. This leads in most cases to much higher streaming throughput.
+
+{{< img src="/fig/async_io.svg" width="50%" >}}
+
+#### Defining an AsyncScalarFunction
+
+A user-defined asynchronous scalar function maps zero, one, or multiple scalar
values to a new scalar value. Any data type listed in the [data types
section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or
return type of an evaluation method.
+
+In order to define an asynchronous scalar function, extend the base class
`AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one
or more evaluation methods named `eval(...)`. The first argument must be a
`CompletableFuture<...>` which is used to return the result, with subsequent
arguments being the parameters passed to the function.
+
+The number of outstanding calls to `eval` may be configured by
[`table.exec.async-scalar.max-concurrent-operations`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}).
+
+The following example shows how to do work on a thread pool in the background,
though any libraries exposing an async interface may be directly used to
complete the `CompletableFuture` from a callback. See the [Implementation
Guide](#implementation-guide) for more details.
+
+```java
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.table.api.Expressions.*;
+
+/**
+ * A function which simulates looking up a beverage name from a database.
+ * Since such lookups are often slow, we use an AsyncScalarFunction.
+ */
+public static class BeverageNameLookupFunction extends AsyncScalarFunction {
+ private Executor executor;
Review Comment:
It is rather a bug that our API is not able to catch this.
##########
docs/content/docs/dev/table/functions/udfs.md:
##########
@@ -1017,6 +1018,109 @@ If you intend to implement or call functions in Python,
please refer to the [Pyt
{{< top >}}
+Asynchronous Scalar Functions
+----------------
+
+When interacting with external systems (for example when enriching stream
events with data stored in a database), one needs to take care that network or
other latency does not dominate the streaming application’s running time.
+
+Naively accessing data in the external database, for example using a
`ScalarFunction`, typically means **synchronous** interaction: A request is
sent to the database and the `ScalarFunction` waits until the response has been
received. In many cases, this waiting makes up the vast majority of the
function’s time.
+
+To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous
interaction with the database means that a single function instance can handle
many requests concurrently and receive the responses concurrently. That way,
the waiting time can be overlaid with sending other requests and receiving
responses. At the very least, the waiting time is amortized over multiple
requests. This leads in most cases to much higher streaming throughput.
+
+{{< img src="/fig/async_io.svg" width="50%" >}}
+
+#### Defining an AsyncScalarFunction
+
+A user-defined asynchronous scalar function maps zero, one, or multiple scalar
values to a new scalar value. Any data type listed in the [data types
section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or
return type of an evaluation method.
+
+In order to define an asynchronous scalar function, extend the base class
`AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one
or more evaluation methods named `eval(...)`. The first argument must be a
`CompletableFuture<...>` which is used to return the result, with subsequent
arguments being the parameters passed to the function.
+
+The number of outstanding calls to `eval` may be configured by
[`table.exec.async-scalar.max-concurrent-operations`]({{< ref
"docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}).
+
+The following example shows how to do work on a thread pool in the background,
though any libraries exposing an async interface may be directly used to
complete the `CompletableFuture` from a callback. See the [Implementation
Guide](#implementation-guide) for more details.
+
+```java
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.table.api.Expressions.*;
+
+/**
+ * A function which simulates looking up a beverage name from a database.
+ * Since such lookups are often slow, we use an AsyncScalarFunction.
+ */
+public static class BeverageNameLookupFunction extends AsyncScalarFunction {
+ private Executor executor;
Review Comment:
```suggestion
private transient Executor executor;
```
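A minimal plain-Java sketch of why `transient` matters for a field like this, assuming the function instance goes through standard Java serialization. The class and method names below are hypothetical and nothing here is Flink API. It also illustrates one possible reason a serializability check might not flag the original code: a field that is still `null` when the object is serialized passes either way.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, not Flink code: compares a plain and a transient executor field.
public class TransientFieldSketch {

    public static class WithoutTransient implements Serializable {
        private static final long serialVersionUID = 1L;
        public ExecutorService executor; // serialized together with the object
    }

    public static class WithTransient implements Serializable {
        private static final long serialVersionUID = 1L;
        public transient ExecutorService executor; // skipped by Java serialization
    }

    // Returns true if Java serialization of the object succeeds.
    public static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static boolean nonTransientWithPool() {
        WithoutTransient f = new WithoutTransient();
        f.executor = Executors.newFixedThreadPool(1);
        try {
            return canSerialize(f);
        } finally {
            f.executor.shutdown();
        }
    }

    public static boolean transientWithPool() {
        WithTransient f = new WithTransient();
        f.executor = Executors.newFixedThreadPool(1);
        try {
            return canSerialize(f);
        } finally {
            f.executor.shutdown();
        }
    }

    public static boolean nonTransientWithNull() {
        // The field is never assigned, as when it is only set in open().
        return canSerialize(new WithoutTransient());
    }

    public static void main(String[] args) {
        System.out.println("non-transient, live pool: " + nonTransientWithPool());
        System.out.println("transient, live pool:     " + transientWithPool());
        System.out.println("non-transient, null:      " + nonTransientWithNull());
    }
}
```

Marking the field `transient` states the intent explicitly and keeps serialization safe even if the field is assigned before the instance is shipped.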