Re:Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-17 Thread Xuyang
Hi, Alan and Timo. Thanks for your reply.
>Would it make a difference if it were exposed by the explain
>method (the operator having "syncMode" vs not)?
@Alan: I think this is a good way to tell users which mode their async UDXs 
are currently running in.
>A regular SQL user doesn't care whether the function is sync or async. 
@Timo: I agree that the planner should throw as few exceptions as possible, 
so that users are not confused. So I think exposing syncMode through the 
EXPLAIN output is a good approach.
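
To illustrate the two behaviors being compared (a sync fallback that is visible 
in the plan text versus failing at planning time), here is a minimal sketch in 
plain Java. Everything in it is hypothetical: `Policy`, `plan`, and the 
`AsyncCalc(...)` plan string are made up for illustration and are not Flink's 
planner API or its actual EXPLAIN output.

```java
// Hypothetical sketch; these types and names are not Flink's planner API.
class SyncFallbackSketch {

    /** The two policies under discussion when async execution is not safe. */
    enum Policy { FALL_BACK_TO_SYNC, THROW }

    /** Returns a plan description in the spirit of EXPLAIN output, or throws. */
    static String plan(String function, boolean asyncIsSafe, Policy policy) {
        if (asyncIsSafe) {
            return "AsyncCalc(" + function + ")";
        }
        if (policy == Policy.THROW) {
            throw new IllegalStateException(
                    "Async scalar function " + function
                            + " cannot run asynchronously here");
        }
        // Fallback to sync execution, made visible in the plan text.
        return "AsyncCalc(" + function + ", syncMode=true)";
    }

    public static void main(String[] args) {
        System.out.println(plan("FUNC(c)", true, Policy.FALL_BACK_TO_SYNC));
        System.out.println(plan("FUNC(c)", false, Policy.FALL_BACK_TO_SYNC));
        try {
            plan("FUNC(c)", false, Policy.THROW);
        } catch (IllegalStateException e) {
            System.out.println("planning failed: " + e.getMessage());
        }
    }
}
```

With the fallback policy the job still runs and the mode shows up only in the 
plan text; with the throwing policy the user learns about it immediately but 
has to change the query.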
> If the input to the operator is append-only, it seems fine, 
> because this implies that each row is effectively independent and ordering is 
> unimportant.


> For example, if the query is an append-only `SELECT FUNC(c) FROM t`, 
> I don't see a reason why the operator is not allowed to produce unordered 
> results.


@Timo, @Alan: IIUC, there is a problem with this reasoning. Take Kafka as the 
source and MySQL as the sink. Although Kafka is an append-only source, one of 
its fields may be used as the primary key when writing to MySQL. If the async 
UDX runs in unordered mode, two records with the same key can reach the sink 
in swapped order, and MySQL ends up with the stale value. In this case, we 
actually need to preserve order with respect to the sink's primary key.
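
To make the concern concrete, here is a minimal, self-contained simulation in 
plain Java. It is not Flink code: `Result`, `upsertAll`, and the key/value 
strings are hypothetical, and the "sink" is just a map with last-write-wins 
semantics standing in for a MySQL table keyed by pk.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation; none of these names come from Flink.
class UnorderedUpsertSketch {

    /** One async result destined for the sink: primary key plus value. */
    static final class Result {
        final String pk;
        final String value;

        Result(String pk, String value) {
            this.pk = pk;
            this.value = value;
        }
    }

    /** Models an upsert sink keyed by pk: the last write per key wins. */
    static Map<String, String> upsertAll(List<Result> emitted) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Result r : emitted) {
            table.put(r.pk, r.value);
        }
        return table;
    }

    public static void main(String[] args) {
        // Two events for the same key, in source (append-only) order.
        Result first = new Result("user-1", "v1");
        Result second = new Result("user-1", "v2");

        // Ordered mode: results are emitted in input order.
        List<Result> ordered = Arrays.asList(first, second);
        // Unordered mode: the second async call happened to finish first.
        List<Result> unordered = Arrays.asList(second, first);

        System.out.println("ordered   -> " + upsertAll(ordered));
        System.out.println("unordered -> " + upsertAll(unordered));
    }
}
```

Running it prints `ordered   -> {user-1=v2}` and `unordered -> {user-1=v1}`: 
the input is append-only in both cases, yet the unordered run leaves the older 
value in the table.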



--

Best!
Xuyang





At 2023-12-16 03:33:55, "Alan Sheinberg" wrote:
>Thanks for the replies everyone.  My responses are inline:
>
>> About the configs, what do you think about using hints as mentioned in [1]?
>
>@Aitozi: I think hints could be a good way to do this, similar to lookup
>joins or the proposal in FLIP-313.  One benefit of hints is that they allow
>for the highest granularity of configuration because you can decide at
>each and every call site just what parameters to use.  The downside of
>hints is that there's more syntax to learn and more verbosity.  I'm
>somewhat partial to a configuration like this with a class definition level
>of granularity (similar to how metrics reporters are defined [1]):
>
>table.exec.async-scalar.myfunc.class: org.apache.flink.MyAsyncScalarFunction
>table.exec.async-scalar.myfunc.buffer-capacity: 10
>...
>
>As Timo mentioned, the downside to this is that there's not a nice static
>way to do this at the moment, unless you extend ConfigOption.  It would be
>good ultimately if Lookup joins, async scalar functions, and other future
>configurable UDFs shared the same methodology, but maybe a unified approach
>is a followup discussion.
>
>> I'm just curious why you don't use conf (global) and query hints (individual
>> async UDX) to mark the output mode as ordered or unordered, like async
>> lookup join [1] and async UDTF [2], but instead chose to introduce a new enum
>> in AsyncScalarFunction.
>
>
>@Xuyang: I'm open to adding hints. I think the important part is that we
>have some method for the user to have a class definition level way to
>define whether ORDERED or ALLOW_UNORDERED is most appropriate.  I don't
>have a strong sense yet for what would be most appropriately exposed as a
>FunctionRequirement vs a simple configuration/hint.
>
>> What about throwing an exception to make it clear to users that using async
>> scalar functions in this situation is problematic, instead of silently
>> executing in sync mode? Otherwise users may be confused about the actual
>> final job graph.
>
>
>@Xuyang: Would it make a difference if it were exposed by the explain
>method (the operator having "syncMode" vs not)?  I'd be fine to do it
>either way -- certainly throwing an error is a bit simpler.
>
>> You are right. Actually it should be the planner that fully decides
>> whether ORDERED or UNORDERED is safe to do. For example, if the query is
>> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
>> operator is not allowed to produce unordered results. By global
>> configuration, we can set ORDERED such that users don't get confused
>> about the unordered output.
>
>
>@Timo: Is there an easy way to determine if the output of an async function
>would be problematic or not?  If the input to the operator is append-only,
>it seems fine, because this implies that each row is effectively
>independent and ordering is unimportant. For upsert mode with +U rows, you
>wouldn't want to swap order with other +U rows for the same key because the
>last one should win.  For -D or -U rows, you wouldn't want to swap with
>other rows for the same key for similar reasons.  Is it as simple as
>looking at the changelog mode to determine whether it's safe to run async
>functions UNORDERED?  I had considered analyzing various query forms (join
>vs aggregation vs whatever), but it seems like changelog mode could be
>sufficient to understand what works and what would be an issue.  Any code
>pointers and explanation for similar analysis would be great to understand
>this more.
>
>> The mode UNORDERED, however, should only have an
>> effect for these simple use cases and throw an exception if UNORDERED
>> would mess up a changelog or other subsequent operators.
>
>@Timo: Should we throw errors or run in sync mod

Re:Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-14 Thread Xuyang
Hi, Alan. Thanks for driving this.


Async execution has already been used to improve throughput in lookup join, 
and the improvement is clear, so I think it makes sense to support async 
scalar functions. Big +1 for this FLIP. 
I have some questions below.


1. Override the function `getRequirements` in `AsyncScalarFunction`


I'm just curious why you don't use conf (global) and query hints (individual 
async UDX) to mark the output mode as ordered or unordered, like async 
lookup join [1] and async UDTF [2], but instead chose to introduce a new enum 
in AsyncScalarFunction.


2. In some scenarios, async operators must be executed in sync mode for 
semantic correctness.


What about throwing an exception to make it clear to users that using async 
scalar functions in this situation is problematic, instead of silently 
executing in sync mode? Otherwise users may be confused about the actual 
final job graph.
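
For question 1, this is roughly how I read the requirement-based declaration, 
as a minimal sketch in plain Java. All names here (`Requirement`, 
`AsyncFunctionBase`, `mayEmitUnordered`) are hypothetical stand-ins and not 
the actual Flink or FLIP-400 API; the point is only that the function class 
itself carries the ordering semantics, whereas a conf or hint would set it per 
job or per call site.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-ins; not the actual Flink or FLIP-400 classes.
class RequirementSketch {

    enum Requirement { ORDERED }

    /** Stand-in base class: ordered output is required by default. */
    abstract static class AsyncFunctionBase {
        Set<Requirement> getRequirements() {
            return EnumSet.of(Requirement.ORDERED);
        }
    }

    /** Keeps the default, so its results must stay in input order. */
    static class DefaultFn extends AsyncFunctionBase {}

    /** Opts out by returning a set without ORDERED. */
    static class UnorderedOkFn extends AsyncFunctionBase {
        @Override
        Set<Requirement> getRequirements() {
            return Collections.emptySet();
        }
    }

    /** What a planner could read off the function definition alone. */
    static boolean mayEmitUnordered(AsyncFunctionBase fn) {
        return !fn.getRequirements().contains(Requirement.ORDERED);
    }

    public static void main(String[] args) {
        System.out.println(mayEmitUnordered(new DefaultFn()));
        System.out.println(mayEmitUnordered(new UnorderedOkFn()));
    }
}
```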


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction









--

Best!
Xuyang





At 2023-12-15 11:20:24, "Aitozi" wrote:
>Hi Alan,
>Nice FLIP. I have also explored leveraging the async table function [1] to
>improve throughput before.
>
>About the configs, what do you think about using hints as mentioned in [1]?
>
>[1]:
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
>
>Thanks,
>Aitozi.
>
>On Thu, Dec 14, 2023 at 17:29, Timo Walther wrote:
>
>> Hi Alan,
>>
>> thanks for proposing this FLIP. It's a great addition to Flink and has
>> been requested multiple times. It will be particularly interesting for
>> accessing REST endpoints and other remote services.
>>
>> Great that we can generalize and reuse parts of the Python planner rules
>> and code for this.
>>
>> I have some feedback regarding the API:
>>
>> 1) Configuration
>>
>> Configuration keys like
>>
>> `table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
>>
>> are currently not supported in the configuration stack. The key space
>> should remain constant. Only a constant key space enables the use of the
>> ConfigOption class which is required in the layered configuration. For
>> now I would suggest allowing only a global setting for buffer capacity,
>> timeout, and retry-strategy. We can later work on a per-function
>> configuration (potentially also needed for other use cases).
>>
>> 2) Semantical declaration
>>
>> Regarding
>>
>> `table.exec.async-scalar.catalog.db.func-name.output-mode`
>>
>> this is a semantical property of a function and should be defined
>> per-function. It impacts the query result and potentially the behavior
>> of planner rules.
>>
>> I see two options for this: either (a) an additional method in
>> AsyncScalarFunction or (b) adding this to the function's requirements. I
>> vote for (b), because a FunctionDefinition should be fully self
>> contained and sufficient for planning.
>>
>> Thus, for `FunctionDefinition.getRequirements():
>> Set<FunctionRequirement>` we can add a new requirement `ORDERED` which
>> should also be the default for AsyncScalarFunction. `getRequirements()`
>> can be overridden to return a set without this requirement if the user
>> intends to do this.
>>
>>
>> Thanks,
>> Timo
>>
>>
>>
>>
>> On 11.12.23 18:43, Piotr Nowojski wrote:
>> > +1 to the idea, I don't have any comments.
>> >
>> > Best,
>> > Piotrek
>> >
>> > On Thu, Dec 7, 2023 at 07:15, Alan Sheinberg wrote:
>> >
>> >>>
>> >>> Nicely written and makes sense.  The only feedback I have is around the
>> >>> naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
>> >>> will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
>> >>> imply/suggest that all Async functions are remote.  I wonder if we can find
>> >>> another name which doesn't carry that connotation; maybe
>> >>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
>> >>> and Async functions seems reasonable.)
>> >>>
>> >> Thanks.  That's fair.  I agree that "Remote" isn't always accurate.  I
>> >> believe that the python calls are also done asynchronously, so that might
>> >> be a reasonable name, so long as there's no confusion between the base and
>> >> async child class.
>> >>
>> >> On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes wrote:
>> >>
>> >>> Hi Alan,
>> >>>
>> >>> Nicely written and makes sense.  The only feedback I have is around the
>> >>> naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
>> >>> will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
>> >>> imply/suggest that all Async functions are remote.  I wonder if we can find
>> >>> another name which doesn't carry that connotation; maybe
>> >>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
>> >>> and Async fu