Hi, Alan

Thanks for driving this proposal. It sounds interesting.
Regarding the implementation of the remote function, could you go into more
detail about your idea: how we should support it and how users would use it,
from the API design to the semantics? And how does the remote function help
solve your problem?

I understand that your core pain point is the performance cost of making
too many RPC calls. Of the three solutions you have explored, I would like
to comment on the cons of the Lookup Join:

>> *Lookup Joins:*
>> Pros:
>> - Part of the Flink codebase
>> - High throughput
>> Cons:
>> - Unintuitive syntax
>> - Harder to do multiple remote calls per input row

I think one solution is to support a Mini-Batch Lookup Join at the framework
layer: issue one RPC call per batch of input rows instead of one per row,
which can improve throughput.
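
To make the batching idea concrete, here is a rough sketch in plain Python
(not Flink code). The names mini_batch_lookup, batch_lookup and batch_size
are hypothetical and only for illustration; I am also assuming the remote
endpoint can accept a list of keys in one request and that we want
inner-join semantics.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

Row = Tuple[str, str]  # (lookup key, rest of the input row)


def _flush(
    buffer: List[Row],
    batch_lookup: Callable[[List[str]], Dict[str, str]],
) -> Iterator[Tuple[str, str, str]]:
    # De-duplicate keys (keeping first-seen order) so that each distinct
    # key is sent only once per batch.
    keys = list(dict.fromkeys(key for key, _ in buffer))
    responses = batch_lookup(keys)  # one remote call for the whole batch
    for key, payload in buffer:
        if key in responses:  # assumed inner join: unmatched rows are dropped
            yield key, payload, responses[key]


def mini_batch_lookup(
    rows: Iterable[Row],
    batch_lookup: Callable[[List[str]], Dict[str, str]],
    batch_size: int,
) -> Iterator[Tuple[str, str, str]]:
    """Join input rows with remote responses, one RPC per batch of rows."""
    buffer: List[Row] = []
    for row in rows:
        buffer.append(row)
        if len(buffer) >= batch_size:
            yield from _flush(buffer, batch_lookup)
            buffer = []
    if buffer:  # flush the final, partially filled batch
        yield from _flush(buffer, batch_lookup)
```

Input order is preserved within and across batches. A real operator would
presumably also need to flush on a timer, so that a slow input does not hold
rows in the buffer indefinitely; the sketch leaves that out.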

Best,
Ron

Alan Sheinberg <asheinb...@confluent.io.invalid> wrote on Tue, Sep 19, 2023 at 07:34:

> Hello all,
>
> We want to implement a custom function that sends HTTP requests to a remote
> endpoint using Flink SQL. Even though the function will behave like a
> normal UDF, the runtime would issue calls asynchronously to achieve high
> throughput for these remote (potentially high latency) calls. What is the
> community's take on implementing greater support for such functions? Any
> feedback is appreciated.
>
> What we have explored so far:
>
> 1.  Using a lookup join
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join>.
> For example:
> create TEMPORARY TABLE RemoteTable(table_lookup_key string, resp string,
> PRIMARY KEY (table_lookup_key) NOT ENFORCED) with ('connector' =
> 'remote_call');
> SELECT i.table_lookup_key, r.resp FROM Inputs AS i JOIN RemoteTable FOR
> SYSTEM_TIME AS OF i.proc_time AS r ON i.table_lookup_key =
> r.table_lookup_key;
>
> 2.  Using a polymorphic table function. Partially supported already for
> window functions
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/>.
> For example:
> SELECT * FROM TABLE (REMOTE_CALL (Input => Table(TableToLookup) as d, Col
> => DESCRIPTOR("table_lookup_key")));
>
> 3.  Using an AsyncScalarFunction. Scalar functions are usually used as
> below (so support for an async version of ScalarFunction would be required):
> SELECT REMOTE_CALL(t.table_lookup_key) FROM TableToLookup t;
>
> Some pros and cons for each approach:
>
> *Lookup Joins:*
> Pros:
> - Part of the Flink codebase
> - High throughput
> Cons:
> - Unintuitive syntax
> - Harder to do multiple remote calls per input row
>
> *PTFs:*
> Pros:
> - More intuitive syntax
> Cons:
> - Need to add more support in Flink. It may exist for specialized built-in
> functions, but not for user defined ones
>
> *AsyncScalarFunction:*
> Pros:
> - Most intuitive syntax
> - Easy to do as many calls per row input as desired
> Cons:
> - Need to add support in Flink, including a new interface with an async
> eval method
> - Out of order results could pose issues with SQL semantics. If we output
> in order, the throughput performance may suffer
>
> Thanks,
> Alan
>
