Thanks Piotr and Timo for your responses.

To address your comments Timo:

1) Configuration

Configuration keys like

`table.exec.async-scalar.catalog.db.func-name.buffer-capacity`

are currently not supported in the configuration stack. The key space
> should remain constant. Only a constant key space enables the use of the
> ConfigOption class which is required in the layered configuration. For
> now I would suggest to only allow a global setting for buffer capacity,
> timeout, and retry-strategy. We can later work on a per-function
> configuration (potentially also needed for other use cases).


Yeah, I was trying to find similar examples and couldn't find too many,
because as you say, they aren't supported.

There are things like the metric reporters
<https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/metric_reporters/>,
where you can make up a name (e.g. my_jmx_reporter), but then must list it
in metrics.reporters so that configs can all be iterated over.  Doing a
similar thing here would be a bit inelegant for this case.  I'm happy to
have a global setting and a future solution could override the global
setting once we figure that out.

2) Semantical declaration

Regarding

`table.exec.async-scalar.catalog.db.func-name.output-mode`

this is a semantical property of a function and should be defined
> per-function. It impacts the query result and potentially the behavior
> of planner rules.

I see two options for this either: (a) an additional method in
> AsyncScalarFunction or (b) adding this to the function's requirements. I
> vote for (b), because a FunctionDefinition should be fully self
> contained and sufficient for planning.

Thus, for `FunctionDefinition.getRequirements():
> Set<FunctionRequirement>` we can add a new requirement `ORDERED` which
> should also be the default for AsyncScalarFunction. `getRequirements()`
> can be overwritten and return a set without this requirement if the user
> intents to do this.


That's a good point. Maybe if we had per-function configs this could make
sense, but it doesn't make as much when everything is global. The default
of each definition should be to get a correct result, but allowing a manual
override to say that performance is ultimately what they care about over
certain SQL order semantics is also useful.  If the user overrides
`requirements()` to omit the `ORDERED` requirement, do we allow the
operator to return out-of-order results or should it fall back on
`AsyncOutputMode.ALLOW_UNORDERED` type behavior (where we allow
out-of-order only if it's deemed correct)? Having a manual override to mean
out-of-order seems like a decent starting point and we could alway add
`FunctionRequirement.ALLOW_UNDERED` in the future to allow the more
sophisticated behavior.

Thanks,
Alan

On Thu, Dec 14, 2023 at 1:29 AM Timo Walther <twal...@apache.org> wrote:

> Hi Alan,
>
> thanks for proposing this FLIP. It's a great addition to Flink and has
> been requested multiple times. It will be in particular interesting for
> accessing REST endpoints and other remote services.
>
> Great that we can generalize and reuse parts of the Python planner rules
> and code for this.
>
> I have some feedback regarding the API:
>
> 1) Configuration
>
> Configuration keys like
>
> `table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
>
> are currently not supported in the configuration stack. The key space
> should remain constant. Only a constant key space enables the use of the
> ConfigOption class which is required in the layered configuration. For
> now I would suggest to only allow a global setting for buffer capacity,
> timeout, and retry-strategy. We can later work on a per-function
> configuration (potentially also needed for other use cases).
>
> 2) Semantical declaration
>
> Regarding
>
> `table.exec.async-scalar.catalog.db.func-name.output-mode`
>
> this is a semantical property of a function and should be defined
> per-function. It impacts the query result and potentially the behavior
> of planner rules.
>
> I see two options for this either: (a) an additional method in
> AsyncScalarFunction or (b) adding this to the function's requirements. I
> vote for (b), because a FunctionDefinition should be fully self
> contained and sufficient for planning.
>
> Thus, for `FunctionDefinition.getRequirements():
> Set<FunctionRequirement>` we can add a new requirement `ORDERED` which
> should also be the default for AsyncScalarFunction. `getRequirements()`
> can be overwritten and return a set without this requirement if the user
> intents to do this.
>
>
> Thanks,
> Timo
>
>
>
>
> On 11.12.23 18:43, Piotr Nowojski wrote:
> > +1 to the idea, I don't have any comments.
> >
> > Best,
> > Piotrek
> >
> > czw., 7 gru 2023 o 07:15 Alan Sheinberg <asheinb...@confluent.io
> .invalid>
> > napisał(a):
> >
> >>>
> >>> Nicely written and makes sense.  The only feedback I have is around the
> >>> naming of the generalization, e.g. "Specifically,
> PythonCalcSplitRuleBase
> >>> will be generalized into RemoteCalcSplitRuleBase."  This naming seems
> to
> >>> imply/suggest that all Async functions are remote.  I wonder if we can
> >> find
> >>> another name which doesn't carry that connotation; maybe
> >>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles
> Python
> >>> and Async functions seems reasonable.)
> >>>
> >> Thanks.  That's fair.  I agree that "Remote" isn't always accurate.  I
> >> believe that the python calls are also done asynchronously, so that
> might
> >> be a reasonable name, so long as there's no confusion between the base
> and
> >> async child class.
> >>
> >> On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes <jhug...@confluent.io.invalid
> >
> >> wrote:
> >>
> >>> Hi Alan,
> >>>
> >>> Nicely written and makes sense.  The only feedback I have is around the
> >>> naming of the generalization, e.g. "Specifically,
> PythonCalcSplitRuleBase
> >>> will be generalized into RemoteCalcSplitRuleBase."  This naming seems
> to
> >>> imply/suggest that all Async functions are remote.  I wonder if we can
> >> find
> >>> another name which doesn't carry that connotation; maybe
> >>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles
> Python
> >>> and Async functions seems reasonable.)
> >>>
> >>> Cheers,
> >>>
> >>> Jim
> >>>
> >>> On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
> >>> <asheinb...@confluent.io.invalid> wrote:
> >>>
> >>>> I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
> >>>> asynchronous scalar function support [1]
> >>>>
> >>>> This feature proposes adding a new UDF type AsyncScalarFunction which
> >> is
> >>>> invoked just like a normal ScalarFunction, but is implemented with an
> >>>> asynchronous eval method.  I had brought this up including the
> >> motivation
> >>>> in a previous discussion thread [2].
> >>>>
> >>>> The purpose is to achieve high throughput scalar function UDFs while
> >>>> allowing that an individual call may have high latency.  It allows
> >>> scaling
> >>>> up the parallelism of just these calls without having to increase the
> >>>> parallelism of the whole query (which could be rather resource
> >>>> inefficient).
> >>>>
> >>>> In practice, it should enable SQL integration with external services
> >> and
> >>>> systems, which Flink has limited support for at the moment. It should
> >>> also
> >>>> allow easier integration with existing libraries which use
> asynchronous
> >>>> APIs.
> >>>>
> >>>> Looking forward to your feedback and suggestions.
> >>>>
> >>>> [1]
> >>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> >>>> <
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> >>>>>
> >>>>
> >>>> [2] https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs
> >>>> <https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs>
> >>>>
> >>>> Thanks,
> >>>> Alan
> >>>>
> >>>
> >>
> >
>
>

Reply via email to