Hi Aitozi,

Thanks for the feedback. Looking forward to the performance tests.

Afaik, lookup returns one row for each key [1] [2]. Conceptually, the
lookup function is used to enrich column(s) from the dimension table. If
a given key matched more than one row, there would be no way to know
which row is used to enrich the key.
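
To make the ambiguity concrete, here is a minimal sketch in plain Java. It
does not use any Flink class; the class name, the map-based "dimension
table" and the string rows are all hypothetical stand-ins. With one row per
key the enrichment is unambiguous; with two rows the caller has to either
emit one output row per match or pick one row by some extra rule.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a dimension table; this is not Flink's LookupFunction.
final class LookupEnrichmentSketch {

    // Returns every dimension row stored under the key (empty if there is none).
    static Collection<String> lookup(Map<Integer, List<String>> dim, int key) {
        return dim.getOrDefault(key, List.of());
    }

    // Enriches one left-side value with each matching dimension row.
    static List<String> enrich(Map<Integer, List<String>> dim, int key, String left) {
        List<String> out = new ArrayList<>();
        for (String row : lookup(dim, key)) {
            out.add(left + "|" + row);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> dim = Map.of(
                1, List.of("DE"),        // exactly one row for key 1
                2, List.of("US", "CA")); // two rows for key 2
        System.out.println(enrich(dim, 1, "order-1")); // [order-1|DE]
        System.out.println(enrich(dim, 2, "order-2")); // [order-2|US, order-2|CA]
    }
}
```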

[1]
https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
[2]
https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196

Best regards,
Jing

On Fri, Jun 9, 2023 at 5:18 AM Aitozi <gjying1...@gmail.com> wrote:

> Hi Jing
>     Thanks for your good questions. I have added the example to the FLIP.
>
> > Only one row for each lookup
> Lookup can also return multiple rows, depending on the query result. [1]
>
> [1]:
>
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56
>
> > If we use async calls with lateral join, my gut feeling is
> that we might have many more async calls than lookup join. I am not really
> sure if we will be facing potential issues in this case or not.
>
> IMO, the work pattern is similar to the lookup function: for each row from
> the left table, the eval method is evaluated once, so the number of async
> calls will not change. The maximum number of calls in flight is limited by
> the async operator's buffer capacity, which is controlled by the option.
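
The two points above (one call per left-side row, in-flight calls capped by
a buffer capacity) can be sketched in plain Java. This is only an
illustration under assumptions: the Semaphore is a stand-in for the
operator's buffer and is not how AsyncWaitOperator is implemented, and
`bufferCapacity`, the class name and the simulated 5 ms call are
hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; the semaphore only mimics a bounded buffer.
final class BoundedAsyncSketch {

    // Issues one async call per input row and returns the peak number of calls in flight.
    static int run(List<Integer> rows, int bufferCapacity, AtomicInteger calls) {
        Semaphore slots = new Semaphore(bufferCapacity);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        try {
            for (Integer row : rows) {
                slots.acquireUninterruptibly(); // wait while the buffer is full
                calls.incrementAndGet();        // exactly one call per row
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                futures.add(CompletableFuture.runAsync(() -> {
                    try {
                        Thread.sleep(5); // simulated remote call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, pool).whenComplete((ignored, error) -> {
                    inFlight.decrementAndGet();
                    slots.release(); // free the slot only after the call has finished
                }));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }
}
```

With 20 rows and a capacity of 3, the call counter ends at 20 and the peak
never exceeds 3.
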
>
> BTW, regarding the naming of these options, I have updated the FLIP; you can
> refer to the sections "ConfigOption" and "Rejected Alternatives".
>
> Finally, for the performance evaluation, I'd like to run some tests and
> will add the results to the FLIP doc.
>
> Thanks,
> Aitozi.
>
>
> On Fri, Jun 9, 2023 at 07:23 Jing Ge <j...@ververica.com.invalid> wrote:
>
> > Hi Aitozi,
> >
> > Thanks for the clarification. The code example looks interesting. I would
> > suggest adding it to the FLIP. A description with code examples will
> > help readers understand the motivation and how to use the feature. Afaiac,
> > it is a valid feature for Flink users.
> >
> > As we know, lookup join is based on temporal join, i.e. FOR SYSTEM_TIME AS
> > OF, which is also used in your code example. Temporal join performs the
> > lookup based on the processing time match. Only one row for each lookup
> > (afaiu; I need to check the source code to double confirm) will be
> > returned for further enrichment. On the other hand, lateral join will have
> > sub-queries correlated with every individual value of the reference table
> > from the preceding part of the query, and each sub-query will return
> > multiple rows. If we use async calls with lateral join, my gut feeling is
> > that we might have many more async calls than lookup join. I am not really
> > sure if we will be facing potential issues in this case or not. Possible
> > issues I can think of now: too many RPC calls, too many async calls
> > being processed, and the sub-query returning a table which might be (too)
> > big and might cause performance issues. I would suggest preparing some use
> > cases and running some performance tests to check it. These are my concerns
> > about using async calls with lateral join that I'd like to share with you.
> > I am happy to discuss and hear different opinions; hopefully the
> > discussion will help me understand it more deeply. Please correct me if I
> > am wrong.
> >
> > Best regards,
> > Jing
> >
> >
> > On Thu, Jun 8, 2023 at 7:22 AM Aitozi <gjying1...@gmail.com> wrote:
> >
> > > Hi Mason,
> > > >     Thanks for your input. I think if we support the user-defined async
> > > > table function, users will be able to use it to hold a batch of data and
> > > > then handle it all at once in the customized function.
> > > >
> > > > AsyncSink is meant for the sink operator. I have not figured out how to
> > > > integrate it in this case.
> > > >
> > > > Thanks,
> > > > Aitozi.
> > >
> > >
> > > > On Thu, Jun 8, 2023 at 12:40 Mason Chen <mas.chen6...@gmail.com> wrote:
> > >
> > > > Hi Aitozi,
> > > >
> > > > > I think it makes sense to make it easier for SQL users to make RPCs. Do
> > > > > you think your proposal can extend to the ability to batch data for the
> > > > > RPC? This is also another common strategy to increase throughput. Also,
> > > > > have you considered solving this a bit differently by leveraging Flink's
> > > > > AsyncSink?
> > > >
> > > > Best,
> > > > Mason
> > > >
> > > > On Mon, Jun 5, 2023 at 1:50 AM Aitozi <gjying1...@gmail.com> wrote:
> > > >
> > > > > One more thing for discussion:
> > > > >
> > > > > > In our internal implementation, we reuse the options
> > > > > > `table.exec.async-lookup.buffer-capacity` and
> > > > > > `table.exec.async-lookup.timeout` to configure
> > > > > > the async udtf. Do you think we should add two extra options to
> > > > > > distinguish them from the lookup options, such as
> > > > >
> > > > > `table.exec.async-udtf.buffer-capacity`
> > > > > `table.exec.async-udtf.timeout`
> > > > >
> > > > >
> > > > > Best,
> > > > > Aitozi.
> > > > >
> > > > >
> > > > >
> > > > > > On Mon, Jun 5, 2023 at 12:20 Aitozi <gjying1...@gmail.com> wrote:
> > > > >
> > > > > > Hi Jing,
> > > > > >
> > > > > > >     > what is the difference between the RPC call or query you
> > > > > > > mentioned and the lookup in a very general way
> > > > > > >
> > > > > > > I think the RPC call or query service is quite similar to the lookup
> > > > > > > join. But lookup join should work with `LookupTableSource`.
> > > > > >
> > > > > > Let's see how we can perform an async RPC call with lookup join:
> > > > > >
> > > > > > (1) Implement an AsyncTableFunction with RPC call logic.
> > > > > > > (2) Implement a `LookupTableSource` connector that runs with the async
> > > > > > > udtf defined in (1).
> > > > > > > (3) Then define the DDL of this lookup table in SQL:
> > > > > >
> > > > > > CREATE TEMPORARY TABLE Customers (
> > > > > >   id INT,
> > > > > >   name STRING,
> > > > > >   country STRING,
> > > > > >   zip STRING
> > > > > > ) WITH (
> > > > > >   'connector' = 'custom'
> > > > > > );
> > > > > >
> > > > > > (4) Run with the query as below:
> > > > > >
> > > > > > SELECT o.order_id, o.total, c.country, c.zip
> > > > > > FROM Orders AS o
> > > > > >   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > > > > >     ON o.customer_id = c.id;
> > > > > >
> > > > > > > This example is from the docs
> > > > > > > <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/#lookup-join>.
> > > > > > > You can imagine the lookup process as an async RPC call process.
> > > > > >
> > > > > > Let's see how we can perform an async RPC call with lateral join:
> > > > > >
> > > > > > (1) Implement an AsyncTableFunction with RPC call logic.
> > > > > > (2) Run query with
> > > > > >
> > > > > > > CREATE FUNCTION f1 AS '...';
> > > > > > >
> > > > > > > SELECT o.order_id, o.total, c.country, c.zip
> > > > > > > FROM Orders AS o,
> > > > > > >   LATERAL TABLE (f1(o.order_id)) AS c(...);
> > > > > >
> > > > > > > As you can see, the lateral join version is simpler and more intuitive
> > > > > > > to users. Users do not have to wrap a
> > > > > > > LookupTableSource for the purpose of using async udtf.
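
As an illustration of step (1), here is a plain-Java sketch of the eval
shape such a function could take: the result is handed back through a
CompletableFuture instead of a return value. It is a stand-in only: the
class does not extend Flink's AsyncTableFunction so that it stays
self-contained, and `callRemote`, the class name and the returned string
are hypothetical.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java stand-in; in Flink the class would extend AsyncTableFunction<...>.
final class RpcAsyncFunctionSketch {

    // Hypothetical RPC client call, simulated with an async task.
    private CompletableFuture<String> callRemote(long orderId) {
        return CompletableFuture.supplyAsync(() -> "country-for-" + orderId);
    }

    // The result rows are delivered through the future once the RPC completes.
    public void eval(CompletableFuture<Collection<String>> result, long orderId) {
        callRemote(orderId).whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error); // propagate the RPC failure
            } else {
                result.complete(List.of(value));     // one row in this sketch
            }
        });
    }
}
```

The caller creates the future, passes it to eval, and continues with other
rows while the call is pending.
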
> > > > > >
> > > > > > > Finally, we can also see the user-defined async table function as an
> > > > > > > enhancement of the current lateral table join,
> > > > > > > which only supports sync lateral join now.
> > > > > >
> > > > > > Best,
> > > > > > Aitozi.
> > > > > >
> > > > > >
> > > > > > > On Fri, Jun 2, 2023 at 19:37 Jing Ge <j...@ververica.com.invalid> wrote:
> > > > > >
> > > > > >> Hi Aitozi,
> > > > > >>
> > > > > > >> Thanks for the update. Just out of curiosity, what is the difference
> > > > > > >> between the RPC call or query you mentioned and the lookup in a very
> > > > > > >> general way? Lateral join is used in the FLIP; is there any special
> > > > > > >> thought behind that? Sorry for asking so many questions. The FLIP
> > > > > > >> contains limited information to understand the motivation.
> > > > > >>
> > > > > >> Best regards,
> > > > > >> Jing
> > > > > >>
> > > > > > >> On Fri, Jun 2, 2023 at 3:48 AM Aitozi <gjying1...@gmail.com> wrote:
> > > > > >>
> > > > > >> > Hi Jing,
> > > > > > >> >     I have updated the proposed changes in the FLIP. IMO, lookup
> > > > > > >> > has a clear async call requirement because it is an IO-heavy
> > > > > > >> > operator. In our usage, SQL users have logic to do some RPC calls
> > > > > > >> > or query a third-party service, which is also IO intensive.
> > > > > > >> > In these cases, we'd like to leverage the async function to
> > > > > > >> > improve the throughput.
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> > Aitozi.
> > > > > >> >
> > > > > > >> > On Thu, Jun 1, 2023 at 22:55 Jing Ge <j...@ververica.com.invalid> wrote:
> > > > > >> >
> > > > > >> > > Hi Aitozi,
> > > > > >> > >
> > > > > > >> > > Sorry for the late reply. Would you like to update the proposed
> > > > > > >> > > changes with more details in the FLIP too?
> > > > > > >> > > I got your point. It looks like a rational idea. However, since
> > > > > > >> > > lookup has its clear async call requirement, are there any real
> > > > > > >> > > use cases that need this change? This will help us understand the
> > > > > > >> > > motivation. After all, lateral join and temporal lookup join[1]
> > > > > > >> > > are quite different.
> > > > > >> > >
> > > > > >> > > Best regards,
> > > > > >> > > Jing
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > [1]
> > > > > >> > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/d90a72da2fd601ca4e2a46700e91ec5b348de2ad/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java#L54
> > > > > >> > >
> > > > > > >> > > On Wed, May 31, 2023 at 8:53 AM Aitozi <gjying1...@gmail.com> wrote:
> > > > > >> > >
> > > > > >> > > > Hi Jing,
> > > > > > >> > > >     What do you think about it? Can we move forward with this
> > > > > > >> > > > feature?
> > > > > >> > > >
> > > > > >> > > > Thanks,
> > > > > >> > > > Aitozi.
> > > > > >> > > >
> > > > > > >> > > > On Mon, May 29, 2023 at 09:56 Aitozi <gjying1...@gmail.com> wrote:
> > > > > >> > > >
> > > > > >> > > > > Hi Jing,
> > > > > > >> > > > >     > "Do you mean to support the AsyncTableFunction beyond
> > > > > > >> > > > > the LookupTableSource?"
> > > > > > >> > > > > Yes, I mean to support the AsyncTableFunction beyond the
> > > > > > >> > > > > LookupTableSource.
> > > > > > >> > > > >
> > > > > > >> > > > > The "AsyncTableFunction" is the function with the ability to
> > > > > > >> > > > > be executed asynchronously (with AsyncWaitOperator).
> > > > > > >> > > > > The async lookup join is one usage of this. So, we don't
> > > > > > >> > > > > have to bind the AsyncTableFunction to LookupTableSource.
> > > > > > >> > > > > If a user-defined AsyncTableFunction is supported, users can
> > > > > > >> > > > > directly use the lateral table syntax to perform async
> > > > > > >> > > > > operations.
> > > > > >> > > > >
> > > > > > >> > > > > > "It would be better if you could elaborate the proposed
> > > > > > >> > > > > changes wrt the CorrelatedCodeGenerator with more details"
> > > > > > >> > > > >
> > > > > > >> > > > > In the proposal, we use the lateral table syntax to support
> > > > > > >> > > > > the async table function. So the planner will also translate
> > > > > > >> > > > > this statement into a CommonExecCorrelate node, and the runtime
> > > > > > >> > > > > code should be generated in CorrelatedCodeGenerator.
> > > > > > >> > > > > In CorrelatedCodeGenerator, we will know whether the
> > > > > > >> > > > > TableFunction's kind is `FunctionKind.TABLE` or
> > > > > > >> > > > > `FunctionKind.ASYNC_TABLE`.
> > > > > > >> > > > > For `FunctionKind.ASYNC_TABLE` we can generate an
> > > > > > >> > > > > AsyncWaitOperator to execute the async table function.
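
The dispatch described above could look roughly like the following sketch.
It is not Flink planner code: the local `Kind` enum only mirrors the two
FunctionKind values named in the message, and the returned strings are
placeholder labels for whatever operator the code generator would build.

```java
// Hypothetical sketch of the kind-based dispatch; not Flink's planner code.
final class CorrelateDispatchSketch {

    // Local mirror of the two function kinds discussed in this thread.
    enum Kind { TABLE, ASYNC_TABLE }

    // Picks the operator flavour for a correlate node based on the function kind.
    static String chooseOperator(Kind kind) {
        switch (kind) {
            case TABLE:
                return "sync correlate operator"; // existing lateral table path
            case ASYNC_TABLE:
                return "AsyncWaitOperator";       // proposed async path
            default:
                throw new IllegalArgumentException("unsupported kind: " + kind);
        }
    }

    public static void main(String[] args) {
        System.out.println(chooseOperator(Kind.TABLE));
        System.out.println(chooseOperator(Kind.ASYNC_TABLE));
    }
}
```
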
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > > Thanks,
> > > > > >> > > > > Aitozi.
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > > >> > > > > On Mon, May 29, 2023 at 03:22 Jing Ge <j...@ververica.com.invalid> wrote:
> > > > > >> > > > >
> > > > > >> > > > >> Hi Aitozi,
> > > > > >> > > > >>
> > > > > > >> > > > >> Thanks for the clarification. The naming "Lookup" might
> > > > > > >> > > > >> suggest using it for table lookup. But conceptually, what the
> > > > > > >> > > > >> eval() method will do is get a collection of results (Row,
> > > > > > >> > > > >> RowData) for the given keys. How it will be done depends on
> > > > > > >> > > > >> the implementation, i.e. you can implement your own
> > > > > > >> > > > >> Source[1][2]. The example in the FLIP should be able to be
> > > > > > >> > > > >> handled in this way.
> > > > > > >> > > > >>
> > > > > > >> > > > >> Do you mean to support the AsyncTableFunction beyond the
> > > > > > >> > > > >> LookupTableSource?
> > > > > > >> > > > >> It would be better if you could elaborate the proposed
> > > > > > >> > > > >> changes wrt the CorrelatedCodeGenerator with more details. Thanks!
> > > > > >> > > > >>
> > > > > >> > > > >> Best regards,
> > > > > >> > > > >> Jing
> > > > > >> > > > >>
> > > > > >> > > > >> [1]
> > > > > >> > > > >>
> > > > > >> > > > >>
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java#L64
> > > > > >> > > > >> [2]
> > > > > >> > > > >>
> > > > > >> > > > >>
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java#L49
> > > > > >> > > > >>
> > > > > > >> > > > >> On Sat, May 27, 2023 at 9:48 AM Aitozi <gjying1...@gmail.com> wrote:
> > > > > >> > > > >>
> > > > > >> > > > >> > Hi Jing,
> > > > > > >> > > > >> >     Thanks for your response. As stated in the FLIP, the purpose
> > > > > > >> > > > >> > of this FLIP is to support user-defined async table functions. As
> > > > > > >> > > > >> > described in the Flink documentation [1]:
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > > Async table functions are special functions for table sources
> > > > > > >> > > > >> > > that perform a lookup.
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > So end users cannot directly define and use async table functions
> > > > > > >> > > > >> > now. A use case is reported in [2].
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > So, in conclusion, no new interface is introduced, but we extend the
> > > > > > >> > > > >> > ability to support user-defined async table functions.
> > > > > >> > > > >> >
> > > > > >> > > > >> > [1]:
> > > > > >> > > > >> >
> > > > > >> > > > >> >
> > > > > >> > > > >>
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/udfs/
> > > > > >> > > > >> > [2]:
> > > > > >> > >
> > > https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
> > > > > >> > > > >> >
> > > > > >> > > > >> > Thanks.
> > > > > >> > > > >> > Aitozi.
> > > > > >> > > > >> >
> > > > > >> > > > >> >
> > > > > > >> > > > >> > On Sat, May 27, 2023 at 06:40 Jing Ge <j...@ververica.com.invalid> wrote:
> > > > > >> > > > >> >
> > > > > >> > > > >> > > Hi Aitozi,
> > > > > >> > > > >> > >
> > > > > > >> > > > >> > > Thanks for your proposal. I am not quite sure if I understood
> > > > > > >> > > > >> > > your thoughts correctly. You described a special case
> > > > > > >> > > > >> > > implementation of the AsyncTableFunction with no public API
> > > > > > >> > > > >> > > changes. Would you please elaborate your purpose of writing a
> > > > > > >> > > > >> > > FLIP according to the FLIP documentation[1]?
> > > > > > >> > > > >> > > Thanks!
> > > > > >> > > > >> > >
> > > > > >> > > > >> > > [1]
> > > > > >> > > > >> > >
> > > > > >> > > > >> > >
> > > > > >> > > > >> >
> > > > > >> > > > >>
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > > > > >> > > > >> > >
> > > > > >> > > > >> > > Best regards,
> > > > > >> > > > >> > > Jing
> > > > > >> > > > >> > >
> > > > > > >> > > > >> > > On Wed, May 24, 2023 at 1:07 PM Aitozi <gjying1...@gmail.com> wrote:
> > > > > >> > > > >> > >
> > > > > >> > > > >> > > > May I ask for some feedback  :D
> > > > > >> > > > >> > > >
> > > > > >> > > > >> > > > Thanks,
> > > > > >> > > > >> > > > Aitozi
> > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > On Tue, May 23, 2023 at 19:14 Aitozi <gjying1...@gmail.com> wrote:
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > Just caught a use case report from Giannis Polyzos for this usage:
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > On Tue, May 23, 2023 at 17:45 Aitozi <gjying1...@gmail.com> wrote:
> > > > > >> > > > >> > > > > >
> > > > > >> > > > >> > > > > > Hi guys,
> > > > > > >> > > > >> > > > > >     I want to bring up a discussion about adding support for
> > > > > > >> > > > >> > > > > > user-defined AsyncTableFunction in Flink.
> > > > > > >> > > > >> > > > > > Currently, async table functions are special functions for table
> > > > > > >> > > > >> > > > > > sources to perform async lookup. However, it is worth supporting
> > > > > > >> > > > >> > > > > > user-defined async table functions, because this way the end SQL
> > > > > > >> > > > >> > > > > > user can leverage them to perform async operations, which is
> > > > > > >> > > > >> > > > > > useful for maximizing system throughput, especially in
> > > > > > >> > > > >> > > > > > IO-bottlenecked cases.
> > > > > >> > > > >> > > > > >
> > > > > >> > > > >> > > > > > You can find some more detail in [1].
> > > > > >> > > > >> > > > > >
> > > > > >> > > > >> > > > > > Looking forward to feedback
> > > > > >> > > > >> > > > > >
> > > > > >> > > > >> > > > > >
> > > > > >> > > > >> > > > > > [1]:
> > > > > >> > > > >> > > >
> > > > > >> > > > >> > >
> > > > > >> > > > >> >
> > > > > >> > > > >>
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction
> > > > > >> > > > >> > > > > >
> > > > > >> > > > >> > > > > > Thanks,
> > > > > >> > > > >> > > > > > Aitozi.
> > > > > >> > > > >> > > >
> > > > > >> > > > >> > >
> > > > > >> > > > >> >
> > > > > >> > > > >>
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>
