AlanConfluent commented on code in PR #26610:
URL: https://github.com/apache/flink/pull/26610#discussion_r2237109705


##########
docs/content/docs/dev/table/functions/udfs.md:
##########
@@ -1017,6 +1018,109 @@ If you intend to implement or call functions in Python, 
please refer to the [Pyt
 
 {{< top >}}
 
+Asynchronous Scalar Functions
+----------------
+
+When interacting with external systems (for example when enriching stream 
events with data stored in a database), one needs to take care that network or 
other latency does not dominate the streaming application’s running time.
+
+Naively accessing data in the external database, for example using a 
`ScalarFunction`, typically means **synchronous** interaction: A request is 
sent to the database and the `ScalarFunction` waits until the response has been 
received. In many cases, this waiting makes up the vast majority of the 
function’s time.
+
+To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous 
interaction with the database means that a single function instance can handle 
many requests concurrently and receive the responses concurrently. That way, 
the waiting time can be overlaid with sending other requests and receiving 
responses. At the very least, the waiting time is amortized over multiple 
requests. This leads in most cases to much higher streaming throughput.
+
+{{< img src="/fig/async_io.svg" width="50%" >}}
+
+#### Defining an AsyncScalarFunction
+
+A user-defined asynchronous scalar function maps zero, one, or multiple scalar 
values to a new scalar value. Any data type listed in the [data types 
section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or 
return type of an evaluation method.
+
+In order to define an asynchronous scalar function, extend the base class 
`AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one 
or more evaluation methods named `eval(...)`.  The first argument must be a 
`CompletableFuture<...>` which is used to return the result, with subsequent 
arguments being the parameters passed to the function.
+
+The number of outstanding calls to `eval` may be configured by 
[`table.exec.async-scalar.max-concurrent-operations`]({{< ref 
"docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}).
+
+The following example shows how to do work on a thread pool in the background, 
though any libraries exposing an async interface may be directly used to 
complete the `CompletableFuture` from a callback. See the [Implementation 
Guide](#implementation-guide) for more details.
+
+```java
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.table.api.Expressions.*;
+
+/**
+ * A function which simulates looking up a beverage name from a database.
+ * Since such lookups are often slow, we use an AsyncScalarFunction.
+ */
+public static class BeverageNameLookupFunction extends AsyncScalarFunction {
+    private Executor executor;
+
+    @Override
+    public void open(FunctionContext context) {
+        // Create a thread pool for executing the background lookup.
+        executor = Executors.newFixedThreadPool(10);
+    }
+
+    // The eval method takes a future for the result and the beverage ID to 
look up.
+    public void eval(CompletableFuture<String> future, Integer beverageId) {
+        // Submit a task to the thread pool. We don't want to block this main 
+        // thread since that would prevent concurrent execution. The future can be 

Review Comment:
   Done
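
For readers of this thread: the quoted hunk stops partway through `eval`. The pattern the quoted text describes (run the slow work on a thread pool, then complete the `CompletableFuture`) can be sketched in plain Java with no Flink dependency. This is only an illustration under stated assumptions, not the code from the PR: the class name `AsyncEvalSketch`, the in-memory `NAMES` table, and the 50 ms delay are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for an async function; it does not extend any Flink class. */
public class AsyncEvalSketch {
    // Hypothetical in-memory "database" used only for this illustration.
    private static final Map<Integer, String> NAMES = Map.of(1, "Coffee", 2, "Tea");

    private ExecutorService executor;

    /** Creates the thread pool; mirrors the role of open() in the quoted example. */
    public void open() {
        executor = Executors.newFixedThreadPool(4);
    }

    /** Returns immediately; the result is delivered later through the future. */
    public void eval(CompletableFuture<String> future, Integer beverageId) {
        executor.execute(() -> {
            try {
                Thread.sleep(50); // simulated lookup latency
                future.complete(NAMES.getOrDefault(beverageId, "unknown"));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                future.completeExceptionally(e);
            } catch (RuntimeException e) {
                // Always complete the future, otherwise the caller waits forever.
                future.completeExceptionally(e);
            }
        });
    }

    /** Shuts the pool down so the JVM can exit. */
    public void close() {
        executor.shutdown();
    }

    public static void main(String[] args) throws Exception {
        AsyncEvalSketch fn = new AsyncEvalSketch();
        fn.open();
        try {
            CompletableFuture<String> first = new CompletableFuture<>();
            CompletableFuture<String> second = new CompletableFuture<>();
            // Both calls return at once, so the two lookups overlap in time.
            fn.eval(first, 1);
            fn.eval(second, 99);
            System.out.println(first.get(5, TimeUnit.SECONDS));  // prints "Coffee"
            System.out.println(second.get(5, TimeUnit.SECONDS)); // prints "unknown"
        } finally {
            fn.close();
        }
    }
}
```

Completing the future exceptionally on every failure path matters here: a future that is never completed would leave the caller waiting indefinitely.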



##########
docs/content/docs/dev/table/functions/udfs.md:
##########
@@ -1017,6 +1018,109 @@ If you intend to implement or call functions in Python, 
please refer to the [Pyt
 
 {{< top >}}
 
+Asynchronous Scalar Functions
+----------------
+
+When interacting with external systems (for example when enriching stream 
events with data stored in a database), one needs to take care that network or 
other latency does not dominate the streaming application’s running time.
+
+Naively accessing data in the external database, for example using a 
`ScalarFunction`, typically means **synchronous** interaction: A request is 
sent to the database and the `ScalarFunction` waits until the response has been 
received. In many cases, this waiting makes up the vast majority of the 
function’s time.
+
+To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous 
interaction with the database means that a single function instance can handle 
many requests concurrently and receive the responses concurrently. That way, 
the waiting time can be overlaid with sending other requests and receiving 
responses. At the very least, the waiting time is amortized over multiple 
requests. This leads in most cases to much higher streaming throughput.
+
+{{< img src="/fig/async_io.svg" width="50%" >}}
+
+#### Defining an AsyncScalarFunction
+
+A user-defined asynchronous scalar function maps zero, one, or multiple scalar 
values to a new scalar value. Any data type listed in the [data types 
section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or 
return type of an evaluation method.
+
+In order to define an asynchronous scalar function, extend the base class 
`AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one 
or more evaluation methods named `eval(...)`.  The first argument must be a 
`CompletableFuture<...>` which is used to return the result, with subsequent 
arguments being the parameters passed to the function.
+
+The number of outstanding calls to `eval` may be configured by 
[`table.exec.async-scalar.max-concurrent-operations`]({{< ref 
"docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}).
+
+The following example shows how to do work on a thread pool in the background, 
though any libraries exposing an async interface may be directly used to 
complete the `CompletableFuture` from a callback. See the [Implementation 
Guide](#implementation-guide) for more details.
+
+```java
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.table.api.Expressions.*;
+
+/**
+ * A function which simulates looking up a beverage name from a database.
+ * Since such lookups are often slow, we use an AsyncScalarFunction.
+ */
+public static class BeverageNameLookupFunction extends AsyncScalarFunction {
+    private Executor executor;

Review Comment:
   Done. Yes, I tested this and it did not produce an error. Is that because 
the field is private?
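
On the question above, a minimal sketch, assuming the concern is Java serialization of the function instance. Standard Java serialization ignores field visibility; it fails only when a non-transient field holds a non-serializable object at the moment of serialization. A field that is still `null` (because it is assigned later, in `open()`) therefore serializes without error. The classes `Holder` and `TransientHolder` below are hypothetical stand-ins, not Flink types, and whether Flink's own checks behave identically is an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SerializationDemo {

    /** Hypothetical function-like class with a private, non-transient executor field. */
    static class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        private ExecutorService executor; // null until open() is called

        void open() { executor = Executors.newFixedThreadPool(2); }
        void close() { if (executor != null) executor.shutdownNow(); }
    }

    /** Same shape, but the field is transient and is skipped by serialization. */
    static class TransientHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient ExecutorService executor;

        void open() { executor = Executors.newFixedThreadPool(2); }
        void close() { if (executor != null) executor.shutdownNow(); }
    }

    /** Returns true if standard Java serialization of the object succeeds. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Holder holder = new Holder();
        System.out.println(canSerialize(holder)); // prints "true": the field is null

        holder.open();
        System.out.println(canSerialize(holder)); // prints "false": ThreadPoolExecutor is not Serializable
        holder.close();

        TransientHolder safe = new TransientHolder();
        safe.open();
        System.out.println(canSerialize(safe));   // prints "true": transient field is skipped
        safe.close();
    }
}
```

Under this reading, the test passed because the executor is created in `open()`, not because the field is private. Marking the field `transient` would make that intent explicit.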


