Hi, Alan. Thanks for driving this.

Async execution has already been used to improve throughput for lookup join,
and the improvement is clear, so I think it makes sense to support async
scalar functions as well.
Big +1 for this FLIP.
I have some questions below.


1. Override the function `getRequirements` in `AsyncScalarFunction`


I'm just curious why you don't use a config option (global) and a query hint
(per async UDX) to mark the output mode as 'ordered' or 'unordered', as async
lookup join [1] and async UDTF [2] do, but instead chose to introduce a new
enum in AsyncScalarFunction.
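
For reference, here is a minimal sketch of the per-function declaration being discussed, as I understand it. The types below are self-contained stand-ins, not Flink classes: `SketchRequirement` and `SketchAsyncScalarFunction` are hypothetical names, and `ORDERED` is the requirement proposed in this thread, not an existing constant.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

// Stand-in for a requirement enum; ORDERED is the value proposed in this
// thread and is an assumption here, not an existing Flink constant.
enum SketchRequirement { ORDERED }

// Stand-in for the proposed AsyncScalarFunction base class.
abstract class SketchAsyncScalarFunction {
    // Proposed default: results are emitted in input order.
    public Set<SketchRequirement> getRequirements() {
        return EnumSet.of(SketchRequirement.ORDERED);
    }
}

// Hypothetical UDF that keeps the default (ordered output).
class OrderedLookup extends SketchAsyncScalarFunction {}

// Hypothetical UDF that opts out by returning a set without ORDERED.
class UnorderedLookup extends SketchAsyncScalarFunction {
    @Override
    public Set<SketchRequirement> getRequirements() {
        return Collections.emptySet();
    }
}
```

With this shape the output mode is fixed by the function author, whereas a config option or hint would let the query author choose it per job or per call.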


2. In some scenarios, async operators must be executed in sync mode to
preserve semantic correctness.


What about throwing an exception to make it clear to users that using async
scalar functions in this situation is problematic, instead of silently
executing in sync mode? Otherwise users may be confused about the actual job
graph.
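
To make the two behaviors concrete, a rough sketch follows. Everything in it is hypothetical: `AsyncPlacementCheck`, `Mode`, and the `asyncIsSafe` flag are illustration-only names and do not correspond to planner code in the FLIP.

```java
// Hypothetical planner-side check; none of these names come from Flink.
final class AsyncPlacementCheck {
    enum Mode { ASYNC, SYNC }

    // Silent fallback: run in sync mode when async execution is not safe.
    static Mode resolveSilently(boolean asyncIsSafe) {
        return asyncIsSafe ? Mode.ASYNC : Mode.SYNC;
    }

    // Fail-fast alternative: reject the query with an explicit error.
    static Mode resolveStrictly(String functionName, boolean asyncIsSafe) {
        if (!asyncIsSafe) {
            throw new IllegalStateException(
                "Async scalar function '" + functionName
                    + "' cannot run asynchronously in this position.");
        }
        return Mode.ASYNC;
    }
}
```

With the strict variant the user sees the problem at planning time instead of discovering a sync operator in the job graph later.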


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction









--

    Best!
    Xuyang





At 2023-12-15 11:20:24, "Aitozi" <gjying1...@gmail.com> wrote:
>Hi Alan,
>    Nice FLIP, I also explored leveraging the async table function [1] to
>improve the throughput before.
>
>About the configs, what do you think about using hints as mentioned in [1]?
>
>[1]:
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
>
>Thanks,
>Aitozi.
>
>Timo Walther <twal...@apache.org> wrote on Thu, Dec 14, 2023 at 17:29:
>
>> Hi Alan,
>>
>> thanks for proposing this FLIP. It's a great addition to Flink and has
>> been requested multiple times. It will be in particular interesting for
>> accessing REST endpoints and other remote services.
>>
>> Great that we can generalize and reuse parts of the Python planner rules
>> and code for this.
>>
>> I have some feedback regarding the API:
>>
>> 1) Configuration
>>
>> Configuration keys like
>>
>> `table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
>>
>> are currently not supported in the configuration stack. The key space
>> should remain constant. Only a constant key space enables the use of the
>> ConfigOption class which is required in the layered configuration. For
>> now I would suggest to only allow a global setting for buffer capacity,
>> timeout, and retry-strategy. We can later work on a per-function
>> configuration (potentially also needed for other use cases).
>>
>> 2) Semantical declaration
>>
>> Regarding
>>
>> `table.exec.async-scalar.catalog.db.func-name.output-mode`
>>
>> this is a semantical property of a function and should be defined
>> per-function. It impacts the query result and potentially the behavior
>> of planner rules.
>>
>> I see two options for this: either (a) an additional method in
>> AsyncScalarFunction or (b) adding this to the function's requirements. I
>> vote for (b), because a FunctionDefinition should be fully self
>> contained and sufficient for planning.
>>
>> Thus, for `FunctionDefinition.getRequirements():
>> Set<FunctionRequirement>` we can add a new requirement `ORDERED` which
>> should also be the default for AsyncScalarFunction. `getRequirements()`
>> can be overridden to return a set without this requirement if the user
>> intends to do this.
>>
>>
>> Thanks,
>> Timo
>>
>>
>>
>>
>> On 11.12.23 18:43, Piotr Nowojski wrote:
>> > +1 to the idea, I don't have any comments.
>> >
>> > Best,
>> > Piotrek
>> >
>> > On Thu, Dec 7, 2023 at 07:15 Alan Sheinberg <asheinb...@confluent.io.invalid>
>> > wrote:
>> >
>> >>>
>> >>> Nicely written and makes sense.  The only feedback I have is around the
>> >>> naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
>> >>> will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
>> >>> imply/suggest that all Async functions are remote.  I wonder if we can find
>> >>> another name which doesn't carry that connotation; maybe
>> >>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
>> >>> and Async functions seems reasonable.)
>> >>>
>> >> Thanks.  That's fair.  I agree that "Remote" isn't always accurate.  I
>> >> believe that the python calls are also done asynchronously, so that might
>> >> be a reasonable name, so long as there's no confusion between the base and
>> >> async child class.
>> >>
>> >> On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes <jhug...@confluent.io.invalid>
>> >> wrote:
>> >>
>> >>> Hi Alan,
>> >>>
>> >>> Nicely written and makes sense.  The only feedback I have is around the
>> >>> naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
>> >>> will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
>> >>> imply/suggest that all Async functions are remote.  I wonder if we can find
>> >>> another name which doesn't carry that connotation; maybe
>> >>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
>> >>> and Async functions seems reasonable.)
>> >>>
>> >>> Cheers,
>> >>>
>> >>> Jim
>> >>>
>> >>> On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
>> >>> <asheinb...@confluent.io.invalid> wrote:
>> >>>
>> >>>> I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
>> >>>> asynchronous scalar function support [1]
>> >>>>
>> >>>> This feature proposes adding a new UDF type AsyncScalarFunction which is
>> >>>> invoked just like a normal ScalarFunction, but is implemented with an
>> >>>> asynchronous eval method.  I had brought this up including the motivation
>> >>>> in a previous discussion thread [2].
>> >>>>
>> >>>> The purpose is to achieve high throughput scalar function UDFs while
>> >>>> allowing that an individual call may have high latency.  It allows scaling
>> >>>> up the parallelism of just these calls without having to increase the
>> >>>> parallelism of the whole query (which could be rather resource
>> >>>> inefficient).
>> >>>>
>> >>>> In practice, it should enable SQL integration with external services and
>> >>>> systems, which Flink has limited support for at the moment. It should also
>> >>>> allow easier integration with existing libraries which use asynchronous
>> >>>> APIs.
>> >>>>
>> >>>> Looking forward to your feedback and suggestions.
>> >>>>
>> >>>> [1]
>> >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
>> >>>>
>> >>>> [2] https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs
>> >>>>
>> >>>> Thanks,
>> >>>> Alan
>> >>>>
>> >>>
>> >>
>> >
>>
>>
