Hi, Alan and Timo. Thanks for your reply.
>Would it make a difference if it were exposed by the explain
>method (the operator having "syncMode" vs not)?
@Alan: I think this is a good way to tell the user what mode these async udx 
are currently in.
>A regular SQL user doesn't care whether the function is sync or async. 
@Timo: I agree that the planner should throw as few exceptions as possible 
rather than confusing users. So I think 
it is a good way to expose syncMode through explain syntax.
> If the input to the operator is append-only, it seems fine, 
> because this implies that each row is effectively independent and ordering is 
> unimportant.


> For example, if the query is > an append-only `SELECT FUNC(c) FROM t`, 
> I don't see a reason why the > operator is not allowed to produce unordered 
> results.


@Timo, @Alan: IIUC, there seems to be something wrong here. Take kafka as 
source and mysql as sink as an example. 
Although kafka is an append-only source, one of its fields is used as pk when 
writing to mysql. If async udx is executed
 in an unordered mode, there may be problems with the data in mysql in the end. 
In this case, we need to ensure that 
the sink-based pk is in order actually.



--

    Best!
    Xuyang





At 2023-12-16 03:33:55, "Alan Sheinberg" <asheinb...@confluent.io.INVALID> 
wrote:
>Thanks for the replies everyone.  My responses are inline:
>
>About the configs, what do you think using hints as mentioned in [1].
>
>@Aitozi: I think hints could be a good way to do this, similar to lookup
>joins or the proposal in FLIP-313.  One benefit of hints is that they allow
>for the highest granularity of configuration because you can decide at
>each and every call site just what parameters to use.  The downside of
>hints is that there's more syntax to learn and more verbosity.  I'm
>somewhat partial to a configuration like this with a class definition level
>of granularity (similar to how metrics reporters are defined [1]):
>
>table.exec.async-scalar.myfunc.class: org.apache.flink.MyAsyncScalarFunction
>table.exec.async-scalar.myfunc.buffer-capacity: 10
>...
>
>As Timo mentioned, the downside to this is that there's not a nice static
>way to do this at the moment, unless you extend ConfigOption.  It would be
>good ultimately if Lookup joins, async scalar functions, and other future
>configurable UDFs shared the same methodology, but maybe a unified approach
>is a followup discussion.
>
>I’m just curious why you don’t use conf(global) and query hint(individual
>> async udx) to mark the output
>> mode 'order' or 'unorder' like async look join [1] and async udtf[2], but
>> chose to introduce a new enum
>> in AsyncScalarFunction.
>
>
>@Xuyang: I'm open to adding hints. I think the important part is that we
>have some method for the user to have a class definition level way to
>define whether ORDERED or ALLOW_UNORDERED is most appropriate.  I don't
>have a strong sense yet for what would be most appropriately exposed as a
>FunctionRequirement vs a simple configuration/hint.
>
>What about throwing an exception to make it clear to users that using async
>> scalar functions in this situation
>> is problematic instead of executing silently in sync mode? Because users
>> may be confused about
>> the final actual job graph.
>
>
>@Xuyang: Would it make a difference if it were exposed by the explain
>method (the operator having "syncMode" vs not)?  I'd be fine to do it
>either way -- certainly throwing an error is a bit simpler.
>
>You are right. Actually it should be the planner that fully decides
>> whether ORDERED or UNORDERED is safe to do. For example, if the query is
>> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
>> operator is not allowed to produce unordered results. By global
>> configuration, we can set ORDERED such that users don't get confused
>> about the unordered output.
>
>
>@Timo: Is there an easy way to determine if the output of an async function
>would be problematic or not?  If the input to the operator is append-only,
>it seems fine, because this implies that each row is effectively
>independent and ordering is unimportant. For upsert mode with +U rows, you
>wouldn't want to swap order with other +U rows for the same key because the
>last one should win.  For -D or -U rows, you wouldn't want to swap with
>other rows for the same key for similar reasons.  Is it as simple as
>looking for the changlelog mode to determine whether it's safe to run async
>functions UNORDERED?  I had considered analyzing various query forms (join
>vs aggregation vs whatever), but it seems like changelog mode could be
>sufficient to understand what works and what would be an issue.  Any code
>pointers and explanation for similar analysis would be great to understand
>this more.
>
>The mode UNORDERED however should only have
>> effect for these simply use cases and throw an exception if UNORDERED
>> would mess up a changelog or other subsequent operators.
>
>@Timo: Should we throw errors or run in sync mode?  It seems like running
>in sync mode is an option to ensure correctness in all changelog modes.
>
>Let's go with global configuration first and later introduce
>> hints. I feel the more hints we introduce, the harder SQL queries get
>> when maintaining them.
>
>@Timo: That seems like a reasonable approach to me.
>
>-Alan
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/
>
>On Fri, Dec 15, 2023 at 2:56 AM Timo Walther <twal...@apache.org> wrote:
>
>> 1. Override the function `getRequirements` in `AsyncScalarFunction`
>>
>>  > If the user overrides `requirements()` to omit the `ORDERED`
>>  > requirement, do we allow the operator to return out-of-order results
>>  > or should it fall back on `AsyncOutputMode.ALLOW_UNORDERED` type
>>  > behavior (where we allow out-of-order only if it's deemed correct)?
>>
>> You are right. Actually it should be the planner that fully decides
>> whether ORDERED or UNORDERED is safe to do. For example, if the query is
>> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
>> operator is not allowed to produce unordered results. By global
>> configuration, we can set ORDERED such that users don't get confused
>> about the unordered output. The mode UNORDERED however should only have
>> effect for these simply use cases and throw an exception if UNORDERED
>> would mess up a changelog or other subsequent operators.
>>
>> 2. In some scenarios with semantic correctness, async operators must be
>> executed in sync mode.
>>
>>  > What about throwing an exception to make it clear to users that using
>> async scalar functions
>>
>> @Xuyang: A regular SQL user doesn't care whether the function is sync or
>> async. The planner should simply give its best to make the execution
>> performant. I would not throw an exception here. There more exceptions
>> the, the more struggles and questions from the user. Conceptually, we
>> can run async code also sync, and that's why we should also do it to
>> avoid errors.
>>
>> 3. Hints
>>
>> @Aitozi: Let's go with global configuration first and later introduce
>> hints. I feel the more hints we introduce, the harder SQL queries get
>> when maintaining them.
>>
>> Regards,
>> Timo
>>
>>
>>
>>
>> On 15.12.23 04:51, Xuyang wrote:
>> > Hi, Alan. Thanks for driving this.
>> >
>> >
>> > Using async to improve throughput has been done on look join, and the
>> improvement
>> > effect is obvious, so I think it makes sense to support async scalar
>> function.  Big +1 for this flip.
>> > I have some questions below.
>> >
>> >
>> > 1. Override the function `getRequirements` in `AsyncScalarFunction`
>> >
>> >
>> > I’m just curious why you don’t use conf(global) and query
>> hint(individual async udx) to mark the output
>> > mode 'order' or 'unorder' like async look join [1] and async udtf[2],
>> but chose to introduce a new enum
>> > in AsyncScalarFunction.
>> >
>> >
>> > 2. In some scenarios with semantic correctness, async operators must be
>> executed in sync mode.
>> >
>> >
>> > What about throwing an exception to make it clear to users that using
>> async scalar functions in this situation
>> > is problematic instead of executing silently in sync mode? Because users
>> may be confused about
>> > the final actual job graph.
>> >
>> >
>> > [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
>> > [2]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > --
>> >
>> >      Best!
>> >      Xuyang
>> >
>> >
>> >
>> >
>> >
>> > 在 2023-12-15 11:20:24,"Aitozi" <gjying1...@gmail.com> 写道:
>> >> Hi Alan,
>> >>     Nice FLIP, I also explore leveraging the async table function[1] to
>> >> improve the throughput before.
>> >>
>> >> About the configs, what do you think using hints as mentioned in [1].
>> >>
>> >> [1]:
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
>> >>
>> >> Thanks,
>> >> Aitozi.
>> >>
>> >> Timo Walther <twal...@apache.org> 于2023年12月14日周四 17:29写道:
>> >>
>> >>> Hi Alan,
>> >>>
>> >>> thanks for proposing this FLIP. It's a great addition to Flink and has
>> >>> been requested multiple times. It will be in particular interesting for
>> >>> accessing REST endpoints and other remote services.
>> >>>
>> >>> Great that we can generalize and reuse parts of the Python planner
>> rules
>> >>> and code for this.
>> >>>
>> >>> I have some feedback regarding the API:
>> >>>
>> >>> 1) Configuration
>> >>>
>> >>> Configuration keys like
>> >>>
>> >>> `table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
>> >>>
>> >>> are currently not supported in the configuration stack. The key space
>> >>> should remain constant. Only a constant key space enables the use of
>> the
>> >>> ConfigOption class which is required in the layered configuration. For
>> >>> now I would suggest to only allow a global setting for buffer capacity,
>> >>> timeout, and retry-strategy. We can later work on a per-function
>> >>> configuration (potentially also needed for other use cases).
>> >>>
>> >>> 2) Semantical declaration
>> >>>
>> >>> Regarding
>> >>>
>> >>> `table.exec.async-scalar.catalog.db.func-name.output-mode`
>> >>>
>> >>> this is a semantical property of a function and should be defined
>> >>> per-function. It impacts the query result and potentially the behavior
>> >>> of planner rules.
>> >>>
>> >>> I see two options for this either: (a) an additional method in
>> >>> AsyncScalarFunction or (b) adding this to the function's requirements.
>> I
>> >>> vote for (b), because a FunctionDefinition should be fully self
>> >>> contained and sufficient for planning.
>> >>>
>> >>> Thus, for `FunctionDefinition.getRequirements():
>> >>> Set<FunctionRequirement>` we can add a new requirement `ORDERED` which
>> >>> should also be the default for AsyncScalarFunction. `getRequirements()`
>> >>> can be overwritten and return a set without this requirement if the
>> user
>> >>> intents to do this.
>> >>>
>> >>>
>> >>> Thanks,
>> >>> Timo
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> On 11.12.23 18:43, Piotr Nowojski wrote:
>> >>>> +1 to the idea, I don't have any comments.
>> >>>>
>> >>>> Best,
>> >>>> Piotrek
>> >>>>
>> >>>> czw., 7 gru 2023 o 07:15 Alan Sheinberg <asheinb...@confluent.io
>> >>> .invalid>
>> >>>> napisał(a):
>> >>>>
>> >>>>>>
>> >>>>>> Nicely written and makes sense.  The only feedback I have is around
>> the
>> >>>>>> naming of the generalization, e.g. "Specifically,
>> >>> PythonCalcSplitRuleBase
>> >>>>>> will be generalized into RemoteCalcSplitRuleBase."  This naming
>> seems
>> >>> to
>> >>>>>> imply/suggest that all Async functions are remote.  I wonder if we
>> can
>> >>>>> find
>> >>>>>> another name which doesn't carry that connotation; maybe
>> >>>>>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles
>> >>> Python
>> >>>>>> and Async functions seems reasonable.)
>> >>>>>>
>> >>>>> Thanks.  That's fair.  I agree that "Remote" isn't always accurate.
>> I
>> >>>>> believe that the python calls are also done asynchronously, so that
>> >>> might
>> >>>>> be a reasonable name, so long as there's no confusion between the
>> base
>> >>> and
>> >>>>> async child class.
>> >>>>>
>> >>>>> On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes
>> <jhug...@confluent.io.invalid
>> >>>>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Hi Alan,
>> >>>>>>
>> >>>>>> Nicely written and makes sense.  The only feedback I have is around
>> the
>> >>>>>> naming of the generalization, e.g. "Specifically,
>> >>> PythonCalcSplitRuleBase
>> >>>>>> will be generalized into RemoteCalcSplitRuleBase."  This naming
>> seems
>> >>> to
>> >>>>>> imply/suggest that all Async functions are remote.  I wonder if we
>> can
>> >>>>> find
>> >>>>>> another name which doesn't carry that connotation; maybe
>> >>>>>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles
>> >>> Python
>> >>>>>> and Async functions seems reasonable.)
>> >>>>>>
>> >>>>>> Cheers,
>> >>>>>>
>> >>>>>> Jim
>> >>>>>>
>> >>>>>> On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
>> >>>>>> <asheinb...@confluent.io.invalid> wrote:
>> >>>>>>
>> >>>>>>> I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
>> >>>>>>> asynchronous scalar function support [1]
>> >>>>>>>
>> >>>>>>> This feature proposes adding a new UDF type AsyncScalarFunction
>> which
>> >>>>> is
>> >>>>>>> invoked just like a normal ScalarFunction, but is implemented with
>> an
>> >>>>>>> asynchronous eval method.  I had brought this up including the
>> >>>>> motivation
>> >>>>>>> in a previous discussion thread [2].
>> >>>>>>>
>> >>>>>>> The purpose is to achieve high throughput scalar function UDFs
>> while
>> >>>>>>> allowing that an individual call may have high latency.  It allows
>> >>>>>> scaling
>> >>>>>>> up the parallelism of just these calls without having to increase
>> the
>> >>>>>>> parallelism of the whole query (which could be rather resource
>> >>>>>>> inefficient).
>> >>>>>>>
>> >>>>>>> In practice, it should enable SQL integration with external
>> services
>> >>>>> and
>> >>>>>>> systems, which Flink has limited support for at the moment. It
>> should
>> >>>>>> also
>> >>>>>>> allow easier integration with existing libraries which use
>> >>> asynchronous
>> >>>>>>> APIs.
>> >>>>>>>
>> >>>>>>> Looking forward to your feedback and suggestions.
>> >>>>>>>
>> >>>>>>> [1]
>> >>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
>> >>>>>>> <
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>> [2]
>> https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs
>> >>>>>>> <https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs>
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Alan
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>>
>>
>>

Reply via email to