Thanks for the helpful comments, Xuyang and Timo.

> @Timo, @Alan: IIUC, there seems to be something wrong here. Take Kafka as
> source and MySQL as sink as an example.
> Although Kafka is an append-only source, one of its fields is used as the pk
> when writing to MySQL. If the async udx is executed
> in an unordered mode, the data in MySQL may end up wrong. In this case, we
> actually need to ensure that rows are in order with respect to the sink's pk.
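
To make Xuyang's scenario concrete, here is a minimal, self-contained Java sketch (not Flink code) that models the MySQL upsert sink as a map keyed by primary key, where the last write wins. `UpsertSinkDemo` and `Row` are hypothetical names. It applies the same two rows once in source order and once in an order in which their async calls might complete.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of an upsert sink (e.g. a MySQL table) keyed by primary key. */
final class UpsertSinkDemo {

    /** One enriched row: the primary key and the value written for it. */
    record Row(String pk, String value) {}

    /** Applies rows in the given order; the last write for each key wins. */
    static Map<String, String> applyUpserts(List<Row> rows) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Row row : rows) {
            table.put(row.pk(), row.value());
        }
        return table;
    }

    public static void main(String[] args) {
        // Two append-only Kafka records for the same key, in source order.
        List<Row> sourceOrder = List.of(new Row("user-1", "v1"), new Row("user-1", "v2"));
        // UNORDERED emission: the async call for "v2" happened to finish first.
        List<Row> completionOrder = List.of(new Row("user-1", "v2"), new Row("user-1", "v1"));
        System.out.println("ordered:   " + applyUpserts(sourceOrder));     // {user-1=v2}
        System.out.println("unordered: " + applyUpserts(completionOrder)); // {user-1=v1}, stale
    }
}
```

The input is append-only on its own, but the upsert sink turns a reordering of two same-key rows into a stale final row.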


@Xuyang: That's a great point.  If some node downstream of my operator
cares about ordering, there's no way for it to reconstruct the original
ordering of the rows as they were input to my operator.  So even if they
want to preserve ordering by key, the order in which they see it may
already be incorrect.  Somehow I thought that maybe the analysis of the
changelog mode at a given operator was aware of downstream operations, but
it seems not.

> Clear "no" on this. Changelog semantics make the planner complex and we
> need to be careful. Therefore I would strongly suggest we introduce
> ORDERED and slowly enable UNORDERED whenever we see a good fit for it in
> plans with appropriate planner rules that guard it.


@Timo: The better I understand the complexity, the more I agree with this.
I would be totally fine with the first version only having ORDERED mode.
For a v2, we could attempt the next most conservative thing and only
allow UNORDERED when the whole graph is in *INSERT* changelog mode.  The
next best type of optimization might understand which key is required
downstream, and allow breaking the original order only between unrelated
keys, while maintaining it between rows of the same key.  Of course, if the
key used downstream is computed in some manner, that makes it all the
harder to know this beforehand.
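
One way the key-aware variant could work is sketched below in plain Java, assuming the downstream key of each row is already known when the row enters the operator. `KeyOrderedBuffer` is a hypothetical name, not a Flink class. Results are released in completion order across keys, but a result waits until every earlier row with the same key has been released.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical buffer: reorder across keys, never within a key. */
final class KeyOrderedBuffer<K, V> {

    /** Sequence numbers not yet released, per key, in arrival order. */
    private final Map<K, ArrayDeque<Long>> pendingByKey = new HashMap<>();
    /** Finished results still waiting for an earlier row with the same key. */
    private final Map<Long, V> completed = new HashMap<>();
    private final Map<Long, K> keyOf = new HashMap<>();
    private long nextSeq = 0;

    /** Registers a new input row (its async call starts) and returns its sequence number. */
    long add(K key) {
        long seq = nextSeq++;
        pendingByKey.computeIfAbsent(key, k -> new ArrayDeque<>()).addLast(seq);
        keyOf.put(seq, key);
        return seq;
    }

    /** Records a finished async call and returns every result that may be emitted now. */
    List<V> complete(long seq, V result) {
        completed.put(seq, result);
        K key = keyOf.get(seq);
        ArrayDeque<Long> queue = pendingByKey.get(key);
        List<V> out = new ArrayList<>();
        // Drain this key's queue from the head while the head's result is ready.
        while (!queue.isEmpty() && completed.containsKey(queue.peekFirst())) {
            long head = queue.pollFirst();
            out.add(completed.remove(head));
            keyOf.remove(head);
        }
        if (queue.isEmpty()) {
            pendingByKey.remove(key);
        }
        return out;
    }
}
```

For inputs `a1, b1, a2` where `a2` finishes first, `a2` is held back, `b1` is released immediately, and completing `a1` releases `a1` and then `a2`. A real operator would also have to checkpoint this in-flight state, which the sketch omits.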

> So unordering should be fine *within* watermarks. This is also what
> watermarks are good for, a trade-off between strict ordering and making
> progress. The async operator from DataStream API also supports this if I
> remember correctly. However, it assumes a timestamp is present in
> StreamRecord on which it can work. But this is not the case within the
> SQL engine.


*AsyncWaitOperator* and *UnorderedStreamElementQueue* (the implementations
I plan on using) seem to support exactly this behavior.  I don't think they
make assumptions about the record's timestamp; they just preserve whatever
the input order is w.r.t. watermarks.  I'd be curious to understand the
timestamp use in more detail and see whether it's required with the
mentioned classes.
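
The watermark-bounded behavior described here can be modeled roughly as follows. This is a simplified, hypothetical sketch in plain Java, not the actual `UnorderedStreamElementQueue` code. Records between two watermarks are emitted in completion order, and each watermark acts as a barrier that is emitted only after every earlier record, so no record crosses a watermark in either direction.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of watermark-bounded unordered emission. */
final class WatermarkSegmentQueue {

    /** Records that arrived after the previous watermark, plus the watermark that closes them. */
    private static final class Segment {
        final Set<String> inFlight = new HashSet<>();
        final List<String> ready = new ArrayList<>();
        Long closingWatermark; // null while the segment is still open
    }

    private final ArrayDeque<Segment> segments = new ArrayDeque<>();
    private final List<String> emitted = new ArrayList<>();

    WatermarkSegmentQueue() {
        segments.addLast(new Segment());
    }

    /** A record enters the operator and its async call starts. */
    void addRecord(String id) {
        segments.peekLast().inFlight.add(id);
    }

    /** A watermark closes the current segment and opens a new one. */
    void addWatermark(long ts) {
        segments.peekLast().closingWatermark = ts;
        segments.addLast(new Segment());
        flush();
    }

    /** An async call finished; its record becomes ready within its own segment. */
    void complete(String id) {
        for (Segment s : segments) {
            if (s.inFlight.remove(id)) {
                s.ready.add(id);
                break;
            }
        }
        flush();
    }

    /** Emits ready records of the oldest segment, then its watermark once nothing is in flight. */
    private void flush() {
        while (true) {
            Segment head = segments.peekFirst();
            for (String id : head.ready) {
                emitted.add("R(" + id + ")");
            }
            head.ready.clear();
            if (!head.inFlight.isEmpty() || head.closingWatermark == null) {
                return; // still waiting on calls, or no watermark has closed this segment yet
            }
            emitted.add("W(" + head.closingWatermark + ")");
            segments.pollFirst();
        }
    }

    List<String> emitted() {
        return emitted;
    }
}
```

Feeding it `W(0), R(1), R(2), R(4), R(3), W(4), R(5)` (Timo's example from the quoted mail, assuming the rightmost element arrives first) and completing the calls in the order 5, 3, 1, 2, 4 emits `W(0), R(3), R(1), R(2), R(4), W(4), R(5)`: the records between the two watermarks are reordered, while `R(5)` waits behind `W(4)`.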

> TLDR: Let's focus on ORDERED first.


I'm more than happy to start here, and we can consider UNORDERED as a
follow-up.  Then maybe we consider only INSERT mode graphs and ones where we
can solve the watermark constraints.
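
A conservative version of such a planner check could look like the following hypothetical Java sketch. It combines the INSERT-only idea with Timo's suggestion of requiring exactly one time attribute column; the enums and method names are illustrative assumptions, not Flink planner APIs. Anything else falls back to ORDERED.

```java
import java.util.List;

/** Hypothetical planner-side check; none of these names are Flink planner APIs. */
final class AsyncModeChooser {

    enum ChangelogKind { INSERT_ONLY, UPSERT, RETRACT }

    enum AsyncOutputMode { ORDERED, UNORDERED }

    /**
     * @param pipeline           changelog kind of every operator from source to sink, sink included
     * @param timeAttributeCount number of time attribute columns in the async operator's input
     */
    static AsyncOutputMode choose(List<ChangelogKind> pipeline, int timeAttributeCount) {
        // Any update or delete anywhere, including an upsert sink, rules out reordering.
        boolean insertOnly = pipeline.stream().allMatch(kind -> kind == ChangelogKind.INSERT_ONLY);
        // Exactly one time attribute lets reordering be bounded by watermarks.
        boolean singleTimeAttribute = timeAttributeCount == 1;
        return insertOnly && singleTimeAttribute ? AsyncOutputMode.UNORDERED : AsyncOutputMode.ORDERED;
    }
}
```

Including the sink in `pipeline` is the important design choice: Xuyang's Kafka-to-MySQL example has an INSERT-only source but an upsert sink, so it stays ORDERED.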

Thanks,
Alan


On Mon, Dec 18, 2023 at 2:36 AM Timo Walther <twal...@apache.org> wrote:

> Hi Xuyang and Alan,
>
> thanks for this productive discussion.
>
>  > Would it make a difference if it were exposed by the explain
>
> @Alan: I think this is a great idea. +1 on exposing the sync/async
> behavior through EXPLAIN.
>
>
>  > Is there an easy way to determine if the output of an async function
>  > would be problematic or not?
>
> Clear "no" on this. Changelog semantics make the planner complex and we
> need to be careful. Therefore I would strongly suggest we introduce
> ORDERED and slowly enable UNORDERED whenever we see a good fit for it in
> plans with appropriate planner rules that guard it.
>
>  > If the input to the operator is append-only, it seems fine, because
>  > this implies that each row is effectively independent and ordering is
>  > unimportant.
>
> As @Xuyang pointed out, it's not only the input that decides whether
> append-only is safe. It's also the subsequent operators in the pipeline.
> The example of Xuyang is a good one, when the sink operates in upsert
> mode. Append-only source, append-only operators, and append-only sink
> are safer.
>
> However, even in this combination, a row is not fully "independent";
> there are still watermarks flowing between rows:
>
> R(5), W(4), R(3), R(4), R(2), R(1), W(0)
>
> So unordering should be fine *within* watermarks. This is also what
> watermarks are good for, a trade-off between strict ordering and making
> progress. The async operator from DataStream API also supports this if I
> remember correctly. However, it assumes a timestamp is present in
> StreamRecord on which it can work. But this is not the case within the
> SQL engine.
>
> TLDR: Let's focus on ORDERED first.
>
> If we want to use UNORDERED, I would suggest checking the input operator
> for exactly 1 time attribute column. If there is exactly 1 time
> attribute column, we could insert it into the StreamRecord and allow
> UNORDERED mode. If this condition is not met, we go with ORDERED.
>
> Regards,
> Timo
>
>
>
>
> On 18.12.23 07:05, Xuyang wrote:
> > Hi, Alan and Timo. Thanks for your reply.
> >> Would it make a difference if it were exposed by the explain
> >> method (the operator having "syncMode" vs not)?
> > @Alan: I think this is a good way to tell the user what mode these async
> > udx are currently in.
> >> A regular SQL user doesn't care whether the function is sync or async.
> > @Timo: I agree that the planner should throw as few exceptions as
> > possible so as not to confuse users. So I think
> > it is a good way to expose syncMode through explain syntax.
> >> If the input to the operator is append-only, it seems fine,
> >> because this implies that each row is effectively independent and
> >> ordering is unimportant.
> >
> >
> >> For example, if the query is an append-only `SELECT FUNC(c) FROM t`,
> >> I don't see a reason why the operator is not allowed to produce
> >> unordered results.
> >
> >
> > @Timo, @Alan: IIUC, there seems to be something wrong here. Take Kafka
> > as source and MySQL as sink as an example.
> > Although Kafka is an append-only source, one of its fields is used as the
> > pk when writing to MySQL. If the async udx is executed
> > in an unordered mode, the data in MySQL may end up wrong. In this case, we
> > actually need to ensure that rows are in order with respect to the sink's pk.
> >
> >
> >
> > --
> >
> >      Best!
> >      Xuyang
> >
> >
> >
> >
> >
> > On 2023-12-16 03:33:55, "Alan Sheinberg" <asheinb...@confluent.io.INVALID>
> > wrote:
> >> Thanks for the replies everyone.  My responses are inline:
> >>
> >>> About the configs, what do you think about using hints as mentioned in [1]?
> >>
> >> @Aitozi: I think hints could be a good way to do this, similar to lookup
> >> joins or the proposal in FLIP-313.  One benefit of hints is that they allow
> >> for the highest granularity of configuration because you can decide at
> >> each and every call site just what parameters to use.  The downside of
> >> hints is that there's more syntax to learn and more verbosity.  I'm
> >> somewhat partial to a configuration like this with a class-definition level
> >> of granularity (similar to how metrics reporters are defined [1]):
> >>
> >> table.exec.async-scalar.myfunc.class: org.apache.flink.MyAsyncScalarFunction
> >> table.exec.async-scalar.myfunc.buffer-capacity: 10
> >> ...
> >>
> >> As Timo mentioned, the downside to this is that there's not a nice static
> >> way to do this at the moment, unless you extend ConfigOption.  It would be
> >> good ultimately if lookup joins, async scalar functions, and other future
> >> configurable UDFs shared the same methodology, but maybe a unified approach
> >> is a follow-up discussion.
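
The following hypothetical Java sketch illustrates the trade-off: a per-function key such as `table.exec.async-scalar.myfunc.buffer-capacity` has to be looked up as a raw string, because the key depends on a function name only known at runtime, whereas one global key is constant and could be declared once. The global key name and the fallback order are assumptions for illustration, not confirmed Flink options.

```java
import java.util.Map;

/** Hypothetical resolver; the key layout follows this thread's examples and is an assumption. */
final class AsyncScalarConfig {

    static final String PREFIX = "table.exec.async-scalar.";

    static int bufferCapacity(Map<String, String> conf, String functionName, int defaultValue) {
        // Per-function key: it depends on the function name, so it cannot be declared
        // ahead of time as a fixed option and is read as a raw string.
        String perFunction = conf.get(PREFIX + functionName + ".buffer-capacity");
        if (perFunction != null) {
            return Integer.parseInt(perFunction.trim());
        }
        // Global key: one constant key that applies to every async scalar function.
        String global = conf.get(PREFIX + "buffer-capacity");
        return global != null ? Integer.parseInt(global.trim()) : defaultValue;
    }
}
```

With both keys set, `myfunc` gets its own capacity and every other function falls back to the global value.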
> >>
> >>> I’m just curious why you don’t use conf (global) and query hint (individual
> >>> async udx) to mark the output
> >>> mode 'order' or 'unorder' like async lookup join [1] and async udtf [2],
> >>> but chose to introduce a new enum
> >>> in AsyncScalarFunction.
> >>
> >>
> >> @Xuyang: I'm open to adding hints. I think the important part is that we
> >> have some method for the user to have a class definition level way to
> >> define whether ORDERED or ALLOW_UNORDERED is most appropriate.  I don't
> >> have a strong sense yet for what would be most appropriately exposed as a
> >> FunctionRequirement vs a simple configuration/hint.
> >>
> >>> What about throwing an exception to make it clear to users that using async
> >>> scalar functions in this situation
> >>> is problematic instead of executing silently in sync mode? Because users
> >>> may be confused about
> >>> the final actual job graph.
> >>
> >>
> >> @Xuyang: Would it make a difference if it were exposed by the explain
> >> method (the operator having "syncMode" vs not)?  I'd be fine to do it
> >> either way -- certainly throwing an error is a bit simpler.
> >>
> >>> You are right. Actually it should be the planner that fully decides
> >>> whether ORDERED or UNORDERED is safe to do. For example, if the query is
> >>> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
> >>> operator is not allowed to produce unordered results. By global
> >>> configuration, we can set ORDERED such that users don't get confused
> >>> about the unordered output.
> >>
> >>
> >> @Timo: Is there an easy way to determine if the output of an async function
> >> would be problematic or not?  If the input to the operator is append-only,
> >> it seems fine, because this implies that each row is effectively
> >> independent and ordering is unimportant. For upsert mode with +U rows, you
> >> wouldn't want to swap order with other +U rows for the same key because the
> >> last one should win.  For -D or -U rows, you wouldn't want to swap with
> >> other rows for the same key for similar reasons.  Is it as simple as
> >> looking at the changelog mode to determine whether it's safe to run async
> >> functions UNORDERED?  I had considered analyzing various query forms (join
> >> vs aggregation vs whatever), but it seems like changelog mode could be
> >> sufficient to understand what works and what would be an issue.  Any code
> >> pointers and explanation for similar analysis would be great to understand
> >> this more.
> >>
> >>> The mode UNORDERED however should only have
> >>> effect for these simple use cases and throw an exception if UNORDERED
> >>> would mess up a changelog or other subsequent operators.
> >>
> >> @Timo: Should we throw errors or run in sync mode?  It seems like running
> >> in sync mode is an option to ensure correctness in all changelog modes.
> >>
> >>> Let's go with global configuration first and later introduce
> >>> hints. I feel the more hints we introduce, the harder SQL queries get
> >>> when maintaining them.
> >>
> >> @Timo: That seems like a reasonable approach to me.
> >>
> >> -Alan
> >>
> >> [1]
> >> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/
> >>
> >> On Fri, Dec 15, 2023 at 2:56 AM Timo Walther <twal...@apache.org>
> >> wrote:
> >>
> >>> 1. Override the function `getRequirements` in `AsyncScalarFunction`
> >>>
> >>>   > If the user overrides `requirements()` to omit the `ORDERED`
> >>>   > requirement, do we allow the operator to return out-of-order results
> >>>   > or should it fall back on `AsyncOutputMode.ALLOW_UNORDERED` type
> >>>   > behavior (where we allow out-of-order only if it's deemed correct)?
> >>>
> >>> You are right. Actually it should be the planner that fully decides
> >>> whether ORDERED or UNORDERED is safe to do. For example, if the query is
> >>> an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
> >>> operator is not allowed to produce unordered results. By global
> >>> configuration, we can set ORDERED such that users don't get confused
> >>> about the unordered output. The mode UNORDERED however should only have
> >>> effect for these simple use cases and throw an exception if UNORDERED
> >>> would mess up a changelog or other subsequent operators.
> >>>
> >>> 2. In some scenarios, for semantic correctness, async operators must be
> >>> executed in sync mode.
> >>>
> >>>   > What about throwing an exception to make it clear to users that using
> >>>   > async scalar functions
> >>>
> >>> @Xuyang: A regular SQL user doesn't care whether the function is sync or
> >>> async. The planner should simply do its best to make the execution
> >>> performant. I would not throw an exception here. The more exceptions
> >>> there are, the more struggles and questions from the user. Conceptually,
> >>> we can also run async code synchronously, and that's why we should do it
> >>> to avoid errors.
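
The point that async code can always be run synchronously can be illustrated with a small plain-Java sketch. `AsyncStringFunction` is a hypothetical stand-in for an async UDF whose eval method completes a `CompletableFuture`, not Flink's actual class. `SyncFallback.callSync` blocks on that future, which gives up concurrency but keeps the output order identical to the input order.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for an async scalar UDF that delivers its result through a future. */
interface AsyncStringFunction {
    void eval(CompletableFuture<String> result, String input);
}

final class SyncFallback {

    /** Runs one async call to completion on the caller's thread ("sync mode"). */
    static String callSync(AsyncStringFunction fn, String input) {
        CompletableFuture<String> result = new CompletableFuture<>();
        fn.eval(result, input);
        // join() blocks until the future completes: no concurrency, but the output
        // order is trivially the input order.
        return result.join();
    }

    public static void main(String[] args) {
        // The "remote" work runs on the common ForkJoinPool and completes the future later.
        AsyncStringFunction upper = (result, in) ->
                CompletableFuture.runAsync(() -> result.complete(in.toUpperCase()));
        System.out.println(callSync(upper, "flink")); // FLINK
    }
}
```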
> >>>
> >>> 3. Hints
> >>>
> >>> @Aitozi: Let's go with global configuration first and later introduce
> >>> hints. I feel the more hints we introduce, the harder SQL queries get
> >>> when maintaining them.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>>
> >>>
> >>>
> >>> On 15.12.23 04:51, Xuyang wrote:
> >>>> Hi, Alan. Thanks for driving this.
> >>>>
> >>>>
> >>>> Using async to improve throughput has been done for lookup joins, and the
> >>>> improvement is obvious, so I think it makes sense to support async scalar
> >>>> functions.  Big +1 for this FLIP.
> >>>> I have some questions below.
> >>>>
> >>>>
> >>>> 1. Override the function `getRequirements` in `AsyncScalarFunction`
> >>>>
> >>>>
> >>>> I’m just curious why you don’t use conf (global) and query hint
> >>>> (individual async udx) to mark the output mode 'order' or 'unorder' like
> >>>> async lookup join [1] and async udtf [2], but chose to introduce a new
> >>>> enum in AsyncScalarFunction.
> >>>>
> >>>>
> >>>> 2. In some scenarios, for semantic correctness, async operators must be
> >>>> executed in sync mode.
> >>>>
> >>>>
> >>>> What about throwing an exception to make it clear to users that using
> >>>> async scalar functions in this situation
> >>>> is problematic instead of executing silently in sync mode? Because users
> >>>> may be confused about
> >>>> the final actual job graph.
> >>>>
> >>>>
> >>>> [1]
> >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
> >>>> [2]
> >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>>
> >>>>       Best!
> >>>>       Xuyang
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On 2023-12-15 11:20:24, "Aitozi" <gjying1...@gmail.com> wrote:
> >>>>> Hi Alan,
> >>>>>      Nice FLIP. I have also explored leveraging the async table function [1]
> >>>>> to improve throughput before.
> >>>>>
> >>>>> About the configs, what do you think about using hints as mentioned in [1]?
> >>>>>
> >>>>> [1]:
> >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
> >>>>>
> >>>>> Thanks,
> >>>>> Aitozi.
> >>>>>
> >>>>> On Thu, Dec 14, 2023 at 17:29, Timo Walther <twal...@apache.org> wrote:
> >>>>>
> >>>>>> Hi Alan,
> >>>>>>
> >>>>>> thanks for proposing this FLIP. It's a great addition to Flink and has
> >>>>>> been requested multiple times. It will be particularly interesting for
> >>>>>> accessing REST endpoints and other remote services.
> >>>>>>
> >>>>>> Great that we can generalize and reuse parts of the Python planner rules
> >>>>>> and code for this.
> >>>>>>
> >>>>>> I have some feedback regarding the API:
> >>>>>>
> >>>>>> 1) Configuration
> >>>>>>
> >>>>>> Configuration keys like
> >>>>>>
> >>>>>> `table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
> >>>>>>
> >>>>>> are currently not supported in the configuration stack. The key space
> >>>>>> should remain constant. Only a constant key space enables the use of the
> >>>>>> ConfigOption class, which is required in the layered configuration. For
> >>>>>> now I would suggest only allowing a global setting for buffer capacity,
> >>>>>> timeout, and retry-strategy. We can later work on a per-function
> >>>>>> configuration (potentially also needed for other use cases).
> >>>>>>
> >>>>>> 2) Semantical declaration
> >>>>>>
> >>>>>> Regarding
> >>>>>>
> >>>>>> `table.exec.async-scalar.catalog.db.func-name.output-mode`
> >>>>>>
> >>>>>> this is a semantical property of a function and should be defined
> >>>>>> per-function. It impacts the query result and potentially the behavior
> >>>>>> of planner rules.
> >>>>>>
> >>>>>> I see two options for this: (a) an additional method in
> >>>>>> AsyncScalarFunction or (b) adding this to the function's requirements. I
> >>>>>> vote for (b), because a FunctionDefinition should be fully
> >>>>>> self-contained and sufficient for planning.
> >>>>>>
> >>>>>> Thus, for `FunctionDefinition.getRequirements():
> >>>>>> Set<FunctionRequirement>` we can add a new requirement `ORDERED` which
> >>>>>> should also be the default for AsyncScalarFunction. `getRequirements()`
> >>>>>> can be overridden to return a set without this requirement if the user
> >>>>>> intends to do this.
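
A self-contained Java model of this proposal might look like the sketch below. The classes only mimic the names discussed here: `ORDERED` is the requirement being proposed, and `Requirement`, `AsyncScalarFunctionModel`, `EnrichUserFn`, and `PlannerCheck` are hypothetical stand-ins, not Flink classes. Dropping the requirement only permits reordering; the planner still decides whether the plan is safe.

```java
import java.util.EnumSet;
import java.util.Set;

/** Stand-in for a function requirement enum; ORDERED is the requirement proposed here. */
enum Requirement { ORDERED }

/** Hypothetical model of an async scalar function's planning metadata. */
abstract class AsyncScalarFunctionModel {
    /** Proposed default: output must preserve input order. */
    Set<Requirement> getRequirements() {
        return EnumSet.of(Requirement.ORDERED);
    }
}

/** A user function that opts out of ordering by overriding getRequirements(). */
final class EnrichUserFn extends AsyncScalarFunctionModel {
    @Override
    Set<Requirement> getRequirements() {
        return EnumSet.noneOf(Requirement.class);
    }
}

final class PlannerCheck {
    /** Omitting ORDERED only permits reordering; the plan itself must also be safe. */
    static boolean mayReorder(AsyncScalarFunctionModel fn, boolean planIsSafe) {
        return planIsSafe && !fn.getRequirements().contains(Requirement.ORDERED);
    }
}
```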
> >>>>>>
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 11.12.23 18:43, Piotr Nowojski wrote:
> >>>>>>> +1 to the idea, I don't have any comments.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Piotrek
> >>>>>>>
> >>>>>>> On Thu, Dec 7, 2023 at 07:15 Alan Sheinberg <asheinb...@confluent.io.invalid>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>>>
> >>>>>>>>> Nicely written and makes sense.  The only feedback I have is around the
> >>>>>>>>> naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
> >>>>>>>>> will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
> >>>>>>>>> imply/suggest that all Async functions are remote.  I wonder if we can
> >>>>>>>>> find another name which doesn't carry that connotation; maybe
> >>>>>>>>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
> >>>>>>>>> and Async functions seems reasonable.)
> >>>>>>>>>
> >>>>>>>> Thanks.  That's fair.  I agree that "Remote" isn't always accurate.  I
> >>>>>>>> believe that the Python calls are also done asynchronously, so that might
> >>>>>>>> be a reasonable name, so long as there's no confusion between the base and
> >>>>>>>> async child class.
> >>>>>>>>
> >>>>>>>> On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes <jhug...@confluent.io.invalid>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Alan,
> >>>>>>>>>
> >>>>>>>>> Nicely written and makes sense.  The only feedback I have is around the
> >>>>>>>>> naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
> >>>>>>>>> will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
> >>>>>>>>> imply/suggest that all Async functions are remote.  I wonder if we can
> >>>>>>>>> find another name which doesn't carry that connotation; maybe
> >>>>>>>>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
> >>>>>>>>> and Async functions seems reasonable.)
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>>
> >>>>>>>>> Jim
> >>>>>>>>>
> >>>>>>>>> On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
> >>>>>>>>> <asheinb...@confluent.io.invalid> wrote:
> >>>>>>>>>
> >>>>>>>>>> I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
> >>>>>>>>>> asynchronous scalar function support [1]
> >>>>>>>>>>
> >>>>>>>>>> This feature proposes adding a new UDF type AsyncScalarFunction which is
> >>>>>>>>>> invoked just like a normal ScalarFunction, but is implemented with an
> >>>>>>>>>> asynchronous eval method.  I had brought this up including the motivation
> >>>>>>>>>> in a previous discussion thread [2].
> >>>>>>>>>>
> >>>>>>>>>> The purpose is to achieve high-throughput scalar function UDFs even when
> >>>>>>>>>> an individual call may have high latency.  It allows scaling up the
> >>>>>>>>>> parallelism of just these calls without having to increase the
> >>>>>>>>>> parallelism of the whole query (which could be rather resource-inefficient).
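
A rough back-of-the-envelope estimate based on Little's law shows why this helps. Assume, purely for illustration, that each call takes a constant 100 ms, calls are independent, and per-record overhead is negligible; the capacity of 10 matches the `buffer-capacity` value used as an example elsewhere in this thread.

```latex
\text{throughput per subtask} \approx \frac{\text{calls in flight}}{\text{latency}}
\qquad
\text{sync: } \frac{1}{0.1\,\text{s}} = 10\ \text{calls/s}
\qquad
\text{async (capacity 10): } \frac{10}{0.1\,\text{s}} = 100\ \text{calls/s}
```

Under these assumptions, reaching 100 calls/s with a synchronous function would take roughly 10 times the parallelism for the whole query, instead of raising concurrency only for the function calls.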
> >>>>>>>>>>
> >>>>>>>>>> In practice, it should enable SQL integration with external services and
> >>>>>>>>>> systems, which Flink has limited support for at the moment. It should also
> >>>>>>>>>> allow easier integration with existing libraries which use asynchronous
> >>>>>>>>>> APIs.
> >>>>>>>>>>
> >>>>>>>>>> Looking forward to your feedback and suggestions.
> >>>>>>>>>>
> >>>>>>>>>> [1]
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> >>>>>>>>>>
> >>>>>>>>>> [2] https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Alan
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>
> >>>
>
>
