Hi Aitozi,

I think it makes sense to make it easier for SQL users to make RPCs. Do you
think your proposal can extend to the ability to batch data for the RPC?
Batching is another common strategy to increase throughput. Also, have you
considered solving this a bit differently by leveraging Flink's AsyncSink?
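
To make the batching idea concrete, here is a minimal sketch in plain Java
(JDK only). It is not part of the FLIP and not a Flink API: the class name
`BatchingRpcSketch`, its `submit`/`flush` methods, and the `batchRpc` callback
are all hypothetical, and it assumes the batch RPC returns one value per key
in the same order. A real operator would likely also need a timer-based flush.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper (not a Flink API): buffers keys so one RPC serves a whole batch.
final class BatchingRpcSketch<K, V> {
    private final int batchSize;
    // Assumed shape of a batch RPC client: N keys in, N values out, in the same order.
    private final Function<List<K>, CompletableFuture<List<V>>> batchRpc;
    private final List<K> keys = new ArrayList<>();
    private final List<CompletableFuture<V>> pending = new ArrayList<>();

    BatchingRpcSketch(int batchSize, Function<List<K>, CompletableFuture<List<V>>> batchRpc) {
        this.batchSize = batchSize;
        this.batchRpc = batchRpc;
    }

    // Registers one key and returns a future that completes when its batch returns.
    synchronized CompletableFuture<V> submit(K key) {
        CompletableFuture<V> result = new CompletableFuture<>();
        keys.add(key);
        pending.add(result);
        if (keys.size() >= batchSize) {
            flush();
        }
        return result;
    }

    // Sends all buffered keys in a single RPC and fans the response back out.
    synchronized void flush() {
        if (keys.isEmpty()) {
            return;
        }
        List<K> batchKeys = new ArrayList<>(keys);
        List<CompletableFuture<V>> batchFutures = new ArrayList<>(pending);
        keys.clear();
        pending.clear();
        batchRpc.apply(batchKeys).whenComplete((values, error) -> {
            Throwable failure = error;
            if (failure == null && (values == null || values.size() != batchFutures.size())) {
                failure = new IllegalStateException("batch RPC returned a mismatched result count");
            }
            for (int i = 0; i < batchFutures.size(); i++) {
                if (failure != null) {
                    batchFutures.get(i).completeExceptionally(failure);
                } else {
                    batchFutures.get(i).complete(values.get(i));
                }
            }
        });
    }
}
```

With a batch size of 2, submitting two keys triggers a single RPC, while a
trailing third key stays buffered until `flush()` is called.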

Best,
Mason

On Mon, Jun 5, 2023 at 1:50 AM Aitozi <gjying1...@gmail.com> wrote:

> One more thing for discussion:
>
> In our internal implementation, we reuse the options
> `table.exec.async-lookup.buffer-capacity` and
> `table.exec.async-lookup.timeout` to configure
> the async UDTF. Do you think we should add two extra options to distinguish
> them from the lookup options, such as
>
> `table.exec.async-udtf.buffer-capacity`
> `table.exec.async-udtf.timeout`
>
>
> Best,
> Aitozi.
>
>
>
> On Mon, Jun 5, 2023 at 12:20 PM Aitozi <gjying1...@gmail.com> wrote:
>
> > Hi Jing,
> >
> >     > what is the difference between the RPC call or query you mentioned
> >     > and the lookup in a very general way
> >
> > I think the RPC call or query service is quite similar to the lookup join,
> > but a lookup join must work with a `LookupTableSource`.
> >
> > Let's see how we can perform an async RPC call with lookup join:
> >
> > (1) Implement an AsyncTableFunction with RPC call logic.
> > (2) Implement a `LookupTableSource` connector that runs the async UDTF
> > defined in (1).
> > (3) Define the DDL for this lookup table in SQL:
> >
> > CREATE TEMPORARY TABLE Customers (
> >   id INT,
> >   name STRING,
> >   country STRING,
> >   zip STRING
> > ) WITH (
> >   'connector' = 'custom'
> > );
> >
> > (4) Run a query such as the one below:
> >
> > SELECT o.order_id, o.total, c.country, c.zip
> > FROM Orders AS o
> >   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> >     ON o.customer_id = c.id;
> >
> > This example is from the docs
> > <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/#lookup-join>.
> > You can imagine the lookup process as an async RPC call.
> >
> > Let's see how we can perform an async RPC call with lateral join:
> >
> > (1) Implement an AsyncTableFunction with RPC call logic.
> > (2) Run a query such as:
> >
> > CREATE FUNCTION f1 AS '...';
> >
> > SELECT o.order_id, o.total, T.country, T.zip
> > FROM Orders AS o, LATERAL TABLE(f1(o.order_id)) AS T(...);
> >
> > As you can see, the lateral join version is simpler and more intuitive for
> > users. Users do not have to wrap a
> > LookupTableSource just to use an async UDTF.
> >
> > In the end, the user-defined async table function can also be seen as an
> > enhancement of the current lateral table join,
> > which only supports synchronous execution today.
> >
> > Best,
> > Aitozi.
> >
> >
> > On Fri, Jun 2, 2023 at 7:37 PM Jing Ge <j...@ververica.com.invalid> wrote:
> >
> >> Hi Aitozi,
> >>
> >> Thanks for the update. Just out of curiosity, what is the difference
> >> between the RPC call or query you mentioned and the lookup in a very
> >> general way? Since lateral join is used in the FLIP, is there any
> >> special thought behind that? Sorry for asking so many questions. The FLIP
> >> contains limited information to understand the motivation.
> >>
> >> Best regards,
> >> Jing
> >>
> >> On Fri, Jun 2, 2023 at 3:48 AM Aitozi <gjying1...@gmail.com> wrote:
> >>
> >> > Hi Jing,
> >> >     I have updated the proposed changes in the FLIP. IMO, lookup has a
> >> > clear async call requirement because it is an IO-heavy operator. In our
> >> > usage, SQL users have logic that makes RPC calls or queries third-party
> >> > services, which is also IO intensive.
> >> > In these cases, we'd like to leverage the async function to improve
> >> > throughput.
> >> >
> >> > Thanks,
> >> > Aitozi.
> >> >
> >> > On Thu, Jun 1, 2023 at 10:55 PM Jing Ge <j...@ververica.com.invalid> wrote:
> >> >
> >> > > Hi Aitozi,
> >> > >
> >> > > Sorry for the late reply. Would you like to update the proposed
> >> > > changes with more details in the FLIP too?
> >> > > I got your point. It looks like a reasonable idea. However, since
> >> > > lookup has its clear async call requirement, are there any real use
> >> > > cases that need this change? This will help us understand the
> >> > > motivation. After all, lateral join and temporal lookup join[1] are
> >> > > quite different.
> >> > >
> >> > > Best regards,
> >> > > Jing
> >> > >
> >> > >
> >> > > [1]
> >> > > https://github.com/apache/flink/blob/d90a72da2fd601ca4e2a46700e91ec5b348de2ad/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java#L54
> >> > >
> >> > > On Wed, May 31, 2023 at 8:53 AM Aitozi <gjying1...@gmail.com> wrote:
> >> > >
> >> > > > Hi Jing,
> >> > > >     What do you think about it? Can we move forward with this feature?
> >> > > >
> >> > > > Thanks,
> >> > > > Aitozi.
> >> > > >
> >> > > > On Mon, May 29, 2023 at 9:56 AM Aitozi <gjying1...@gmail.com> wrote:
> >> > > >
> >> > > > > Hi Jing,
> >> > > > >     > "Do you mean to support the AsyncTableFunction beyond the
> >> > > > >     > LookupTableSource?"
> >> > > > > Yes, I mean to support the AsyncTableFunction beyond the
> >> > > > > LookupTableSource.
> >> > > > >
> >> > > > > The "AsyncTableFunction" is a function that can be executed
> >> > > > > asynchronously (with AsyncWaitOperator).
> >> > > > > The async lookup join is just one usage of it, so we don't have
> >> > > > > to bind the AsyncTableFunction to LookupTableSource.
> >> > > > > If user-defined AsyncTableFunction is supported, users can
> >> > > > > directly use lateral table syntax to perform async operations.
> >> > > > >
> >> > > > > > "It would be better if you could elaborate the proposed
> >> > > > > > changes wrt the CorrelatedCodeGenerator with more details"
> >> > > > >
> >> > > > > In the proposal, we use lateral table syntax to support the
> >> > > > > async table function, so the planner will also translate this
> >> > > > > statement to a CommonExecCorrelate node, and the runtime code
> >> > > > > should be generated in CorrelatedCodeGenerator.
> >> > > > > In CorrelatedCodeGenerator, we will know whether the
> >> > > > > TableFunction's kind is `FunctionKind.TABLE` or
> >> > > > > `FunctionKind.ASYNC_TABLE`.
> >> > > > > For `FunctionKind.ASYNC_TABLE` we can generate an
> >> > > > > AsyncWaitOperator to execute the async table function.
> >> > > > >
> >> > > > >
> >> > > > > Thanks,
> >> > > > > Aitozi.
> >> > > > >
> >> > > > >
> >> > > > > On Mon, May 29, 2023 at 3:22 AM Jing Ge <j...@ververica.com.invalid> wrote:
> >> > > > >
> >> > > > >> Hi Aitozi,
> >> > > > >>
> >> > > > >> Thanks for the clarification. The naming "Lookup" might suggest
> >> > > > >> using it for table lookup. But conceptually, what the eval()
> >> > > > >> method does is get a collection of results (Row, RowData) from
> >> > > > >> the given keys. How it is done depends on the implementation,
> >> > > > >> i.e. you can implement your own Source[1][2]. The example in the
> >> > > > >> FLIP should be able to be handled in this way.
> >> > > > >>
> >> > > > >> Do you mean to support the AsyncTableFunction beyond the
> >> > > > >> LookupTableSource?
> >> > > > >> It would be better if you could elaborate the proposed changes
> >> > > > >> wrt the CorrelatedCodeGenerator with more details. Thanks!
> >> > > > >>
> >> > > > >> Best regards,
> >> > > > >> Jing
> >> > > > >>
> >> > > > >> [1]
> >> > > > >> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java#L64
> >> > > > >> [2]
> >> > > > >> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java#L49
> >> > > > >>
> >> > > > >> On Sat, May 27, 2023 at 9:48 AM Aitozi <gjying1...@gmail.com> wrote:
> >> > > > >>
> >> > > > >> > Hi Jing,
> >> > > > >> >     Thanks for your response. As stated in the FLIP, the
> >> > > > >> > purpose of this FLIP is to support
> >> > > > >> > user-defined async table functions. As described in the Flink
> >> > > > >> > documentation [1]:
> >> > > > >> >
> >> > > > >> > > Async table functions are special functions for table sources
> >> > > > >> > > that perform a lookup.
> >> > > > >> >
> >> > > > >> > So end users cannot directly define and use async table
> >> > > > >> > functions now. A use case is reported in [2].
> >> > > > >> >
> >> > > > >> > So, in conclusion, no new interface is introduced, but we
> >> > > > >> > extend the ability to support user-defined async table
> >> > > > >> > functions.
> >> > > > >> >
> >> > > > >> > [1]:
> >> > > > >> > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/udfs/
> >> > > > >> > [2]:
> >> > > > >> > https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
> >> > > > >> >
> >> > > > >> > Thanks.
> >> > > > >> > Aitozi.
> >> > > > >> >
> >> > > > >> >
> >> > > > >> > On Sat, May 27, 2023 at 6:40 AM Jing Ge <j...@ververica.com.invalid> wrote:
> >> > > > >> >
> >> > > > >> > > Hi Aitozi,
> >> > > > >> > >
> >> > > > >> > > Thanks for your proposal. I am not quite sure if I
> >> > > > >> > > understood your thoughts
> >> > > > >> > > correctly. You described a special case implementation of
> >> > > > >> > > the AsyncTableFunction with no public API changes. Would you
> >> > > > >> > > please elaborate on your purpose of writing a FLIP according
> >> > > > >> > > to the FLIP documentation[1]?
> >> > > > >> > > Thanks!
> >> > > > >> > >
> >> > > > >> > > [1]
> >> > > > >> > > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> >> > > > >> > >
> >> > > > >> > > Best regards,
> >> > > > >> > > Jing
> >> > > > >> > >
> >> > > > >> > > On Wed, May 24, 2023 at 1:07 PM Aitozi <gjying1...@gmail.com> wrote:
> >> > > > >> > >
> >> > > > >> > > > May I ask for some feedback  :D
> >> > > > >> > > >
> >> > > > >> > > > Thanks,
> >> > > > >> > > > Aitozi
> >> > > > >> > > >
> >> > > > >> > > > On Tue, May 23, 2023 at 7:14 PM Aitozi <gjying1...@gmail.com> wrote:
> >> > > > >> > > > >
> >> > > > >> > > > > I just came across a use case report from Giannis Polyzos
> >> > > > >> > > > > for this usage:
> >> > > > >> > > > >
> >> > > > >> > > > > https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
> >> > > > >> > > > >
> >> > > > >> > > > > On Tue, May 23, 2023 at 5:45 PM Aitozi <gjying1...@gmail.com> wrote:
> >> > > > >> > > > > >
> >> > > > >> > > > > > Hi guys,
> >> > > > >> > > > > >     I want to bring up a discussion about adding
> >> > > > >> > > > > > support for User
> >> > > > >> > > > > > Defined AsyncTableFunction in Flink.
> >> > > > >> > > > > > Currently, async table functions are special functions
> >> > > > >> > > > > > for table sources to perform
> >> > > > >> > > > > > async lookup. However, it's worth supporting
> >> > > > >> > > > > > user-defined async table functions,
> >> > > > >> > > > > > because this way the end SQL user can leverage them to
> >> > > > >> > > > > > perform async operations,
> >> > > > >> > > > > > which is useful for maximizing system throughput,
> >> > > > >> > > > > > especially in IO-bottlenecked cases.
> >> > > > >> > > > > >
> >> > > > >> > > > > > You can find more details in [1].
> >> > > > >> > > > > >
> >> > > > >> > > > > > Looking forward to your feedback.
> >> > > > >> > > > > >
> >> > > > >> > > > > >
> >> > > > >> > > > > > [1]:
> >> > > > >> > > > > > https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction
> >> > > > >> > > > > >
> >> > > > >> > > > > > Thanks,
> >> > > > >> > > > > > Aitozi.
> >> > > > >> > > >
> >> > > > >> > >
> >> > > > >> >
> >> > > > >>
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>
