Hi Ron,

Thanks for your response.  I've answered some of your questions below.

> I think one solution is to support Mini-Batch Lookup Join by the framework
> layer, doing one RPC call per batch of input rows, which can improve throughput.


Would the idea be to collect a batch and then do a single RPC (or at least
handle a number of rpcs in a single AsyncLookupFunction call)?  That is an
interesting idea and could simplify things.  For our use cases,
technically, I can write an AsyncLookupFunction and utilize
AsyncWaitOperator using unbatched RPCs and do a Lookup Join without any
issue. My hesitation is that I'm afraid that callers will find it
unintuitive to join with a table where the underlying RPC is not being
modeled in that manner.  For example, it could be a text classifier
IS_POSITIVE_SENTIMENT(...) where there's no backing table, just computation.
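
To make the batching idea concrete, here is a minimal plain-Java sketch of collecting rows and resolving them with a single RPC. It is not Flink API: MiniBatcher and the batchRpc function are hypothetical names, and the remote endpoint is assumed to return one result per input, in the same order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical helper: buffers input rows and resolves them with one batched RPC. */
class MiniBatcher<I, O> {
    private final int batchSize;
    // Assumed stand-in for the remote endpoint: one result per input, same order.
    private final Function<List<I>, CompletableFuture<List<O>>> batchRpc;
    private final List<I> inputs = new ArrayList<>();
    private final List<CompletableFuture<O>> pending = new ArrayList<>();

    MiniBatcher(int batchSize, Function<List<I>, CompletableFuture<List<O>>> batchRpc) {
        this.batchSize = batchSize;
        this.batchRpc = batchRpc;
    }

    /** Queues one input row; the returned future completes when its batch returns. */
    synchronized CompletableFuture<O> submit(I input) {
        CompletableFuture<O> result = new CompletableFuture<>();
        inputs.add(input);
        pending.add(result);
        if (inputs.size() >= batchSize) {
            flush();
        }
        return result;
    }

    /** Sends whatever is buffered as a single RPC and fans the results back out. */
    synchronized void flush() {
        if (inputs.isEmpty()) {
            return;
        }
        List<I> batch = new ArrayList<>(inputs);
        List<CompletableFuture<O>> futures = new ArrayList<>(pending);
        inputs.clear();
        pending.clear();
        batchRpc.apply(batch).whenComplete((outputs, error) -> {
            Throwable failure = error;
            if (failure == null && outputs.size() != futures.size()) {
                failure = new IllegalStateException("batch RPC returned wrong number of results");
            }
            for (int i = 0; i < futures.size(); i++) {
                if (failure != null) {
                    futures.get(i).completeExceptionally(failure);
                } else {
                    futures.get(i).complete(outputs.get(i));
                }
            }
        });
    }
}
```

In a real operator, flush() would presumably also need to fire on a timer so that a partially filled batch does not wait indefinitely.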

> How does the remote function help to solve your problem?


The problem is pretty open-ended.  There are jobs where you want to join
data with some external data source and inject it into your pipeline, but
others might also be offloading some computation to an external system.
The external system might be owned by a different party, have different
permissions, have different hardware to do a computation (e.g. train a
model), or just block for a while.  The most intuitive invocation for this
is just a scalar function in SQL.  You just want it to be able to run at a
high throughput.

> Regarding implementing the Remote Function, can you go into more detail
> about your idea, how we should support it, and how users should use it,
> from API design to semantic explanation?


The unimplemented option I gave the most thought to is 3).  You can imagine
an AsyncScalarFunction definition and example class like:

public class AsyncScalarFunction<T> extends UserDefinedFunction {
  @Override
  public final FunctionKind getKind() {
    return FunctionKind.ASYNC_SCALAR;
  }

  @Override
  public TypeInference getTypeInference(DataTypeFactory typeFactory) {
    return TypeInferenceExtractor.forAsyncScalarFunction(typeFactory, getClass());
  }
}

class MyScalarFunction extends AsyncScalarFunction<String> {
  // Eval method with a future to use as a callback, with arbitrary
  // additional arguments.
  public void eval(CompletableFuture<String> result, String input) {
    // Example which uses an async HTTP client.
    AsyncHttpClient httpClient = new AsyncHttpClient();
    // Do the request and then invoke the callback depending on the outcome.
    // The client must return a CompletableFuture; a plain Future has no
    // completion callbacks.
    CompletableFuture<HttpResponse> responseFuture =
        httpClient.doPOST(getRequestBody(input));
    // whenComplete takes a callback with no return value, unlike handle.
    responseFuture.whenComplete((response, throwable) -> {
      if (throwable != null) {
        result.completeExceptionally(throwable);
      } else {
        result.complete(response.getBody());
      }
    });
  }
  ...
}

Then you can register it in your Flink program as with other UDFs and call
it:
tEnv.createTemporarySystemFunction("MY_FUNCTION", MyScalarFunction.class);
TableResult result = tEnv.executeSql(
    "SELECT MY_FUNCTION(input) FROM "
        + "(SELECT i.input FROM Inputs i ORDER BY i.timestamp)");

I know there are questions about SQL semantics to consider.  For example,
does invocation of MY_FUNCTION preserve the order of the subquery above?
To be SQL compliant, I believe it must, so the results of any async requests
we send out must be output in input order, regardless of when they complete.
Ordered output, for example, is already implemented as an option in
AsyncWaitOperator.  There are probably other considerations as well.
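
To illustrate the ordering requirement, here is a minimal plain-Java sketch of releasing async results strictly in submission order. It is not how AsyncWaitOperator is implemented; OrderedResultBuffer is a hypothetical name used only for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical buffer that releases async results strictly in submission order. */
class OrderedResultBuffer<T> {
    private final Queue<CompletableFuture<T>> inFlight = new ArrayDeque<>();

    /** Registers an in-flight request; call in the order the input rows arrived. */
    synchronized void add(CompletableFuture<T> future) {
        inFlight.add(future);
    }

    /** Returns finished results from the head of the queue, stopping at the first unfinished one. */
    synchronized List<T> drainCompleted() {
        List<T> ready = new ArrayList<>();
        while (!inFlight.isEmpty() && inFlight.peek().isDone()) {
            // join() rethrows the failure if the request completed exceptionally.
            ready.add(inFlight.poll().join());
        }
        return ready;
    }
}
```

A result that finishes early stays buffered until every request ahead of it has finished, which is the throughput cost of ordered output mentioned in the cons for option 3).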

I could do a similar dive into option 2) if that would also be helpful,
though maybe this is a good starting point for conversation.

Hope that addressed your questions,
Alan

On Mon, Sep 18, 2023 at 6:51 PM liu ron <ron9....@gmail.com> wrote:

> Hi, Alan
>
> Thanks for driving this proposal. It sounds interesting.
> Regarding implementing the Remote Function, can you go into more detail
> about your idea, how we should support it, and how users should use it,
> from API design to semantic explanation? And how does the remote function
> help to solve your problem?
>
> I understand that your core pain point is that there are performance issues
> with too many RPC calls. Of the three solutions you have explored,
> regarding the Lookup Join cons:
>
> >> *Lookup Joins:*
> Pros:
> - Part of the Flink codebase
> - High throughput
> Cons:
> - Unintuitive syntax
> - Harder to do multiple remote calls per input row
>
> I think one solution is to support Mini-Batch Lookup Join by the framework
> layer, doing one RPC call per batch of input rows, which can improve throughput.
>
> Best,
> Ron
>
> On Tue, Sep 19, 2023 at 07:34, Alan Sheinberg <asheinb...@confluent.io.invalid> wrote:
>
> > Hello all,
> >
> > We want to implement a custom function that sends HTTP requests to a
> > remote endpoint using Flink SQL. Even though the function will behave
> > like a normal UDF, the runtime would issue calls asynchronously to
> > achieve high throughput for these remote (potentially high latency)
> > calls. What is the community's take on implementing greater support for
> > such functions? Any feedback is appreciated.
> >
> > What we have explored so far:
> >
> > 1.  Using a lookup join
> > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join>.
> > For example:
> > CREATE TEMPORARY TABLE RemoteTable(table_lookup_key string, resp string,
> > PRIMARY KEY (table_lookup_key) NOT ENFORCED) WITH ('connector' =
> > 'remote_call');
> > SELECT i.table_lookup_key, r.resp FROM Inputs AS i JOIN RemoteTable FOR
> > SYSTEM_TIME AS OF i.proc_time AS r ON i.table_lookup_key =
> > r.table_lookup_key;
> >
> > 2.  Using a polymorphic table function. Partially supported already for
> > window functions
> > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/>.
> > For example:
> > SELECT * FROM TABLE (REMOTE_CALL (Input => Table(TableToLookup) as d, Col
> > => DESCRIPTOR("table_lookup_key")));
> >
> > 3.  Using an AsyncScalarFunction. Scalar functions are usually used as
> > below (thus support for an async version of ScalarFunction is required):
> > SELECT REMOTE_CALL(t.table_lookup_key) FROM TableToLookup t;
> >
> > Some pros and cons for each approach:
> >
> > *Lookup Joins:*
> > Pros:
> > - Part of the Flink codebase
> > - High throughput
> > Cons:
> > - Unintuitive syntax
> > - Harder to do multiple remote calls per input row
> >
> > *PTFs:*
> > Pros:
> > - More intuitive syntax
> > Cons:
> > - Need to add more support in Flink. It may exist for specialized
> > built-in functions, but not for user-defined ones
> >
> > *AsyncScalarFunction:*
> > Pros:
> > - Most intuitive syntax
> > - Easy to do as many calls per row input as desired
> > Cons:
> > - Need to add support in Flink, including a new interface with an async
> > eval method
> > - Out of order results could pose issues with SQL semantics. If we output
> > in order, the throughput performance may suffer
> >
> > Thanks,
> > Alan
> >
>
