Hi Alan,
thanks for proposing this FLIP. It's a great addition to Flink and has
been requested multiple times. It will be particularly interesting for
accessing REST endpoints and other remote services.
Great that we can generalize and reuse parts of the Python planner rules
and code for this.
I have some feedback regarding the API:
1) Configuration
Configuration keys like
`table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
are currently not supported in the configuration stack. The key space
should remain constant. Only a constant key space enables the use of the
ConfigOption class, which is required in the layered configuration. For
now I would suggest allowing only a global setting for buffer capacity,
timeout, and retry strategy. We can later work on a per-function
configuration (potentially also needed for other use cases).
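
As a rough illustration of the constant key space point, the sketch
below uses a simplified stand-in for ConfigOption rather than Flink's
actual class, so it runs on its own. The `Option` and
`AsyncScalarOptions` types, the key names, and the default values are
all assumptions made for this example and are not taken from the FLIP.

```java
import java.time.Duration;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified stand-in for a ConfigOption: a fixed key,
// a default value, and a parser for the string form of the value.
final class Option<T> {
    final String key;
    final T defaultValue;
    final Function<String, T> parser;

    Option(String key, T defaultValue, Function<String, T> parser) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.parser = parser;
    }
}

final class AsyncScalarOptions {
    // Key names and defaults are illustrative assumptions only.
    static final Option<Integer> BUFFER_CAPACITY =
            new Option<>("table.exec.async-scalar.buffer-capacity", 10, Integer::valueOf);
    static final Option<Duration> TIMEOUT =
            new Option<>("table.exec.async-scalar.timeout", Duration.ofMinutes(3), Duration::parse);

    // Looks up a constant key and falls back to the default when it is unset.
    static <T> T get(Map<String, String> conf, Option<T> option) {
        String raw = conf.get(option.key);
        return raw == null ? option.defaultValue : option.parser.apply(raw);
    }
}
```

Because every key is known up front, each option can be declared once
as a typed constant. A key that embeds `catalog.db.func-name` cannot be
declared this way, since the set of keys would depend on which
functions exist.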
2) Semantic declaration
Regarding
`table.exec.async-scalar.catalog.db.func-name.output-mode`
this is a semantic property of a function and should be defined
per-function. It impacts the query result and potentially the behavior
of planner rules.
I see two options for this: either (a) an additional method in
AsyncScalarFunction or (b) adding this to the function's requirements. I
vote for (b), because a FunctionDefinition should be fully
self-contained and sufficient for planning.
Thus, for `FunctionDefinition.getRequirements(): Set<FunctionRequirement>`,
we can add a new requirement `ORDERED`, which should also be the default
for AsyncScalarFunction. `getRequirements()` can be overridden to return
a set without this requirement if the user intends to opt out of
ordering.
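
Option (b) could look roughly like the sketch below. The types here are
minimal stand-ins that only mirror the shape of the Flink classes so
that the example compiles on its own. `ORDERED` is the requirement
proposed above and does not exist yet, and `OrderedLookup` and
`UnorderedLookup` are hypothetical user functions.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

// Stand-in enum; ORDERED is the newly proposed requirement.
enum FunctionRequirement { ORDERED }

// Stand-in for the function definition interface.
interface FunctionDefinition {
    default Set<FunctionRequirement> getRequirements() {
        return Collections.emptySet();
    }
}

// Ordered output is the default for async scalar functions.
abstract class AsyncScalarFunction implements FunctionDefinition {
    @Override
    public Set<FunctionRequirement> getRequirements() {
        return EnumSet.of(FunctionRequirement.ORDERED);
    }
}

// Keeps the default: results are expected in input order.
class OrderedLookup extends AsyncScalarFunction {}

// Opts out by returning a set without ORDERED.
class UnorderedLookup extends AsyncScalarFunction {
    @Override
    public Set<FunctionRequirement> getRequirements() {
        return Collections.emptySet();
    }
}
```

With this shape the planner would only need to inspect
`getRequirements()` to decide on the output mode, without consulting
any configuration.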
Thanks,
Timo
On 11.12.23 18:43, Piotr Nowojski wrote:
+1 to the idea, I don't have any comments.
Best,
Piotrek
On Thu, 7 Dec 2023 at 07:15, Alan Sheinberg <asheinb...@confluent.io.invalid>
wrote:
Nicely written and makes sense. The only feedback I have is around the
naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
will be generalized into RemoteCalcSplitRuleBase." This naming seems to
imply/suggest that all Async functions are remote. I wonder if we can find
another name which doesn't carry that connotation; maybe
AsyncCalcSplitRuleBase. (An AsyncCalcSplitRuleBase which handles Python
and Async functions seems reasonable.)
Thanks. That's fair. I agree that "Remote" isn't always accurate. I
believe that the Python calls are also done asynchronously, so "Async"
might be a reasonable name, so long as there's no confusion between the
base and async child class.
On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes <jhug...@confluent.io.invalid>
wrote:
Hi Alan,
Nicely written and makes sense. The only feedback I have is around the
naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
will be generalized into RemoteCalcSplitRuleBase." This naming seems to
imply/suggest that all Async functions are remote. I wonder if we can find
another name which doesn't carry that connotation; maybe
AsyncCalcSplitRuleBase. (An AsyncCalcSplitRuleBase which handles Python
and Async functions seems reasonable.)
Cheers,
Jim
On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
<asheinb...@confluent.io.invalid> wrote:
I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
asynchronous scalar function support [1].
This feature proposes adding a new UDF type, AsyncScalarFunction, which is
invoked just like a normal ScalarFunction, but is implemented with an
asynchronous eval method. I brought this up, including the motivation,
in a previous discussion thread [2].
The purpose is to achieve high-throughput scalar function UDFs while
allowing individual calls to have high latency. It allows scaling
up the parallelism of just these calls without having to increase the
parallelism of the whole query (which could be rather resource
inefficient).
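
To make the throughput argument concrete, here is a small standalone
sketch in plain Java that does not use any Flink classes. It assumes an
eval method that receives a CompletableFuture to complete, which is
only one possible shape; the FLIP [1] defines the actual signature.
`SlowLookup`, its pool size, and the 50 ms sleep are hypothetical and
only stand in for a slow external call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical function; the eval shape is an assumption for illustration.
class SlowLookup {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    // Returns immediately; the result arrives later through the future.
    void eval(CompletableFuture<String> result, int id) {
        pool.execute(() -> {
            try {
                Thread.sleep(50); // stands in for a slow remote call
                result.complete("value-" + id);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                result.completeExceptionally(e);
            }
        });
    }

    // Issues all calls first, then collects the results in input order.
    List<String> evalAll(List<Integer> ids) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int id : ids) {
            CompletableFuture<String> f = new CompletableFuture<>();
            eval(f, id);
            futures.add(f);
        }
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            out.add(f.join());
        }
        return out;
    }

    // Stops the worker threads so the JVM can exit.
    void close() {
        pool.shutdown();
    }
}
```

Several calls are in flight at once, so the slow calls overlap even
though the caller itself runs on a single thread.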
In practice, it should enable SQL integration with external services and
systems, which Flink has limited support for at the moment. It should also
allow easier integration with existing libraries which use asynchronous
APIs.
Looking forward to your feedback and suggestions.
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
[2] https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs
Thanks,
Alan