Hi Mason,
    Thanks for your input. I think that if we support user-defined async
table functions, users will be able to use them to buffer a batch of data
and then handle it all at once in the customized function.
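Regarding batching for the RPC, here is a minimal sketch of how a user-defined async function could buffer keys and resolve them with one batched RPC. It is plain Java so it runs without Flink. `BatchingRpcClient` and the `batchRpc` callback are hypothetical names and are not part of FLIP-313 or any Flink API. A real function would also need a time-based flush, so that a partially filled batch does not wait until the async timeout fires. That flush is not shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class BatchingRpcSketch {

    /** Hypothetical helper: buffers keys and sends them in one RPC once batchSize is reached. */
    static final class BatchingRpcClient {
        private final int batchSize;
        // Stand-in for a remote call that resolves many keys at once (assumption, not a Flink API).
        private final Function<List<Integer>, Map<Integer, String>> batchRpc;
        private final Map<Integer, List<CompletableFuture<String>>> pending = new HashMap<>();
        private int bufferedRequests = 0;
        private int rpcCount = 0;

        BatchingRpcClient(int batchSize, Function<List<Integer>, Map<Integer, String>> batchRpc) {
            this.batchSize = batchSize;
            this.batchRpc = batchRpc;
        }

        /** Called once per input row, e.g. from an async eval(); completes when its batch is sent. */
        synchronized CompletableFuture<String> lookup(int key) {
            CompletableFuture<String> future = new CompletableFuture<>();
            pending.computeIfAbsent(key, k -> new ArrayList<>()).add(future);
            if (++bufferedRequests >= batchSize) {
                flush();
            }
            return future;
        }

        /** Sends all buffered keys in a single RPC and completes every waiting future. */
        synchronized void flush() {
            if (pending.isEmpty()) {
                return;
            }
            Map<Integer, List<CompletableFuture<String>>> batch = new HashMap<>(pending);
            pending.clear();
            bufferedRequests = 0;
            rpcCount++;
            try {
                Map<Integer, String> results = batchRpc.apply(new ArrayList<>(batch.keySet()));
                batch.forEach((key, futures) -> futures.forEach(f -> f.complete(results.get(key))));
            } catch (RuntimeException e) {
                batch.values().forEach(futures -> futures.forEach(f -> f.completeExceptionally(e)));
            }
        }

        synchronized int rpcCount() {
            return rpcCount;
        }
    }

    public static void main(String[] args) {
        BatchingRpcClient client = new BatchingRpcClient(3, keys -> {
            Map<Integer, String> out = new HashMap<>();
            for (int k : keys) {
                out.put(k, "customer-" + k); // fake service response
            }
            return out;
        });
        CompletableFuture<String> a = client.lookup(1);
        CompletableFuture<String> b = client.lookup(2);
        CompletableFuture<String> c = client.lookup(3); // fills the batch and triggers one RPC
        CompletableFuture<String> d = client.lookup(4); // stays buffered until the next flush
        client.flush();
        System.out.println(a.join() + " " + b.join() + " " + c.join() + " " + d.join());
        System.out.println("RPCs: " + client.rpcCount());
    }
}
```

With a batch size of 3, the first three lookups share one RPC and the fourth waits for the explicit `flush()`. The program prints the four values, followed by `RPCs: 2`.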

AsyncSink is meant for the sink operator. I have not yet figured out how to
integrate it in this case.

Thanks,
Aitozi.


On Thu, Jun 8, 2023 at 12:40 PM Mason Chen <mas.chen6...@gmail.com> wrote:

> Hi Aitozi,
>
> I think it makes sense to make it easier for SQL users to make RPCs. Do you
> think your proposal can extend to the ability to batch data for the RPC?
> This is another common strategy to increase throughput. Also, have you
> considered solving this a bit differently by leveraging Flink's AsyncSink?
>
> Best,
> Mason
>
> On Mon, Jun 5, 2023 at 1:50 AM Aitozi <gjying1...@gmail.com> wrote:
>
> > One more thing for discussion:
> >
> > In our internal implementation, we reuse the options
> > `table.exec.async-lookup.buffer-capacity` and
> > `table.exec.async-lookup.timeout` to configure
> > the async UDTF. Do you think we should add two extra options to
> > distinguish them from the lookup options, such as
> >
> > `table.exec.async-udtf.buffer-capacity`
> > `table.exec.async-udtf.timeout`
> >
> >
> > Best,
> > Aitozi.
> >
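To make the two options concrete: in Flink, `table.exec.async-lookup.buffer-capacity` limits how many async requests can be in flight at once, and `table.exec.async-lookup.timeout` limits how long each request may take. The plain-Java model below shows the two knobs an async UDTF would need under either option name. The `BoundedAsyncCaller` name is hypothetical, and this is not how `AsyncWaitOperator` is implemented internally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class AsyncOptionsSketch {

    /** Hypothetical model: bounded in-flight calls plus a per-call timeout. */
    static final class BoundedAsyncCaller {
        private final Semaphore inFlight;  // role of buffer-capacity: max pending calls
        private final long timeoutMillis;  // role of timeout: deadline per call
        private final ExecutorService ioPool;

        BoundedAsyncCaller(int bufferCapacity, long timeoutMillis, ExecutorService ioPool) {
            this.inFlight = new Semaphore(bufferCapacity);
            this.timeoutMillis = timeoutMillis;
            this.ioPool = ioPool;
        }

        <T> CompletableFuture<T> call(Supplier<T> rpc) {
            // Blocks the caller once bufferCapacity calls are pending (backpressure).
            inFlight.acquireUninterruptibly();
            return CompletableFuture.supplyAsync(rpc, ioPool)
                    .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                    // Free the slot on success, failure, or timeout.
                    .whenComplete((value, error) -> inFlight.release());
        }

        int availableSlots() {
            return inFlight.availablePermits();
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        BoundedAsyncCaller caller = new BoundedAsyncCaller(2, 500, pool);
        CompletableFuture<String> fast = caller.call(() -> "ok");
        CompletableFuture<String> slow = caller.call(() -> {
            sleep(5_000); // simulated slow RPC
            return "late";
        });
        System.out.println(fast.join());
        try {
            slow.join();
        } catch (CompletionException e) {
            System.out.println("timed out: " + (e.getCause() instanceof TimeoutException));
        }
        System.out.println("free slots: " + caller.availableSlots());
        pool.shutdownNow(); // interrupts the still-sleeping slow task
    }
}
```

When `bufferCapacity` calls are pending, `call` blocks the caller. This is how a bounded buffer produces backpressure. A slow call fails with a `TimeoutException` and frees its slot. In this sketch the underlying task is not cancelled.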
> >
> >
> > On Mon, Jun 5, 2023 at 12:20 PM Aitozi <gjying1...@gmail.com> wrote:
> >
> > > Hi Jing,
> > >
> > >     > what is the difference between the RPC call or query you mentioned
> > >     > and the lookup in a very general way
> > >
> > > I think the RPC call or query service is quite similar to the lookup join,
> > > but a lookup join has to work with a `LookupTableSource`.
> > >
> > > Let's see how we can perform an async RPC call with lookup join:
> > >
> > > (1) Implement an AsyncTableFunction with RPC call logic.
> > > (2) Implement a `LookupTableSource` connector that runs the async UDTF
> > > defined in (1).
> > > (3) Define the lookup table with a DDL statement in SQL:
> > >
> > > CREATE TEMPORARY TABLE Customers (
> > >   id INT,
> > >   name STRING,
> > >   country STRING,
> > >   zip STRING
> > > ) WITH (
> > >   'connector' = 'custom'
> > > );
> > >
> > > (4) Run the query below:
> > >
> > > SELECT o.order_id, o.total, c.country, c.zip
> > > FROM Orders AS o
> > >   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > >     ON o.customer_id = c.id;
> > >
> > > This example is from the doc
> > > <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/#lookup-join>.
> > > You can imagine the lookup process as an async RPC call.
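Step (1) is the same in both approaches. Below is a sketch of such a function. It assumes the documented `AsyncTableFunction` convention: an `eval` method whose first parameter is the `CompletableFuture<Collection<T>>` to complete, followed by the lookup keys. To stay runnable without Flink on the classpath, the class does not extend the real `AsyncTableFunction`, uses a small `CustomerRow` class in place of `RowData`, and fakes the remote service with a `Map`. All of these names are illustrative.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncEvalSketch {

    /** Simplified stand-in for a row of the Customers table (id, name, country, zip). */
    static final class CustomerRow {
        final int id;
        final String name;
        final String country;
        final String zip;

        CustomerRow(int id, String name, String country, String zip) {
            this.id = id;
            this.name = name;
            this.country = country;
            this.zip = zip;
        }

        @Override
        public String toString() {
            return id + "," + name + "," + country + "," + zip;
        }
    }

    /** Hypothetical UDTF in the async eval shape: complete `result` later, never block. */
    static final class CustomerRpcFunction {
        private final ExecutorService ioPool;
        private final Map<Integer, CustomerRow> service; // fake remote service

        CustomerRpcFunction(ExecutorService ioPool, Map<Integer, CustomerRow> service) {
            this.ioPool = ioPool;
            this.service = service;
        }

        public void eval(CompletableFuture<Collection<CustomerRow>> result, Integer id) {
            CompletableFuture
                    .supplyAsync(() -> service.get(id), ioPool) // the "RPC" runs on the IO pool
                    .whenComplete((row, error) -> {
                        if (error != null) {
                            result.completeExceptionally(error);
                        } else if (row == null) {
                            result.complete(Collections.emptyList()); // no match
                        } else {
                            result.complete(List.of(row));
                        }
                    });
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CustomerRpcFunction fn = new CustomerRpcFunction(
                pool, Map.of(1, new CustomerRow(1, "Alice", "DE", "10115")));
        CompletableFuture<Collection<CustomerRow>> hit = new CompletableFuture<>();
        CompletableFuture<Collection<CustomerRow>> miss = new CompletableFuture<>();
        fn.eval(hit, 1); // returns immediately
        fn.eval(miss, 2);
        System.out.println(hit.join());  // [1,Alice,DE,10115]
        System.out.println(miss.join()); // []
        pool.shutdown();
    }
}
```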
> > >
> > > Let's see how we can perform an async RPC call with lateral join:
> > >
> > > (1) Implement an AsyncTableFunction with RPC call logic.
> > > (2) Run the query:
> > >
> > > CREATE FUNCTION f1 AS '...';
> > >
> > > SELECT o.order_id, o.total, T.country, T.zip
> > > FROM Orders AS o, LATERAL TABLE(f1(o.order_id)) AS T(...);
> > >
> > > As you can see, the lateral join version is simpler and more intuitive
> > > for users: they do not have to wrap a LookupTableSource just to use an
> > > async UDTF.
> > >
> > > In the end, we can also see the user-defined async table function as an
> > > enhancement of the current lateral table join, which currently only
> > > supports synchronous table functions.
> > >
> > > Best,
> > > Aitozi.
> > >
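The lateral join version can be modelled in a few lines. The correlate operator fires `eval` for each input row without waiting, then emits the input row joined with every returned row. This is a plain-Java sketch. Two details are assumptions about how the planner would wire it up, not something the FLIP states: results are emitted in input order, and the join is an inner join, so an empty result drops the input row, as with `FROM Orders AS o, LATERAL TABLE(...)`. The function `f1` and its data are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

public class AsyncLateralJoinSketch {

    /** One output row: the input row paired with one row returned by the UDTF. */
    static final class Joined<L, R> {
        final L left;
        final R right;

        Joined(L left, R right) {
            this.left = left;
            this.right = right;
        }

        @Override
        public String toString() {
            return left + "|" + right;
        }
    }

    static <L, R> List<Joined<L, R>> lateralJoin(
            List<L> input, BiConsumer<CompletableFuture<Collection<R>>, L> eval) {
        // Phase 1: start every call without waiting, so the IO latencies overlap.
        List<CompletableFuture<Collection<R>>> inFlight = new ArrayList<>();
        for (L row : input) {
            CompletableFuture<Collection<R>> future = new CompletableFuture<>();
            eval.accept(future, row);
            inFlight.add(future);
        }
        // Phase 2: wait in input order and emit one output row per returned row.
        List<Joined<L, R>> output = new ArrayList<>();
        for (int i = 0; i < input.size(); i++) {
            for (R result : inFlight.get(i).join()) {
                output.add(new Joined<>(input.get(i), result));
            }
        }
        return output;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Fake remote service: order id -> countries (hypothetical data).
        Map<Integer, List<String>> service = Map.of(
                101, List.of("DE"), 102, List.of(), 103, List.of("FR", "BE"));
        // Hypothetical async UDTF "f1" in the eval(future, key) shape.
        BiConsumer<CompletableFuture<Collection<String>>, Integer> f1 =
                (result, orderId) -> CompletableFuture
                        .supplyAsync(() -> service.getOrDefault(orderId, List.of()), pool)
                        .whenComplete((rows, error) -> {
                            if (error != null) {
                                result.completeExceptionally(error);
                            } else {
                                result.complete(rows);
                            }
                        });
        System.out.println(lateralJoin(List.of(101, 102, 103), f1)); // [101|DE, 103|FR, 103|BE]
        pool.shutdown();
    }
}
```

Order 102 has no match, so it is dropped. Order 103 fans out to two rows. A `LEFT JOIN LATERAL TABLE(...) ON TRUE` would instead keep 102 with null columns.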
> > >
> > > On Fri, Jun 2, 2023 at 7:37 PM Jing Ge <j...@ververica.com.invalid> wrote:
> > >
> > >> Hi Aitozi,
> > >>
> > >> Thanks for the update. Just out of curiosity, what is the difference
> > >> between the RPC call or query you mentioned and the lookup in a very
> > >> general way? Since lateral join is used in the FLIP, is there any special
> > >> thought behind that? Sorry for asking so many questions. The FLIP
> > >> contains limited information to understand the motivation.
> > >>
> > >> Best regards,
> > >> Jing
> > >>
> > >> On Fri, Jun 2, 2023 at 3:48 AM Aitozi <gjying1...@gmail.com> wrote:
> > >>
> > >> > Hi Jing,
> > >> >     I have updated the proposed changes in the FLIP. IMO, lookup has a
> > >> > clear async call requirement because it is an IO-heavy operator. In our
> > >> > usage, SQL users have logic that makes RPC calls or queries third-party
> > >> > services, which is also IO-intensive.
> > >> > In these cases, we'd like to leverage async functions to improve the
> > >> > throughput.
> > >> >
> > >> > Thanks,
> > >> > Aitozi.
> > >> >
> > >> > On Thu, Jun 1, 2023 at 10:55 PM Jing Ge <j...@ververica.com.invalid> wrote:
> > >> >
> > >> > > Hi Aitozi,
> > >> > >
> > >> > > Sorry for the late reply. Would you like to add the proposed changes,
> > >> > > with more details, to the FLIP too?
> > >> > > I got your point. It looks like a reasonable idea. However, since lookup
> > >> > > has a clear async call requirement, are there any real use cases that
> > >> > > need this change? This will help us understand the motivation. After
> > >> > > all, lateral join and temporal lookup join[1] are quite different.
> > >> > >
> > >> > > Best regards,
> > >> > > Jing
> > >> > >
> > >> > >
> > >> > > [1]
> > >> > > https://github.com/apache/flink/blob/d90a72da2fd601ca4e2a46700e91ec5b348de2ad/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java#L54
> > >> > >
> > >> > > On Wed, May 31, 2023 at 8:53 AM Aitozi <gjying1...@gmail.com> wrote:
> > >> > >
> > >> > > > Hi Jing,
> > >> > > >     What do you think about it? Can we move this feature forward?
> > >> > > >
> > >> > > > Thanks,
> > >> > > > Aitozi.
> > >> > > >
> > >> > > > On Mon, May 29, 2023 at 9:56 AM Aitozi <gjying1...@gmail.com> wrote:
> > >> > > >
> > >> > > > > Hi Jing,
> > >> > > > >     > "Do you mean to support the AsyncTableFunction beyond the
> > >> > > > >     > LookupTableSource?"
> > >> > > > > Yes, I mean to support the AsyncTableFunction beyond the
> > >> > > > > LookupTableSource.
> > >> > > > >
> > >> > > > > "AsyncTableFunction" is a function that can be executed
> > >> > > > > asynchronously (with AsyncWaitOperator).
> > >> > > > > The async lookup join is one usage of it, so we don't have to bind
> > >> > > > > the AsyncTableFunction to LookupTableSource.
> > >> > > > > If user-defined AsyncTableFunction is supported, users can directly
> > >> > > > > use the lateral table syntax to perform async operations.
> > >> > > > >
> > >> > > > > > "It would be better if you could elaborate the proposed changes wrt
> > >> > > > > > the CorrelatedCodeGenerator with more details"
> > >> > > > >
> > >> > > > > In the proposal, we use the lateral table syntax to support the async
> > >> > > > > table function, so the planner will also translate this statement into
> > >> > > > > a CommonExecCorrelate node, and the runtime code should be generated
> > >> > > > > in CorrelatedCodeGenerator.
> > >> > > > > In CorrelatedCodeGenerator, we know whether the table function's kind
> > >> > > > > is `FunctionKind.TABLE` or `FunctionKind.ASYNC_TABLE`.
> > >> > > > > For `FunctionKind.ASYNC_TABLE`, we can generate an AsyncWaitOperator
> > >> > > > > to execute the async table function.
> > >> > > > >
> > >> > > > >
> > >> > > > > Thanks,
> > >> > > > > Aitozi.
> > >> > > > >
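The dispatch described above can be sketched as follows. `FunctionKind.TABLE` and `FunctionKind.ASYNC_TABLE` are real constants of Flink's `FunctionKind` enum. The enum here, the stand-in function classes, and `operatorFor` are simplified, hypothetical versions so that the example compiles on its own. The real change would live in the planner's code generator.

```java
public class CorrelateDispatchSketch {

    /** Mirrors two of the constants of Flink's FunctionKind enum (the real enum has more). */
    enum FunctionKind { TABLE, ASYNC_TABLE }

    /** Hypothetical stand-ins for TableFunction / AsyncTableFunction: each reports its kind. */
    abstract static class UserFunction {
        abstract FunctionKind getKind();
    }

    static final class SyncUdtf extends UserFunction {
        @Override
        FunctionKind getKind() {
            return FunctionKind.TABLE;
        }
    }

    static final class AsyncUdtf extends UserFunction {
        @Override
        FunctionKind getKind() {
            return FunctionKind.ASYNC_TABLE;
        }
    }

    /** Hypothetical planner step: choose the runtime operator for a correlate node by kind. */
    static String operatorFor(UserFunction function) {
        switch (function.getKind()) {
            case TABLE:
                // Existing path: generated code calls eval() synchronously per input row.
                return "generated synchronous correlate operator";
            case ASYNC_TABLE:
                // Proposed path: run the async eval inside an AsyncWaitOperator,
                // configured with a buffer capacity and a timeout.
                return "AsyncWaitOperator";
            default:
                throw new IllegalArgumentException("Unsupported kind: " + function.getKind());
        }
    }

    public static void main(String[] args) {
        System.out.println(operatorFor(new SyncUdtf()));  // generated synchronous correlate operator
        System.out.println(operatorFor(new AsyncUdtf())); // AsyncWaitOperator
    }
}
```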
> > >> > > > >
> > >> > > > > On Mon, May 29, 2023 at 3:22 AM Jing Ge <j...@ververica.com.invalid> wrote:
> > >> > > > >
> > >> > > > >> Hi Aitozi,
> > >> > > > >>
> > >> > > > >> Thanks for the clarification. The naming "Lookup" might suggest using
> > >> > > > >> it for table lookups, but conceptually, what the eval() method does is
> > >> > > > >> get a collection of results (Row, RowData) for the given keys. How it
> > >> > > > >> is done depends on the implementation, i.e., you can implement your
> > >> > > > >> own Source[1][2]. The example in the FLIP should be able to be handled
> > >> > > > >> this way.
> > >> > > > >>
> > >> > > > >> Do you mean to support the AsyncTableFunction beyond the
> > >> > > > >> LookupTableSource? It would be better if you could elaborate on the
> > >> > > > >> proposed changes wrt the CorrelatedCodeGenerator in more detail. Thanks!
> > >> > > > >>
> > >> > > > >> Best regards,
> > >> > > > >> Jing
> > >> > > > >>
> > >> > > > >> [1]
> > >> > > > >> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java#L64
> > >> > > > >> [2]
> > >> > > > >> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java#L49
> > >> > > > >>
> > >> > > > >> On Sat, May 27, 2023 at 9:48 AM Aitozi <gjying1...@gmail.com> wrote:
> > >> > > > >>
> > >> > > > >> > Hi Jing,
> > >> > > > >> >     Thanks for your response. As stated in the FLIP, the purpose of
> > >> > > > >> > this FLIP is to support user-defined async table functions. As
> > >> > > > >> > described in the Flink documentation [1]:
> > >> > > > >> >
> > >> > > > >> > > Async table functions are special functions for table sources
> > >> > > > >> > > that perform a lookup.
> > >> > > > >> >
> > >> > > > >> > So end users cannot directly define and use async table functions
> > >> > > > >> > now. A use case is reported in [2].
> > >> > > > >> >
> > >> > > > >> > So, in conclusion, no new interface is introduced, but we extend
> > >> > > > >> > Flink's ability to support user-defined async table functions.
> > >> > > > >> >
> > >> > > > >> > [1]:
> > >> > > > >> > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/udfs/
> > >> > > > >> > [2]:
> > >> > > > >> > https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
> > >> > > > >> > Thanks.
> > >> > > > >> > Aitozi.
> > >> > > > >> >
> > >> > > > >> >
> > >> > > > >> > On Sat, May 27, 2023 at 6:40 AM Jing Ge <j...@ververica.com.invalid> wrote:
> > >> > > > >> >
> > >> > > > >> > > Hi Aitozi,
> > >> > > > >> > >
> > >> > > > >> > > Thanks for your proposal. I am not quite sure if I understood your
> > >> > > > >> > > thoughts correctly. You described a special-case implementation of
> > >> > > > >> > > the AsyncTableFunction with no public API changes. Would you please
> > >> > > > >> > > elaborate on your purpose of writing a FLIP, according to the FLIP
> > >> > > > >> > > documentation[1]? Thanks!
> > >> > > > >> > >
> > >> > > > >> > > [1]
> > >> > > > >> > > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > >> > > > >> > >
> > >> > > > >> > > Best regards,
> > >> > > > >> > > Jing
> > >> > > > >> > >
> > >> > > > >> > > On Wed, May 24, 2023 at 1:07 PM Aitozi <gjying1...@gmail.com> wrote:
> > >> > > > >> > >
> > >> > > > >> > > > May I ask for some feedback? :D
> > >> > > > >> > > >
> > >> > > > >> > > > Thanks,
> > >> > > > >> > > > Aitozi
> > >> > > > >> > > >
> > >> > > > >> > > > On Tue, May 23, 2023 at 7:14 PM Aitozi <gjying1...@gmail.com> wrote:
> > >> > > > >> > > > >
> > >> > > > >> > > > > Just caught a use case reported by Giannis Polyzos for this usage:
> > >> > > > >> > > > >
> > >> > > > >> > > > > https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
> > >> > > > >> > > > >
> > >> > > > >> > > > > On Tue, May 23, 2023 at 5:45 PM Aitozi <gjying1...@gmail.com> wrote:
> > >> > > > >> > > > > >
> > >> > > > >> > > > > > Hi guys,
> > >> > > > >> > > > > >     I want to bring up a discussion about adding support for
> > >> > > > >> > > > > > user-defined AsyncTableFunction in Flink.
> > >> > > > >> > > > > > Currently, async table functions are special functions for table
> > >> > > > >> > > > > > sources to perform async lookups. However, it's worth supporting
> > >> > > > >> > > > > > user-defined async table functions, because that way end SQL
> > >> > > > >> > > > > > users can leverage them to perform async operations, which is
> > >> > > > >> > > > > > useful for maximizing system throughput, especially in
> > >> > > > >> > > > > > IO-bottleneck cases.
> > >> > > > >> > > > > >
> > >> > > > >> > > > > > You can find more details in [1].
> > >> > > > >> > > > > >
> > >> > > > >> > > > > > Looking forward to your feedback.
> > >> > > > >> > > > > >
> > >> > > > >> > > > > >
> > >> > > > >> > > > > > [1]:
> > >> > > > >> > > > > > https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction
> > >> > > > >> > > > > >
> > >> > > > >> > > > > > Thanks,
> > >> > > > >> > > > > > Aitozi.
