Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-14 Thread Aitozi
Hi Alan,
Nice FLIP. I also explored leveraging the async table function [1] to
improve throughput before.

About the configs, what do you think about using hints, as mentioned in [1]?

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction

Thanks,
Aitozi.

Timo Walther wrote on Thu, Dec 14, 2023 at 17:29:

> Hi Alan,
>
> thanks for proposing this FLIP. It's a great addition to Flink and has
> been requested multiple times. It will be particularly interesting for
> accessing REST endpoints and other remote services.
>
> Great that we can generalize and reuse parts of the Python planner rules
> and code for this.
>
> I have some feedback regarding the API:
>
> 1) Configuration
>
> Configuration keys like
>
> `table.exec.async-scalar.catalog.db.func-name.buffer-capacity`
>
> are currently not supported in the configuration stack. The key space
> should remain constant. Only a constant key space enables the use of the
> ConfigOption class which is required in the layered configuration. For
> now I would suggest only allowing a global setting for buffer capacity,
> timeout, and retry-strategy. We can later work on a per-function
> configuration (potentially also needed for other use cases).
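To make the constant-key idea concrete, here is a minimal sketch of what
such global options could look like (the key names and defaults are
assumptions from this thread, not agreed-on API):

{code:java}
// Hypothetical global options with a constant key space; key names and
// defaults are assumptions from this discussion, not agreed-on Flink API.
import java.time.Duration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class AsyncScalarOptionsSketch {
    public static final ConfigOption<Integer> BUFFER_CAPACITY =
            ConfigOptions.key("table.exec.async-scalar.buffer-capacity")
                    .intType()
                    .defaultValue(10); // max in-flight async calls per operator

    public static final ConfigOption<Duration> TIMEOUT =
            ConfigOptions.key("table.exec.async-scalar.timeout")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(3)); // per-invocation timeout
}
{code}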
>
> 2) Semantic declaration
>
> Regarding
>
> `table.exec.async-scalar.catalog.db.func-name.output-mode`
>
> this is a semantic property of a function and should be defined
> per-function. It impacts the query result and potentially the behavior
> of planner rules.
>
> I see two options for this: either (a) an additional method in
> AsyncScalarFunction or (b) adding this to the function's requirements. I
> vote for (b), because a FunctionDefinition should be fully self-contained
> and sufficient for planning.
>
> Thus, for `FunctionDefinition.getRequirements():
> Set<FunctionRequirement>` we can add a new requirement `ORDERED` which
> should also be the default for AsyncScalarFunction. `getRequirements()`
> can be overridden to return a set without this requirement if the user
> intends to do so.
>
>
> Thanks,
> Timo
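To make option (b) concrete, here is a minimal sketch of a function that
opts out of ordered results (`AsyncScalarFunction` and the `ORDERED`
requirement are proposals from this thread, not existing API):

{code:java}
// Sketch only: AsyncScalarFunction and the ORDERED requirement are
// proposals from this thread, not existing Flink API.
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class UnorderedRemoteCall extends AsyncScalarFunction {

    // async eval as proposed in FLIP-400: complete the future when done
    public void eval(CompletableFuture<String> result, String key) {
        // a real implementation would call a remote service here
        result.complete("value-for-" + key);
    }

    @Override
    public Set<FunctionRequirement> getRequirements() {
        // drop the default ORDERED requirement: the planner may then emit
        // results as they complete, which can change row order
        return Collections.emptySet();
    }
}
{code}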
>
>
>
>
> On 11.12.23 18:43, Piotr Nowojski wrote:
> > +1 to the idea, I don't have any comments.
> >
> > Best,
> > Piotrek
> >
> > On Thu, Dec 7, 2023 at 07:15, Alan Sheinberg wrote:
> >
> >>>
> >>> Nicely written and makes sense.  The only feedback I have is around the
> >>> naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
> >>> will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
> >>> imply/suggest that all Async functions are remote.  I wonder if we can
> >>> find another name which doesn't carry that connotation; maybe
> >>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
> >>> and Async functions seems reasonable.)
> >>>
> >> Thanks.  That's fair.  I agree that "Remote" isn't always accurate.  I
> >> believe that the Python calls are also done asynchronously, so that might
> >> be a reasonable name, so long as there's no confusion between the base and
> >> async child class.
> >>
> >> On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes wrote:
> >>
> >>> Hi Alan,
> >>>
> >>> Nicely written and makes sense.  The only feedback I have is around the
> >>> naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
> >>> will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
> >>> imply/suggest that all Async functions are remote.  I wonder if we can
> >>> find another name which doesn't carry that connotation; maybe
> >>> AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
> >>> and Async functions seems reasonable.)
> >>>
> >>> Cheers,
> >>>
> >>> Jim
> >>>
> >>> On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg wrote:
> >>>
> >>>> I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
> >>>> asynchronous scalar function support [1]
> >>>>
> >>>> This feature proposes adding a new UDF type AsyncScalarFunction which is
> >>>> invoked just like a normal ScalarFunction, but is implemented with an
> >>>> asynchronous eval method.  I had brought this up including the
> >>>> motivation
> 

Re: SQL return type change from 1.17 to 1.18

2023-12-07 Thread Aitozi
Hi Peter, Timo
Sorry about this; I didn't notice it was a breaking change.
I'm +1 to reverting FLINK-33523.

Regards,
aitozi

Timo Walther wrote on Thu, Dec 7, 2023 at 20:41:

> Hi Peter,
>
> thanks for reaching out to the Flink community. This is indeed a serious
> issue. As the author of the Flink type system, DataType and many related
> utilities I strongly vote for reverting FLINK-33523:
>
> - It changes the Flink type system without a FLIP.
> - It breaks backwards compatibility with UDFs and connectors.
>
> Regards,
> Timo
>
> On 07.12.23 07:38, Péter Váry wrote:
> > Hi Team,
> >
> > We are working on upgrading the Iceberg-Flink connector from 1.17 to
> 1.18,
> > and found that some of our tests are failing. Prabhu Joseph created a
> jira
> > [1] to discuss this issue, along with short example code.
> >
> > In a nutshell:
> > - Create a table with an 'ARRAY<INT NOT NULL>' column
> > - Run a select which returns this column
> > - The return type changes:
> >  - From 'Object[]' - in 1.17
> >  - To 'int[]' - in 1.18
> >
> > The change is introduced by this jira [2].
> >
> > While I understand the reasoning behind this change, this will break some
> > users' existing workflows, as evidenced by Xingcan Cui finding this
> > independently [3].
> >
> > What is the opinion of the community about this change?
> > - Do we want to revert the change?
> > - Do we ask the owners of the change to make this behavior configurable?
> > - Do we accept this behavior change in a minor release?
> >
> > Thanks,
> > Peter
> >
> > [1] - https://issues.apache.org/jira/browse/FLINK-33523 - DataType
> > ARRAY<INT NOT NULL> fails to cast into Object[]
> > [2] - https://issues.apache.org/jira/browse/FLINK-31835 - DataTypeHint
> > don't support Row>
> > [3] - https://issues.apache.org/jira/browse/FLINK-33547 - SQL primitive
> > array type after upgrading to Flink 1.18.0
> >
>
>


[jira] [Created] (FLINK-32711) Type mismatch when proctime function used as parameter

2023-07-28 Thread Aitozi (Jira)
Aitozi created FLINK-32711:
--

 Summary: Type mismatch when proctime function used as parameter
 Key: FLINK-32711
 URL: https://issues.apache.org/jira/browse/FLINK-32711
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: Aitozi


Reproduce case:

{code:sql}
SELECT TYPEOF(PROCTIME())
{code}

This query will fail with:

org.apache.flink.table.planner.codegen.CodeGenException: Mismatch of function's 
argument data type 'TIMESTAMP_LTZ(3) NOT NULL' and actual argument type 
'TIMESTAMP_LTZ(3)'.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32659) DB connection may leak if exception is thrown in JdbcOutputFormat#close

2023-07-24 Thread Aitozi (Jira)
Aitozi created FLINK-32659:
--

 Summary: DB connection may leak if exception is thrown in 
JdbcOutputFormat#close
 Key: FLINK-32659
 URL: https://issues.apache.org/jira/browse/FLINK-32659
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Reporter: Aitozi






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-07-06 Thread Aitozi
Hi guys,
Since there are no further comments, kindly pinging the vote thread
[1] :D

Thanks,
Aitozi.

[1]: https://lists.apache.org/thread/7g5n2vshosom2dj9bp7x4n01okrnx4xx

Aitozi wrote on Mon, Jun 26, 2023 at 10:31:

> Hi Lincoln,
> Thanks for your confirmation. I have updated the FLIP doc to reflect the
> consensus.
> If there are no other comments, I'd like to restart the vote process in
> [1] today.
>
> https://lists.apache.org/thread/7g5n2vshosom2dj9bp7x4n01okrnx4xx
>
> Thanks,
> Aitozi.
>
Lincoln Lee wrote on Wed, Jun 21, 2023 at 22:29:
>
>> Hi Aitozi,
>>
>> Thanks for your updates!
>>
>> By the design of hints, the hints after the SELECT clause belong to the
>> query-hints category, and this new hint is also a kind of join hint [1].
>> Joining a table function is one of the join types defined by Flink SQL
>> joins [2], and all existing join hints [1] omit the 'join' keyword,
>> so I would prefer 'ASYNC_TABLE_FUNC' (which is actually short for
>> 'ASYNC_TABLE_FUNC_JOIN').
>>
>> Since a short Chinese holiday is coming, I suggest waiting for other
>> people's responses before continuing the vote (next Monday?).
>>
>> Btw, I discussed pyflink support with @fudian offline; there should
>> be no known issues, so you can create a subtask for pyflink support after
>> the vote passes.
>>
>> [1]
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#join-hints
>> [2]
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/
>>
>> Best,
>> Lincoln Lee
>>
>>
>> Aitozi wrote on Sun, Jun 18, 2023 at 21:18:
>>
>> > Hi all,
>> > Sorry for the late reply, I have a discussion with Lincoln offline,
>> > mainly about
>> > the naming of the hints option. Thanks Lincoln for the valuable
>> > suggestions.
>> >
>> > Let me answer the last email inline.
>> >
>> > >For `JavaAsyncTableFunc0` in flip, can you use a scenario like RPC
>> call as
>> > an example?
>> >
>> > Sure, will give an example when adding the doc of async udtf and will
>> > update the FLIP simultaneously
>> >
>> > >For the name of this query hint, 'LATERAL' (include its internal
>> options)
>> > don't show any relevance to async, but I haven't thought of a suitable
>> name
>> > at the moment,
>> >
>> > After some discussion with Lincoln, We prefer to choose one of the
>> > `ASYNC_TABLE_FUNC` and `ASYNC_LATERAL`.
>> > Besides, In my opinion the keyword `lateral`'s use scenario is wider
>> than
>> > the table function join, but in this case we only want to config
>> > the async table function, So I'm a bit more lean to the
>> `ASYNC_TABLE_FUNC`.
>> > Looking forward to some inputs if you guys have
>> > some better suggestion on the naming.
>> >
>> > For the usage of the hints config option, I have updated the section
>> > of ConfigOption, you can refer to the FLIP
>> > for more details.
>> >
>> > >Also, the terms 'correlate join' and 'lateral join' are not the same
>> as in
>> > the current joins page[1], so maybe it would be better if we unified
>> them
>> > into  'join table function'
>> >
>> > Yes, we should unified to the 'join table function', updated.
>> >
>> > Best,
>> > Aitozi
>> >
>> > Lincoln Lee wrote on Thu, Jun 15, 2023 at 09:15:
>> >
>> > > Hi Aitozi,
>> > >
>> > > Thanks for your reply!  Gives sql users more flexibility to get
>> > > asynchronous processing capabilities via lateral join table function
>> +1
>> > for
>> > > this
>> > >
>> > > For `JavaAsyncTableFunc0` in flip, can you use a scenario like RPC
>> call
>> > as
>> > > an example?
>> > >
>> > > For the name of this query hint, 'LATERAL' (include its internal
>> options)
>> > > don't show any relevance to async, but I haven't thought of a suitable
>> > name
>> > > at the moment,
>> > > maybe we need to highlight the async keyword directly, we can also
>> see if
>> > > others have better candidates
>> > >
>> > > For the hint option "timeout = '180s'" should be "'timeout' = '180s'",
>> > > seems 

Re: Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-07-06 Thread Aitozi
Hi Awake,
Thanks for your good point; updated.

Best,
Aitozi.

宇航 李 wrote on Wed, Jul 5, 2023 at 11:29:

> Hi Aitozi,
>
> I think it is necessary to add the following description in FLIP to
> express the difference between user-defined asynchronous table function and
> AsyncTableFunction:
>
> User-defined asynchronous table functions allow complex parameters (e.g.,
> Row type) to be passed to the function, which is important for RPC, rather
> than using 'join … on ...'.
>
> Thanks,
> Awake.
>
>
> On 2023/06/26 02:31:59 Aitozi wrote:
> > Hi Lincoln,
> > Thanks for your confirmation. I have updated the consensus to the
> FLIP
> > doc.
> > If there are no other comments, I'd like to restart the vote process in
> [1]
> > today.
> >
> > https://lists.apache.org/thread/7g5n2vshosom2dj9bp7x4n01okrnx4xx
> >
> > Thanks,
> > Aitozi.
> >
> > Lincoln Lee wrote on Wed, Jun 21, 2023 at 22:29:
> >
> > > Hi Aitozi,
> > >
> > > Thanks for your updates!
> > >
> > > By the design of hints, the hints after select clause belong to the
> query
> > > hints category, and this new hint is also a kind of join hints[1].
> > > Join table function is one of the join type defined by flink sql
> joins[2],
> > > all existing join hints[1] omit the 'join' keyword,
> > > so I would prefer the 'ASYNC_TABLE_FUNC' (which is actually the one for
> > > 'ASYNC_TABLE_FUNC_JOIN').
> > >
> > > Since a short Chinese holiday is coming, I suggest waiting for other
> > > people's responses before continuing to vote (next monday?)
> > >
> > > Btw, I discussed with @fudian offline about pyflink support, there
> should
> > > be no known issues, so you can create a subtask with pyflink support
> after
> > > the vote passed.
> > >
> > > [1]
> > >
> > >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#join-hints
> > > [2]
> > >
> > >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > Aitozi wrote on Sun, Jun 18, 2023 at 21:18:
> > >
> > > > Hi all,
> > > > Sorry for the late reply, I have a discussion with Lincoln
> offline,
> > > > mainly about
> > > > the naming of the hints option. Thanks Lincoln for the valuable
> > > > suggestions.
> > > >
> > > > Let me answer the last email inline.
> > > >
> > > > >For `JavaAsyncTableFunc0` in flip, can you use a scenario like RPC
> call
> > > as
> > > > an example?
> > > >
> > > > Sure, will give an example when adding the doc of async udtf and will
> > > > update the FLIP simultaneously
> > > >
> > > > >For the name of this query hint, 'LATERAL' (include its internal
> > > options)
> > > > don't show any relevance to async, but I haven't thought of a
> suitable
> > > name
> > > > at the moment,
> > > >
> > > > After some discussion with Lincoln, We prefer to choose one of the
> > > > `ASYNC_TABLE_FUNC` and `ASYNC_LATERAL`.
> > > > Besides, In my opinion the keyword `lateral`'s use scenario is wider
> than
> > > > the table function join, but in this case we only want to config
> > > > the async table function, So I'm a bit more lean to the
> > > `ASYNC_TABLE_FUNC`.
> > > > Looking forward to some inputs if you guys have
> > > > some better suggestion on the naming.
> > > >
> > > > For the usage of the hints config option, I have updated the section
> > > > of ConfigOption, you can refer to the FLIP
> > > > for more details.
> > > >
> > > > >Also, the terms 'correlate join' and 'lateral join' are not the
> same as
> > > in
> > > > the current joins page[1], so maybe it would be better if we unified
> them
> > > > into  'join table function'
> > > >
> > > > Yes, we should unified to the 'join table function', updated.
> > > >
> > > > Best,
> > > > Aitozi
> > > >
> > > > Lincoln Lee wrote on Thu, Jun 15, 2023 at 09:15:
> > > >
> > > > > Hi Aitozi,
> > > > >
> > > > > Thanks for your reply!  Gives sql us

Re: [VOTE] FLIP-313: Add support of User Defined AsyncTableFunction

2023-06-25 Thread Aitozi
Hi devs:
The last comments in [1] have been addressed, so I'd like to restart this
vote thread.
The vote will be open for at least 72 hours (until June 29th, 10:00AM GMT)
unless there is an objection or an insufficient number of votes.

[1] https://lists.apache.org/thread/7vk1799ryvrz4lsm5254q64ctm89mx2l
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction

Best regards,
Aitozi

Aitozi wrote on Wed, Jun 14, 2023 at 09:47:

> Hi all,
> Thanks for all the feedback about FLIP-313: Add support of User
> Defined AsyncTableFunction[1]. Based on the discussion [2], we have come to
> a consensus, so I would like to start a vote.
> The vote will be open for at least 72 hours (until June 19th, 10:00AM GMT)
> unless there is an objection or an insufficient number of votes.
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
> [2] https://lists.apache.org/thread/7vk1799ryvrz4lsm5254q64ctm89mx2l
>
> Best regards,
> Aitozi
>


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-25 Thread Aitozi
Hi Lincoln,
Thanks for your confirmation. I have updated the FLIP doc to reflect the
consensus.
If there are no other comments, I'd like to restart the vote process in [1]
today.

[1]: https://lists.apache.org/thread/7g5n2vshosom2dj9bp7x4n01okrnx4xx

Thanks,
Aitozi.

Lincoln Lee wrote on Wed, Jun 21, 2023 at 22:29:

> Hi Aitozi,
>
> Thanks for your updates!
>
> By the design of hints, the hints after the SELECT clause belong to the
> query-hints category, and this new hint is also a kind of join hint [1].
> Joining a table function is one of the join types defined by Flink SQL
> joins [2], and all existing join hints [1] omit the 'join' keyword,
> so I would prefer 'ASYNC_TABLE_FUNC' (which is actually short for
> 'ASYNC_TABLE_FUNC_JOIN').
>
> Since a short Chinese holiday is coming, I suggest waiting for other
> people's responses before continuing the vote (next Monday?).
>
> Btw, I discussed pyflink support with @fudian offline; there should
> be no known issues, so you can create a subtask for pyflink support after
> the vote passes.
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#join-hints
> [2]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/
>
> Best,
> Lincoln Lee
>
>
> Aitozi wrote on Sun, Jun 18, 2023 at 21:18:
>
> > Hi all,
> > Sorry for the late reply, I have a discussion with Lincoln offline,
> > mainly about
> > the naming of the hints option. Thanks Lincoln for the valuable
> > suggestions.
> >
> > Let me answer the last email inline.
> >
> > >For `JavaAsyncTableFunc0` in flip, can you use a scenario like RPC call
> as
> > an example?
> >
> > Sure, will give an example when adding the doc of async udtf and will
> > update the FLIP simultaneously
> >
> > >For the name of this query hint, 'LATERAL' (include its internal
> options)
> > don't show any relevance to async, but I haven't thought of a suitable
> name
> > at the moment,
> >
> > After some discussion with Lincoln, We prefer to choose one of the
> > `ASYNC_TABLE_FUNC` and `ASYNC_LATERAL`.
> > Besides, In my opinion the keyword `lateral`'s use scenario is wider than
> > the table function join, but in this case we only want to config
> > the async table function, So I'm a bit more lean to the
> `ASYNC_TABLE_FUNC`.
> > Looking forward to some inputs if you guys have
> > some better suggestion on the naming.
> >
> > For the usage of the hints config option, I have updated the section
> > of ConfigOption, you can refer to the FLIP
> > for more details.
> >
> > >Also, the terms 'correlate join' and 'lateral join' are not the same as
> in
> > the current joins page[1], so maybe it would be better if we unified them
> > into  'join table function'
> >
> > Yes, we should unified to the 'join table function', updated.
> >
> > Best,
> > Aitozi
> >
> > Lincoln Lee wrote on Thu, Jun 15, 2023 at 09:15:
> >
> > > Hi Aitozi,
> > >
> > > Thanks for your reply!  Gives sql users more flexibility to get
> > > asynchronous processing capabilities via lateral join table function +1
> > for
> > > this
> > >
> > > For `JavaAsyncTableFunc0` in flip, can you use a scenario like RPC call
> > as
> > > an example?
> > >
> > > For the name of this query hint, 'LATERAL' (include its internal
> options)
> > > don't show any relevance to async, but I haven't thought of a suitable
> > name
> > > at the moment,
> > > maybe we need to highlight the async keyword directly, we can also see
> if
> > > others have better candidates
> > >
> > > For the hint option "timeout = '180s'" should be "'timeout' = '180s'",
> > > seems a typo in the flip. And use upper case for all keywords in sql
> > > examples.
> > > Also, the terms 'correlate join' and 'lateral join' are not the same as
> > in
> > > the current joins page[1], so maybe it would be better if we unified
> them
> > > into  'join table function'
> > >
> > > [1]
> > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#table-function
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > Aitozi wrote on Wed, Jun 14, 2023 at 16:11:
> > >

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-18 Thread Aitozi
Hi all,
Sorry for the late reply. I had a discussion with Lincoln offline, mainly
about the naming of the hint option. Thanks Lincoln for the valuable
suggestions.

Let me answer the last email inline.

>For `JavaAsyncTableFunc0` in flip, can you use a scenario like RPC call as
an example?

Sure, I will give an example when adding the docs for the async UDTF and
will update the FLIP simultaneously.

>For the name of this query hint, 'LATERAL' (include its internal options)
don't show any relevance to async, but I haven't thought of a suitable name
at the moment,

After some discussion with Lincoln, we prefer one of `ASYNC_TABLE_FUNC`
and `ASYNC_LATERAL`.
Besides, in my opinion the keyword `lateral` has a wider use scenario than
the table function join, but in this case we only want to configure
the async table function, so I lean a bit more toward `ASYNC_TABLE_FUNC`.
Looking forward to your input if you have
better suggestions on the naming.

For the usage of the hint config options, I have updated the ConfigOption
section; you can refer to the FLIP
for more details.

>Also, the terms 'correlate join' and 'lateral join' are not the same as in
the current joins page[1], so maybe it would be better if we unified them
into  'join table function'

Yes, we should unify them into 'join table function'; updated.

Best,
Aitozi

Lincoln Lee wrote on Thu, Jun 15, 2023 at 09:15:

> Hi Aitozi,
>
> Thanks for your reply!  Giving SQL users more flexibility to get
> asynchronous processing capabilities via joining a table function: +1 for
> this.
>
> For `JavaAsyncTableFunc0` in flip, can you use a scenario like RPC call as
> an example?
>
> For the name of this query hint, 'LATERAL' (include its internal options)
> don't show any relevance to async, but I haven't thought of a suitable name
> at the moment,
> maybe we need to highlight the async keyword directly, we can also see if
> others have better candidates
>
> For the hint option "timeout = '180s'" should be "'timeout' = '180s'",
> seems a typo in the flip. And use upper case for all keywords in sql
> examples.
> Also, the terms 'correlate join' and 'lateral join' are not the same as in
> the current joins page[1], so maybe it would be better if we unified them
> into  'join table function'
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#table-function
>
> Best,
> Lincoln Lee
>
>
> Aitozi wrote on Wed, Jun 14, 2023 at 16:11:
>
> > Hi Lincoln
> >
> > Very thanks for your valuable question. I will try to answer your
> > questions inline.
> >
> > >Does the async udtf bring any additional benefits besides a
> > lighter implementation?
> >
> > IMO, async udtf is more than a lighter implementation. It can act as a
> > general way for sql users to use the async operator. And they don't have
> to
> > bind the async function with a table (a LookupTable), and they are not
> > forced to join on an equality join condition, and they can use it to do
> > more than enrich data.
> >
> > The async lookup join is more like a subset/specific usage of async udtf.
> > The specific version has more opportunity to be optimized (like push
> down)
> > is acceptable. Async table function should be categorized to used-defined
> > function.
> >
> > >Should users
> >
> > migrate to the lookup source when they encounter similar requirements or
> >
> > problems, or should we develop an additional set of similar mechanisms?
> >
> > As I clarified above, the lookup join is a specific usage of async udtf.
> So
> > it deserves more refined optimization like caching / retryable. But it
> may
> > not all
> >
> > suitable for the async udtf. As function, it can be deterministic/or
> > non-deterministic. So caching is not suitable, and we also do not have a
> > common cache for the udf now. So I think optimization like caching/retry
> > should be handed over to the function implementor.
> >
> > > the newly added query hint need a different name that
> > can be easier related to the lateral operation as the current join
> hints[5]
> > do.
> >
> >
> > What about using LATERAL?
> >
> > as below
> >
> > SELECT /*+ LATERAL('output-mode' = 'ordered', 'capacity' = '200',
> > 'timeout' = '180s') */ a, c1, c2
> >
> > FROM T1
> >
> > LEFT JOIN LATERAL TABLE (async_split(b)) AS T(c1, c2) ON TRUE
> >
> > >For

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-15 Thread Aitozi
Hi Lijie,

Nice to see this valuable feature. After reading the FLIP I have some
questions below:

>Schedule the TableSource(dim) first.

How does it know to schedule the TableSource(dim) first ? IMO, In the
current implementation two source table operators will be executed
simultaneously.

>If the data volume on the probe side is too small, the overhead of
building runtime filter is not worth it.

Are there some tests to show the default value of
table.optimizer.runtime-filter.min-probe-data-size 10G is a good default
value. The same to table.optimizer.runtime-filter.max-build-data-size

>the runtime filter can be pushed down along the probe side, as close to
data sources as possible

What's the representation of the runtime filter node in planner ? Is it a
Filternode

Best,

Aitozi.
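For context, a toy sketch of the build/probe flow the FLIP describes
(Flink's actual operators, planner nodes, and filter data structure differ;
a real implementation would use a bloom filter rather than a hash set):

{code:java}
// Toy model of a runtime filter: the build (dim) side produces a compact
// set of key fingerprints; the probe side drops rows that cannot match
// before the expensive shuffle and join.
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class RuntimeFilterSketch {
    static Set<Integer> buildFilter(List<String> buildSideKeys) {
        Set<Integer> filter = new HashSet<>();
        for (String key : buildSideKeys) {
            filter.add(key.hashCode()); // fingerprint of each build-side key
        }
        return filter;
    }

    static List<String> probe(List<String> probeSideKeys, Set<Integer> filter) {
        // keys that are definitely absent from the build side are dropped early
        return probeSideKeys.stream()
                .filter(key -> filter.contains(key.hashCode()))
                .collect(Collectors.toList());
    }
}
{code}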

Benchao Li wrote on Thu, Jun 15, 2023 at 14:30:

> Hi Lijie,
>
> Regarding the shuffle mode, I think it would be reasonable to also support
> "pipeline shuffle" if possible.
>
> "pipeline shuffle" is a essential for OLAP/MPP computing, although this has
> not been much exposed to users for now, I know a few companies that uses
> Flink as a MPP computing engine, and there is an ongoing effort[1] to make
> this usage more powerful.
>
> Back to your concern that "Even if the RuntimeFilter becomes running before
> the RuntimeFilterBuilder finished, it will not process any data and will
> occupy resources", whether it benefits us depends on the scale of data, if
> the RuntimeFIlterBuilder could be done quickly than RuntimeFilter operator,
> it can still filter out additional data afterwards. Hence in my opinion, we
> do not need to make the edge between RuntimeFilterBuilder and RuntimeFilter
> BLOCKING only, at least it can be configured.
>
> [1] https://issues.apache.org/jira/browse/FLINK-25318
>
> Lijie Wang wrote on Thu, Jun 15, 2023 at 14:18:
>
> > Hi Yuxia,
> >
> > I made a mistake in the above response.
> >
> > The runtime filter can work well with all shuffle modes. However, hybrid
> > shuffle and blocking shuffle are currently recommended for batch jobs
> > (pipeline shuffle is not recommended).
> >
> > One more thing to mention here is that we will force the edge between
> > RuntimeFilterBuilder and RuntimeFilter to be BLOCKING (regardless of which
> > BatchShuffleMode is set), because the RuntimeFilter really doesn't need to
> > run before the RuntimeFilterBuilder finishes. Even if the RuntimeFilter
> > starts running before the RuntimeFilterBuilder finishes, it will not
> > process any data and will occupy resources.
> >
> > Best,
> > Lijie
> >
> > Lijie Wang wrote on Thu, Jun 15, 2023 at 09:48:
> >
> > > Hi Yuxia,
> > >
> > > Thanks for your feedback. The answers of your questions are as follows:
> > >
> > > 1. Yes, the row count comes from statistic of underlying table(Or
> > > estimated based on the statistic of underlying table, if the build side
> > or
> > > probe side is not TableScan).  If the statistic unavailable, we will
> not
> > > inject a runtime filter(As you said, we can hardly evaluate the
> > benefits).
> > > Besides, AFAIK, the estimated data size of build side is also based on
> > the
> > > row count statistics, that is, if the statistics is unavailable, the
> > > requirement "table.optimizer.runtime-filter.max-build-data-size" cannot
> > be
> > > evaluated either. I'll add this point into FLIP.
> > >
> > > 2.
> > > Estimated data size does not meet requirement (in planner optimization
> > > phase) -> No filter
> > > Estimated data size meets the requirement (in planner optimization
> > phase),
> > > but the real data size does not meet the requirement(in execution
> phase)
> > ->
> > > Fake filter
> > >
> > > 3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
> > >
> > > Best,
> > > Lijie
> > >
> > > yuxia wrote on Wed, Jun 14, 2023 at 20:37:
> > >
> > >> Thanks Lijie for starting this discussion. Excited to see runtime
> filter
> > >> is to be implemented in Flink.
> > >> I have few questions about it:
> > >>
> > >> 1: As the FLIP said, `if the ndv cannot be estimated, use row count
> > >> instead`. So, does the row count come from the statistics of the
> > >> underlying table? What if the statistics are also unavailable,
> > >> considering users may not always remember to generate statistics in
> > >> production. I'm wondering whether it makes sense to just disable the
> > >> runtime filter

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-14 Thread Aitozi
Hi Lincoln

Many thanks for your valuable questions. I will try to answer them
inline.

>Does the async udtf bring any additional benefits besides a
lighter implementation?

IMO, the async UDTF is more than a lighter implementation. It can act as a
general way for SQL users to use the async operator: they don't have to
bind the async function to a table (a LookupTable), they are not
forced to join on an equality condition, and they can use it to do
more than enrich data.

The async lookup join is more like a subset/specific usage of the async
UDTF. It's acceptable that the specific version has more opportunities to
be optimized (like push-down). The async table function should be
categorized as a user-defined function.

>Should users migrate to the lookup source when they encounter similar
requirements or problems, or should we develop an additional set of similar
mechanisms?

As I clarified above, the lookup join is a specific usage of the async
UDTF, so it deserves more refined optimizations like caching and retries.
But those may not all be suitable for the async UDTF: as a function, it can
be deterministic or non-deterministic, so caching is not always applicable,
and we also do not have a common cache for UDFs now. So I think
optimizations like caching/retry should be handed over to the function
implementor.

> the newly added query hint need a different name that
can be easier related to the lateral operation as the current join hints[5]
do.


What about using LATERAL?

as below:

SELECT /*+ LATERAL('output-mode' = 'ordered', 'capacity' = '200',
'timeout' = '180s') */ a, c1, c2
FROM T1
LEFT JOIN LATERAL TABLE (async_split(b)) AS T(c1, c2) ON TRUE

>For the async func example, since the target scenario is an external io
operation, it's better to add the `close` method to actively release
resources as a good example for users


Makes sense to me; will update the FLIP.

Best,

Aitozi.
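For illustration, a sketch of what such a function could look like with an
explicit close() (the SQL-facing support for AsyncTableFunction is what this
FLIP proposes; the executor below merely stands in for an RPC client, and
all names are illustrative):

{code:java}
// Sketch of an async UDTF that models an RPC call and releases its
// resources in close(); names and the executor-as-client are illustrative.
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.types.Row;

public class AsyncSplit extends AsyncTableFunction<Row> {
    private transient ExecutorService executor;

    @Override
    public void open(FunctionContext context) {
        executor = Executors.newFixedThreadPool(4); // stands in for an RPC client
    }

    public void eval(CompletableFuture<Collection<Row>> result, String input) {
        CompletableFuture
                .supplyAsync(() -> {
                    // a real implementation would issue the remote call here
                    return (Collection<Row>) Collections.singletonList(Row.of(input));
                }, executor)
                .whenComplete((rows, error) -> {
                    if (error != null) {
                        result.completeExceptionally(error);
                    } else {
                        result.complete(rows);
                    }
                });
    }

    @Override
    public void close() {
        executor.shutdown(); // actively release the resource, as suggested
    }
}
{code}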

Lincoln Lee wrote on Wed, Jun 14, 2023 at 14:24:

> Hi Aitozi,
>
> Sorry for the late reply here!  Supporting async UDTF (`AsyncTableFunction`)
> directly in SQL seems like an attractive feature, but there are two issues
> that need to be addressed before we can be sure to add it:
> 1. As mentioned in the flip[1], the current lookup function can already
> implement the requirements, but it requires implementing an extra
> `LookupTableSource` and explicitly declaring the table schema (which can
> help implementers the various push-down optimizations supported by the
> planner). Does the async udtf bring any additional benefits besides a
> lighter implementation?
> 2. FLIP-221[2] abstracts a reusable cache and metric infrastructure for
> lookup sources, which are important to improve performance and
> observability for high overhead external io scenarios, how do we integrate
> and reuse these capabilities after introducing async udtf? Should users
> migrate to the lookup source when they encounter similar requirements or
> problems, or should we develop an additional set of similar mechanisms? (a
> similarly case:  FLIP-234[3] introduced the retryable capability for lookup
> join)
>
> For the flip itself,
> 1. Considering the 'options' is already used as the dynamic table
> options[4] in flink, the newly added query hint need a different name that
> can be easier related to the lateral operation as the current join hints[5]
> do.
> 2. For the async func example, since the target scenario is an external io
> operation, it's better to add the `close` method to actively release
> resources as a good example for users. Also in terms of the determinism of
> a function, it is important to remind users that unless the behavior of the
> function is deterministic, it needs to be explicitly declared as
> non-deterministic.
>
> [1].
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction?src=contextnavpagetreemode
> [2].
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221%3A+Abstraction+for+lookup+source+cache+and+metric?src=contextnavpagetreemode
> [3].
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems?src=contextnavpagetreemode
> [4].
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL?src=contextnavpagetreemode
> [5].
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job?src=contextnavpagetreemode
>
> Best,
> Lincoln Lee
>
>
> Aitozi wrote on Tue, Jun 13, 2023 at 11:30:
>
> > Get your meaning now, thanks :)
> >
> > Best,
> > Aitozi.
> >
> > Feng Jin wrote on Tue, Jun 13, 2023 at 11:16:
> >
> > 

[VOTE] FLIP-313: Add support of User Defined AsyncTableFunction

2023-06-13 Thread Aitozi
Hi all,
Thanks for all the feedback about FLIP-313: Add support of User Defined
AsyncTableFunction[1]. Based on the discussion [2], we have come to a
consensus, so I would like to start a vote.
The vote will be open for at least 72 hours (until June 19th, 10:00AM GMT)
unless there is an objection or an insufficient number of votes.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction
[2] https://lists.apache.org/thread/7vk1799ryvrz4lsm5254q64ctm89mx2l

Best regards,
Aitozi


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-12 Thread Aitozi
Got your meaning now, thanks :)

Best,
Aitozi.

Feng Jin wrote on Tue, Jun 13, 2023 at 11:16:

> Hi Aitozi,
>
> Sorry for the confusing description.
>
> What I meant was that if we need to remind users about thread safety issues,
> we should introduce the new UDTF interface instead of executing the
> original UDTF asynchronously. Therefore, I agree with introducing the
> AsyncTableFunction.
>
> Best,
> Feng
>
> On Tue, Jun 13, 2023 at 10:42 AM Aitozi  wrote:
>
> > Hi Feng,
> > Thanks for your question. We do not provide a way to switch the UDTF
> > between sync and async way,
> > So there should be no thread safety problem here.
> >
> > Best,
> > Aitozi
> >
> > Feng Jin wrote on Tue, Jun 13, 2023 at 10:31:
> >
> > > Hi Aitozi, We do need to remind users about thread safety issues. Thank
> > you
> > > for your efforts on this FLIP. I have no further questions.
> > > Best, Feng
> > >
> > >
> > > On Tue, Jun 13, 2023 at 6:05 AM Jing Ge 
> > > wrote:
> > >
> > > > Hi Aitozi,
> > > >
> > > > Thanks for taking care of that part. I have no other concern.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > >
> > > > On Mon, Jun 12, 2023 at 5:38 PM Aitozi  wrote:
> > > >
> > > > > BTW, If there are no other more blocking issue / comments, I would
> > like
> > > > to
> > > > > start a VOTE in another thread this wednesday 6.14
> > > > >
> > > > > Thanks,
> > > > > Aitozi.
> > > > >
> > > > > Aitozi wrote on Mon, Jun 12, 2023 at 23:34:
> > > > >
> > > > > > Hi, Jing,
> > > > > >     Thanks for your explanation. I get your point now.
> > > > > >
> > > > > > For the performance part, I think it's a good idea to run with
> > > > returning
> > > > > a
> > > > > > big table case, the memory consumption
> > > > > > should be a point to be taken care about. Because in the ordered
> > > mode,
> > > > > the
> > > > > > head element in buffer may affect the
> > > > > > total memory consumption.
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > Aitozi.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Jing Ge wrote on Mon, Jun 12, 2023 at 20:28:
> > > > > >
> > > > > >> Hi Aitozi,
> > > > > >>
> > > > > >> Which key will be used for lookup is not an issue, only one row
> > will
> > > > be
> > > > > >> required for each key in order to enrich it. True, it depends on
> > the
> > > > > >> implementation whether multiple rows or single row for each key
> > will
> > > > be
> > > > > >> returned. However, for the lookup & enrichment scenario, one
> > row/key
> > > > is
> > > > > >> recommended, otherwise, like I mentioned previously, enrichment
> > > won't
> > > > > >> work.
> > > > > >>
> > > > > >> I am a little bit concerned about returning a big table for each
> > > key,
> > > > > >> since
> > > > > >> it will take the async call longer to return and need more
> memory.
> > > The
> > > > > >> performance tests should cover this scenario. This is not a
> > blocking
> > > > > issue
> > > > > >> for this FLIP.
> > > > > >>
> > > > > >> Best regards,
> > > > > >> Jing
> > > > > >>
> > > > > >> On Sat, Jun 10, 2023 at 4:11 AM Aitozi 
> > > wrote:
> > > > > >>
> > > > > >> > Hi Jing,
> > > > > >> > I means the join key is not necessary to be the primary
> key
> > or
> > > > > >> unique
> > > > > >> > index of the database.
> > > > > >> > In this situation, we may queried out multi rows for one join
> > > key. I
> > > > > >> think
> > > > > >> > that's why the
> > > > > >> > LookupFunction#lookup will return a collection of RowData.

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-12 Thread Aitozi
Hi Feng,
Thanks for your question. We do not provide a way to switch a UDTF
between sync and async modes,
so there should be no thread safety problem here.

Best,
Aitozi

Feng Jin wrote on Tue, Jun 13, 2023 at 10:31:

> Hi Aitozi, We do need to remind users about thread safety issues. Thank you
> for your efforts on this FLIP. I have no further questions.
> Best, Feng
>
>
> On Tue, Jun 13, 2023 at 6:05 AM Jing Ge 
> wrote:
>
> > Hi Aitozi,
> >
> > Thanks for taking care of that part. I have no other concern.
> >
> > Best regards,
> > Jing
> >
> >
> > On Mon, Jun 12, 2023 at 5:38 PM Aitozi  wrote:
> >
> > > BTW, If there are no other more blocking issue / comments, I would like
> > to
> > > start a VOTE in another thread this wednesday 6.14
> > >
> > > Thanks,
> > > Aitozi.
> > >
> > > Aitozi wrote on Mon, Jun 12, 2023 at 23:34:
> > >
> > > > Hi, Jing,
> > > > Thanks for your explanation. I get your point now.
> > > >
> > > > For the performance part, I think it's a good idea to run with
> > returning
> > > a
> > > > big table case, the memory consumption
> > > > should be a point to be taken care about. Because in the ordered
> mode,
> > > the
> > > > head element in buffer may affect the
> > > > total memory consumption.
> > > >
> > > >
> > > > Thanks,
> > > > Aitozi.
> > > >
> > > >
> > > >
> > > > Jing Ge wrote on Mon, Jun 12, 2023 at 20:28:
> > > >
> > > >> Hi Aitozi,
> > > >>
> > > >> Which key will be used for lookup is not an issue, only one row will
> > be
> > > >> required for each key in order to enrich it. True, it depends on the
> > > >> implementation whether multiple rows or single row for each key will
> > be
> > > >> returned. However, for the lookup & enrichment scenario, one row/key
> > is
> > > >> recommended, otherwise, like I mentioned previously, enrichment
> won't
> > > >> work.
> > > >>
> > > >> I am a little bit concerned about returning a big table for each
> key,
> > > >> since
> > > >> it will take the async call longer to return and need more memory.
> The
> > > >> performance tests should cover this scenario. This is not a blocking
> > > issue
> > > >> for this FLIP.
> > > >>
> > > >> Best regards,
> > > >> Jing
> > > >>
> > > >> On Sat, Jun 10, 2023 at 4:11 AM Aitozi 
> wrote:
> > > >>
> > > >> > Hi Jing,
> > > >> > I means the join key is not necessary to be the primary key or
> > > >> unique
> > > >> > index of the database.
> > > >> > In this situation, we may queried out multi rows for one join
> key. I
> > > >> think
> > > >> > that's why the
> > > >> > LookupFunction#lookup will return a collection of RowData.
> > > >> >
> > > >> > BTW, I think the behavior of lookup join will not affect the
> > semantic
> > > of
> > > >> > the async udtf.
> > > >> > We use the Async TableFunction here and the table function can
> > collect
> > > >> > multiple rows.
> > > >> >
> > > >> > Thanks,
> > > >> > Atiozi.
> > > >> >
> > > >> >
> > > >> >
> > > >> > > Jing Ge wrote on Sat, Jun 10, 2023 at 00:15:
> > > >> >
> > > >> > > Hi Aitozi,
> > > >> > >
> > > >> > > The keyRow used in this case contains all keys[1].
> > > >> > >
> > > >> > > Best regards,
> > > >> > > Jing
> > > >> > >
> > > >> > > [1]
> > > >> > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> > > >> > >
> > > >> > >
> > > >> > > On Fri, Jun 9, 2023 at 3:42 PM Aitozi 
> > wrote:
> > > >> > >

[jira] [Created] (FLINK-32320) Same correlate can not be reused due to the different correlationId

2023-06-12 Thread Aitozi (Jira)
Aitozi created FLINK-32320:
--

 Summary: Same correlate can not be reused due to the different 
correlationId
 Key: FLINK-32320
 URL: https://issues.apache.org/jira/browse/FLINK-32320
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Aitozi


As described in SubplanReuseTest:


{code:java}
  @Test
  def testSubplanReuseOnCorrelate(): Unit = {
util.addFunction("str_split", new StringSplit())
val sqlQuery =
  """
|WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) 
AS T(v))
|SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
  """.stripMargin
// TODO the sub-plan of Correlate should be reused,
// however the digests of Correlates are different
util.verifyExecPlan(sqlQuery)
  }
{code}

This will produce the plan 


{code:java}
HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, b0, 
c0, f00], build=[right])
:- Exchange(distribution=[hash[f0]])
:  +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], 
correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[f0]])
   +- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], 
correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
  +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}

The Correlate node can not be reused due to the different correlation id.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-12 Thread Aitozi
BTW, if there are no other blocking issues / comments, I would like to
start a VOTE in another thread this Wednesday, 6.14.

Thanks,
Aitozi.

Aitozi wrote on Mon, Jun 12, 2023 at 23:34:

> Hi, Jing,
> Thanks for your explanation. I get your point now.
>
> For the performance part, I think it's a good idea to run with returning a
> big table case, the memory consumption
> should be a point to be taken care about. Because in the ordered mode, the
> head element in buffer may affect the
> total memory consumption.
>
>
> Thanks,
> Aitozi.
>
>
>
> Jing Ge wrote on Mon, Jun 12, 2023 at 20:28:
>
>> Hi Aitozi,
>>
>> Which key will be used for lookup is not an issue, only one row will be
>> required for each key in order to enrich it. True, it depends on the
>> implementation whether multiple rows or single row for each key will be
>> returned. However, for the lookup & enrichment scenario, one row/key is
>> recommended, otherwise, like I mentioned previously, enrichment won't
>> work.
>>
>> I am a little bit concerned about returning a big table for each key,
>> since
>> it will take the async call longer to return and need more memory. The
>> performance tests should cover this scenario. This is not a blocking issue
>> for this FLIP.
>>
>> Best regards,
>> Jing
>>
>> On Sat, Jun 10, 2023 at 4:11 AM Aitozi  wrote:
>>
>> > Hi Jing,
>> > I means the join key is not necessary to be the primary key or
>> unique
>> > index of the database.
>> > In this situation, we may queried out multi rows for one join key. I
>> think
>> > that's why the
>> > LookupFunction#lookup will return a collection of RowData.
>> >
>> > BTW, I think the behavior of lookup join will not affect the semantic of
>> > the async udtf.
>> > We use the Async TableFunction here and the table function can collect
>> > multiple rows.
>> >
>> > Thanks,
>> > Atiozi.
>> >
>> >
>> >
>> > Jing Ge wrote on Sat, Jun 10, 2023 at 00:15:
>> >
>> > > Hi Aitozi,
>> > >
>> > > The keyRow used in this case contains all keys[1].
>> > >
>> > > Best regards,
>> > > Jing
>> > >
>> > > [1]
>> > >
>> > >
>> >
>> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
>> > >
>> > >
>> > > On Fri, Jun 9, 2023 at 3:42 PM Aitozi  wrote:
>> > >
>> > > > Hi Jing,
>> > > >
>> > > >  The performance test is added to the FLIP.
>> > > >
>> > > >  As I know, The lookup join can return multi rows, it depends on
>> > > > whether  the join key
>> > > > is the primary key of the external database or not. The `lookup` [1]
>> > will
>> > > > return a collection of
>> > > > joined result, and each of them will be collected
>> > > >
>> > > >
>> > > > [1]:
>> > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52
>> > > >
>> > > >
>> > > > Thanks,
>> > > > Aitozi.
>> > > >
>> > > Jing Ge wrote on Fri, Jun 9, 2023 at 17:05:
>> > > >
>> > > > > Hi Aitozi,
>> > > > >
>> > > > > Thanks for the feedback. Looking forward to the performance tests.
>> > > > >
>> > > > > Afaik, lookup returns one row for each key [1] [2]. Conceptually,
>> the
>> > > > > lookup function is used to enrich column(s) from the dimension
>> table.
>> > > If,
>> > > > > for the given key, there will be more than one row, there will be
>> no
>> > > way
>> > > > to
>> > > > > know which row will be used to enrich the key.
>> > > > >
>> > > > > [1]
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
>> 

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-12 Thread Aitozi
Hi, Jing,
Thanks for your explanation. I get your point now.

For the performance part, I think it's a good idea to run a case that
returns a big table; the memory consumption
should be a point to take care of, because in ordered mode the
head element in the buffer may affect the
total memory consumption.


Thanks,
Aitozi.
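As a toy illustration of that effect (purely illustrative, not Flink's
operator code): in ordered mode, completed results can only be emitted once
everything ahead of them has completed, so a single slow head element makes
the buffer grow.

{code:java}
// Toy model of "ordered" emission: results queue in input order, and
// finished successors must wait behind a slow head element.
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class OrderedEmitSketch {
    private final Queue<CompletableFuture<String>> buffer = new ArrayDeque<>();

    void add(CompletableFuture<String> pending) {
        buffer.add(pending); // enqueue in input order
    }

    void emitReady() {
        // only the head may be emitted; a slow head holds back everything,
        // so completed-but-unemitted results accumulate in memory
        while (!buffer.isEmpty() && buffer.peek().isDone()) {
            System.out.println(buffer.poll().join());
        }
    }
}
{code}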



Jing Ge wrote on Mon, Jun 12, 2023 at 20:28:

> Hi Aitozi,
>
> Which key will be used for lookup is not an issue, only one row will be
> required for each key in order to enrich it. True, it depends on the
> implementation whether multiple rows or single row for each key will be
> returned. However, for the lookup & enrichment scenario, one row/key is
> recommended, otherwise, like I mentioned previously, enrichment won't work.
>
> I am a little bit concerned about returning a big table for each key, since
> it will take the async call longer to return and need more memory. The
> performance tests should cover this scenario. This is not a blocking issue
> for this FLIP.
>
> Best regards,
> Jing
>
> On Sat, Jun 10, 2023 at 4:11 AM Aitozi  wrote:
>
> > Hi Jing,
> > I means the join key is not necessary to be the primary key or unique
> > index of the database.
> > In this situation, we may queried out multi rows for one join key. I
> think
> > that's why the
> > LookupFunction#lookup will return a collection of RowData.
> >
> > BTW, I think the behavior of lookup join will not affect the semantic of
> > the async udtf.
> > We use the Async TableFunction here and the table function can collect
> > multiple rows.
> >
> > Thanks,
> > Atiozi.
> >
> >
> >
> > Jing Ge wrote on Sat, Jun 10, 2023 at 00:15:
> >
> > > Hi Aitozi,
> > >
> > > The keyRow used in this case contains all keys[1].
> > >
> > > Best regards,
> > > Jing
> > >
> > > [1]
> > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> > >
> > >
> > > On Fri, Jun 9, 2023 at 3:42 PM Aitozi  wrote:
> > >
> > > > Hi Jing,
> > > >
> > > >  The performance test is added to the FLIP.
> > > >
> > > >  As I know, The lookup join can return multi rows, it depends on
> > > > whether  the join key
> > > > is the primary key of the external database or not. The `lookup` [1]
> > will
> > > > return a collection of
> > > > joined result, and each of them will be collected
> > > >
> > > >
> > > > [1]:
> > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52
> > > >
> > > >
> > > > Thanks,
> > > > Aitozi.
> > > >
> > > > Jing Ge wrote on Fri, Jun 9, 2023 at 17:05:
> > > >
> > > > > Hi Aitozi,
> > > > >
> > > > > Thanks for the feedback. Looking forward to the performance tests.
> > > > >
> > > > > Afaik, lookup returns one row for each key [1] [2]. Conceptually,
> the
> > > > > lookup function is used to enrich column(s) from the dimension
> table.
> > > If,
> > > > > for the given key, there will be more than one row, there will be
> no
> > > way
> > > > to
> > > > > know which row will be used to enrich the key.
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> > > > > [2]
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196
> > > > >
> > > > > Best regards,
> > > > > Jing
> > > > >
> > > > > > On Fri, Jun 9, 2023 at 5:18 AM Aitozi wrote:
> > > > >
> > > > > > Hi Jing
> > > > > > Thanks for your good questions. I have updated the example to
> > the
> > > > >

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-09 Thread Aitozi
Hi Jing,
I mean the join key is not necessarily the primary key or a unique
index of the database.
In this situation, we may query out multiple rows for one join key. I think
that's why
LookupFunction#lookup returns a collection of RowData.

BTW, I think the behavior of the lookup join will not affect the semantics
of the async UDTF.
We use the AsyncTableFunction here, and the table function can collect
multiple rows.

Thanks,
Aitozi.
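A small sketch of that contract (illustrative names; the in-memory map
stands in for a database whose join key is not a unique index, so one key
can yield several joined rows):

{code:java}
// Sketch of a LookupFunction returning several rows for one join key,
// mirroring the Collection<RowData> contract cited above; the map is a
// stand-in for the external database.
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.LookupFunction;

public class MultiRowLookup extends LookupFunction {
    private final Map<String, List<String>> db; // key -> many values

    public MultiRowLookup(Map<String, List<String>> db) {
        this.db = db;
    }

    @Override
    public Collection<RowData> lookup(RowData keyRow) {
        String key = keyRow.getString(0).toString();
        Collection<RowData> joined = new ArrayList<>();
        for (String value : db.getOrDefault(key, Collections.emptyList())) {
            joined.add(GenericRowData.of(
                    StringData.fromString(key), StringData.fromString(value)));
        }
        return joined; // multiple joined rows for a single join key
    }
}
{code}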



Jing Ge wrote on Sat, Jun 10, 2023 at 00:15:

> Hi Aitozi,
>
> The keyRow used in this case contains all keys[1].
>
> Best regards,
> Jing
>
> [1]
>
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
>
>
> On Fri, Jun 9, 2023 at 3:42 PM Aitozi  wrote:
>
> > Hi Jing,
> >
> >  The performance test is added to the FLIP.
> >
> >  As I know, The lookup join can return multi rows, it depends on
> > whether  the join key
> > is the primary key of the external database or not. The `lookup` [1] will
> > return a collection of
> > joined result, and each of them will be collected
> >
> >
> > [1]:
> >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52
> >
> >
> > Thanks,
> > Aitozi.
> >
> > Jing Ge wrote on Fri, Jun 9, 2023 at 17:05:
> >
> > > Hi Aitozi,
> > >
> > > Thanks for the feedback. Looking forward to the performance tests.
> > >
> > > Afaik, lookup returns one row for each key [1] [2]. Conceptually, the
> > > lookup function is used to enrich column(s) from the dimension table.
> If,
> > > for the given key, there will be more than one row, there will be no
> way
> > to
> > > know which row will be used to enrich the key.
> > >
> > > [1]
> > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> > > [2]
> > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Fri, Jun 9, 2023 at 5:18 AM Aitozi  wrote:
> > >
> > > > Hi Jing
> > > > Thanks for your good questions. I have updated the example to the
> > > FLIP.
> > > >
> > > > > Only one row for each lookup
> > > > lookup can also return multi rows, based on the query result. [1]
> > > >
> > > > [1]:
> > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56
> > > >
> > > > > If we use async calls with lateral join, my gut feeling is
> > > > that we might have many more async calls than lookup join. I am not
> > > really
> > > > sure if we will be facing potential issues in this case or not.
> > > >
> > > > IMO, the work pattern is similar to the lookup function, for each row
> > > from
> > > > the left table,
> > > > it will evaluate the eval method once, so the async call numbers will
> > not
> > > > change.
> > > > and the maximum calls in flight is limited by the Async operators
> > buffer
> > > > capacity
> > > > which will be controlled by the option.
> > > >
> > > > BTW, for the naming of these option, I updated the FLIP about this
> you
> > > can
> > > > refer to
> > > > the section of "ConfigOption" and "Rejected Alternatives"
> > > >
> > > > In the end, for the performance evaluation, I'd like to do some tests
> > and
> > > > will update it to the FLIP doc
> > > >
> > > > Thanks,
> > > > Aitozi.
> > > >
> > > >
> > > > Jing Ge wrote on Fri, Jun 9, 2023 at 07:23:
> > > >
> > > > > Hi Aitozi,
> > > > >
> > > > > Thanks for the clarification. The code example looks interesting. I
> > > would
> > > > > suggest adding them int

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-09 Thread Aitozi
Hi Jing,

 The performance test has been added to the FLIP.

 As far as I know, the lookup join can return multiple rows; it depends on
whether the join key
is the primary key of the external database or not. The `lookup` [1] will
return a collection of
joined results, and each of them will be collected.


[1]:
https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52


Thanks,
Aitozi.

Jing Ge wrote on Fri, Jun 9, 2023 at 17:05:

> Hi Aitozi,
>
> Thanks for the feedback. Looking forward to the performance tests.
>
> Afaik, lookup returns one row for each key [1] [2]. Conceptually, the
> lookup function is used to enrich column(s) from the dimension table. If,
> for the given key, there will be more than one row, there will be no way to
> know which row will be used to enrich the key.
>
> [1]
>
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> [2]
>
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196
>
> Best regards,
> Jing
>
> On Fri, Jun 9, 2023 at 5:18 AM Aitozi  wrote:
>
> > Hi Jing
> > Thanks for your good questions. I have updated the example to the
> FLIP.
> >
> > > Only one row for each lookup
> > lookup can also return multi rows, based on the query result. [1]
> >
> > [1]:
> >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56
> >
> > > If we use async calls with lateral join, my gut feeling is
> > that we might have many more async calls than lookup join. I am not
> really
> > sure if we will be facing potential issues in this case or not.
> >
> > IMO, the work pattern is similar to the lookup function, for each row
> from
> > the left table,
> > it will evaluate the eval method once, so the async call numbers will not
> > change.
> > and the maximum calls in flight is limited by the Async operators buffer
> > capacity
> > which will be controlled by the option.
> >
> > BTW, for the naming of these option, I updated the FLIP about this you
> can
> > refer to
> > the section of "ConfigOption" and "Rejected Alternatives"
> >
> > In the end, for the performance evaluation, I'd like to do some tests and
> > will update it to the FLIP doc
> >
> > Thanks,
> > Aitozi.
> >
> >
> > Jing Ge  于2023年6月9日周五 07:23写道:
> >
> > > Hi Aitozi,
> > >
> > > Thanks for the clarification. The code example looks interesting. I
> would
> > > suggest adding them into the FLIP. The description with code examples
> > will
> > > help readers understand the motivation and how to use it. Afaiac, it
> is a
> > > valid feature for Flink users.
> > >
> > > As we knew, lookup join is based on temporal join, i.e. FOR SYSTEM_TIME
> > AS
> > > OF which is also used in your code example. Temporal join performs the
> > > lookup based on the processing time match. Only one row for each
> > > lookup(afaiu, I need to check the source code to double confirm) will
> > > return for further enrichment. On the other hand, lateral join will
> have
> > > sub-queries correlated with every individual value of the reference
> table
> > > from the preceding part of the query and each sub query will return
> > > multiple rows. If we use async calls with lateral join, my gut feeling
> is
> > > that we might have many more async calls than lookup join. I am not
> > really
> > > sure if we will be facing potential issues in this case or not.
> Possible
> > > issues I can think of now e.g. too many RPC calls, too many async calls
> > > processing, the sub query will return a table which might be (too) big,
> > and
> > > might cause performance issues. I would suggest preparing some use
> cases
> > > and running some performance tests to check it. These are my concerns
> > about
> > > using async calls with lateral join and I'd like to share with you,
> happy
> > > to discuss with you and hear different opinions, hopefully the
> > > discussion could help me understand it more deeply. Please correct me
> if
> > I
> > > am wrong.
> > >
> > > Best regards,
> > > Jing

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-08 Thread Aitozi
Hi Feng,
Thanks for your good question. It would be very attractive if we could run
the original UDTFs asynchronously without introducing new UDTFs.

But I don't think it's easy, because the original UDTFs are executed with one
instance per parallelism, so there is no thread-safety problem for users.
The asynchronous version usually means the main processing runs in a
dedicated thread pool, so running the original UDTFs
asynchronously may bring thread-safety problems.
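For intuition, a minimal sketch (hypothetical class, not a real Flink UDTF) of
that hazard: a mutable field that is safe with one instance per parallelism
becomes a data race once eval() is dispatched to a thread pool.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class StatefulUdtf {
    // Safe while only the single task thread calls eval(); racy otherwise.
    private final List<String> scratch = new ArrayList<>();

    void eval(String input) {
        scratch.add(input); // unsynchronized mutation of shared state
        // ... emit rows derived from scratch ...
        scratch.clear();
    }

    public static void main(String[] args) {
        StatefulUdtf udtf = new StatefulUdtf();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 1000; i++) {
            final int n = i;
            // Concurrent eval() calls can corrupt `scratch` (lost adds, CME, ...).
            pool.execute(() -> udtf.eval("row-" + n));
        }
        pool.shutdown();
    }
}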


@Jing Ge  I have also added the "Performance" section to the FLIP.

Thanks,
Aitozi.

Feng Jin  于2023年6月9日周五 13:16写道:

> hi, Aitozi
>
> Thank you for your proposal.
>
> In our production environment, we often encounter efficiency issues with
> user-defined functions (UDFs), which can lead to slower processing speeds.
> I believe that this FLIP will make it easier for UDFs to be executed more
> efficiently.
>
>
> I have a small question:
>
> Is it possible for us to execute the original UDTF asynchronously without
> introducing new UDTFs?
> Of course, this is just my personal idea and I am not sure if it is a
> feasible solution.
>
>
> Best,
> Feng
>
>
>
> On Fri, Jun 9, 2023 at 11:18 AM Aitozi  wrote:
>
> > Hi Jing
> > Thanks for your good questions. I have updated the example to the
> FLIP.
> >
> > > Only one row for each lookup
> > lookup can also return multi rows, based on the query result. [1]
> >
> > [1]:
> >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56
> >
> > > If we use async calls with lateral join, my gut feeling is
> > that we might have many more async calls than lookup join. I am not
> really
> > sure if we will be facing potential issues in this case or not.
> >
> > IMO, the work pattern is similar to the lookup function, for each row
> from
> > the left table,
> > it will evaluate the eval method once, so the async call numbers will not
> > change.
> > and the maximum calls in flight is limited by the Async operators buffer
> > capacity
> > which will be controlled by the option.
> >
> > BTW, for the naming of these option, I updated the FLIP about this you
> can
> > refer to
> > the section of "ConfigOption" and "Rejected Alternatives"
> >
> > In the end, for the performance evaluation, I'd like to do some tests and
> > will update it to the FLIP doc
> >
> > Thanks,
> > Aitozi.
> >
> >
> > Jing Ge  于2023年6月9日周五 07:23写道:
> >
> > > Hi Aitozi,
> > >
> > > Thanks for the clarification. The code example looks interesting. I
> would
> > > suggest adding them into the FLIP. The description with code examples
> > will
> > > help readers understand the motivation and how to use it. Afaiac, it
> is a
> > > valid feature for Flink users.
> > >
> > > As we knew, lookup join is based on temporal join, i.e. FOR SYSTEM_TIME
> > AS
> > > OF which is also used in your code example. Temporal join performs the
> > > lookup based on the processing time match. Only one row for each
> > > lookup(afaiu, I need to check the source code to double confirm) will
> > > return for further enrichment. On the other hand, lateral join will
> have
> > > sub-queries correlated with every individual value of the reference
> table
> > > from the preceding part of the query and each sub query will return
> > > multiple rows. If we use async calls with lateral join, my gut feeling
> is
> > > that we might have many more async calls than lookup join. I am not
> > really
> > > sure if we will be facing potential issues in this case or not.
> Possible
> > > issues I can think of now e.g. too many RPC calls, too many async calls
> > > processing, the sub query will return a table which might be (too) big,
> > and
> > > might cause performance issues. I would suggest preparing some use
> cases
> > > and running some performance tests to check it. These are my concerns
> > about
> > > using async calls with lateral join and I'd like to share with you,
> happy
> > > to discuss with you and hear different opinions, hopefully the
> > > discussion could help me understand it more deeply. Please correct me
> if
> > I
> > > am wrong.
> > >
> > > Best regards,
> > > Jing
> > >
> > >
> > > On Thu, Jun 8, 2023 at 7:22 AM Aitozi  wrote:
> 

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-08 Thread Aitozi
Hi Jing,
Thanks for your good questions. I have updated the example in the FLIP.

> Only one row for each lookup
A lookup can also return multiple rows, depending on the query result. [1]

[1]:
https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56

> If we use async calls with lateral join, my gut feeling is
that we might have many more async calls than lookup join. I am not really
sure if we will be facing potential issues in this case or not.

IMO, the work pattern is similar to the lookup function: for each row from
the left table, the eval method is invoked once, so the number of async calls
will not change, and the maximum number of calls in flight is limited by the
async operator's buffer capacity,
which will be controlled by the option.
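For intuition, the capacity limit behaves like a semaphore-guarded buffer; a
minimal sketch (made-up class, not Flink's actual AsyncWaitOperator code):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

class CapacityLimitedCaller {
    private final Semaphore inFlight; // one permit per in-flight call
    private final ExecutorService pool = Executors.newCachedThreadPool();

    CapacityLimitedCaller(int bufferCapacity) {
        this.inFlight = new Semaphore(bufferCapacity);
    }

    // Called once per input row; blocks (back-pressures) when capacity is full.
    CompletableFuture<String> call(String key) throws InterruptedException {
        inFlight.acquire();
        return CompletableFuture
                .supplyAsync(() -> "result-for-" + key, pool) // stand-in for the RPC
                .whenComplete((result, error) -> inFlight.release());
    }
}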

BTW, regarding the naming of these options, I updated the FLIP; you can
refer to
the sections "ConfigOption" and "Rejected Alternatives".

Finally, for the performance evaluation, I'd like to run some tests and
update the results in the FLIP doc.

Thanks,
Aitozi.


Jing Ge  于2023年6月9日周五 07:23写道:

> Hi Aitozi,
>
> Thanks for the clarification. The code example looks interesting. I would
> suggest adding them into the FLIP. The description with code examples will
> help readers understand the motivation and how to use it. Afaiac, it is a
> valid feature for Flink users.
>
> As we knew, lookup join is based on temporal join, i.e. FOR SYSTEM_TIME AS
> OF which is also used in your code example. Temporal join performs the
> lookup based on the processing time match. Only one row for each
> lookup(afaiu, I need to check the source code to double confirm) will
> return for further enrichment. On the other hand, lateral join will have
> sub-queries correlated with every individual value of the reference table
> from the preceding part of the query and each sub query will return
> multiple rows. If we use async calls with lateral join, my gut feeling is
> that we might have many more async calls than lookup join. I am not really
> sure if we will be facing potential issues in this case or not. Possible
> issues I can think of now e.g. too many RPC calls, too many async calls
> processing, the sub query will return a table which might be (too) big, and
> might cause performance issues. I would suggest preparing some use cases
> and running some performance tests to check it. These are my concerns about
> using async calls with lateral join and I'd like to share with you, happy
> to discuss with you and hear different opinions, hopefully the
> discussion could help me understand it more deeply. Please correct me if I
> am wrong.
>
> Best regards,
> Jing
>
>
> On Thu, Jun 8, 2023 at 7:22 AM Aitozi  wrote:
>
> > Hi Mason,
> > Thanks for your input. I think if we support the user defined async
> > table function,
> > user will be able to use it to hold a batch data then handle it at one
> time
> > in the customized function.
> >
> > AsyncSink is meant for the sink operator. I have not figure out how to
> > integrate in this case.
> >
> > Thanks,
> > Atiozi.
> >
> >
> > Mason Chen  于2023年6月8日周四 12:40写道:
> >
> > > Hi Aitozi,
> > >
> > > I think it makes sense to make it easier for SQL users to make RPCs. Do
> > you
> > > think your proposal can extend to the ability to batch data for the
> RPC?
> > > This is also another common strategy to increase throughput. Also, have
> > you
> > > considered solving this a bit differently by leveraging Flink's
> > AsyncSink?
> > >
> > > Best,
> > > Mason
> > >
> > > On Mon, Jun 5, 2023 at 1:50 AM Aitozi  wrote:
> > >
> > > > One more thing for discussion:
> > > >
> > > > In our internal implementation, we reuse the option
> > > > `table.exec.async-lookup.buffer-capacity` and
> > > > `table.exec.async-lookup.timeout` to config
> > > > the async udtf. Do you think we should add two extra option to
> > > distinguish
> > > > from the lookup option such as
> > > >
> > > > `table.exec.async-udtf.buffer-capacity`
> > > > `table.exec.async-udtf.timeout`
> > > >
> > > >
> > > > Best,
> > > > Aitozi.
> > > >
> > > >
> > > >
> > > > Aitozi  于2023年6月5日周一 12:20写道:
> > > >
> > > > > Hi Jing,
> > > > >
> > > > > > what is the difference between the RPC call or query you
> > > > > > mentioned and the lookup in a very general way?

Re: [VOTE] FLIP-315: Support Operator Fusion Codegen for Flink SQL

2023-06-07 Thread Aitozi
+1

Looking forward to this feature.

Best,
Aitozi.

Jing Ge  于2023年6月8日周四 04:44写道:

> +1
>
> Best Regards,
> Jing
>
> On Wed, Jun 7, 2023 at 10:52 AM weijie guo 
> wrote:
>
> > +1 (binding)
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > Jingsong Li  于2023年6月7日周三 15:59写道:
> >
> > > +1
> > >
> > > On Wed, Jun 7, 2023 at 3:03 PM Benchao Li 
> wrote:
> > > >
> > > > +1, binding
> > > >
> > > > Jark Wu  于2023年6月7日周三 14:44写道:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > > > 2023年6月7日 14:20,liu ron  写道:
> > > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > Thanks for all the feedback about FLIP-315: Support Operator
> Fusion
> > > > > Codegen
> > > > > > for Flink SQL[1].
> > > > > > [2] is the discussion thread.
> > > > > >
> > > > > > I'd like to start a vote for it. The vote will be open for at
> least
> > > 72
> > > > > > hours (until June 12th, 12:00AM GMT) unless there is an objection
> > or
> > > an
> > > > > > insufficient number of votes.
> > > > > >
> > > > > > [1]:
> > > > > >
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-315+Support+Operator+Fusion+Codegen+for+Flink+SQL
> > > > > > [2]:
> > > https://lists.apache.org/thread/9cnqhsld4nzdr77s2fwf00o9cb2g9fmw
> > > > > >
> > > > > > Best,
> > > > > > Ron
> > > > >
> > > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > >
> >
>


Re: [DISCUSS] FLIP-315: Support Operator Fusion Codegen for Flink SQL

2023-06-07 Thread Aitozi
Hi ron,
Thank you for your explanation; I have no other questions.

Best,
Aitozi.



liu ron  于2023年6月8日周四 10:52写道:

> Hi, Atiozi
>
> Thanks for your feedback.
>
> > Traverse the ExecNode DAG and create a FusionExecNode  for physical
> operators that can be fused together.
> which kind of operators can be fused together ? are the operators in an
> operator chain? Is this optimization aligned to spark's whole stage codegen
> ?
> In theory, all kinds of operators can be fused together; our final goal is
> to support all operators in batch mode, and OperatorChain is just one case.
> Because this work effort is relatively large, we need to complete it step by
> step. Our OFCG not only achieves the capability of Spark's whole-stage
> codegen, but also goes beyond it.
>
> > does the "support codegen" means fusion codegen? but why we generate a
> FusionTransformation when the member operator does not support codegen, IMO
> it should
> fallback to the current behavior.
>
> Yes, it means fusion codegen. In the FLIP, I propose two operator fusion
> mechanisms: one is like OperatorChain for single-input operators, the other is
> MultipleInput fusion. For the former, our design is to fuse all
> operators together at the ExecNode layer only if they all support fusion
> codegen; otherwise we go through the default OperatorChain. For the latter, in
> order not to break the purpose of the existing MultipleInput optimization, when
> there are member operators that do not support fusion codegen, we will
> fall back to the current behavior [1], which means that a
> FusionTransformation is created. Here FusionTransformation is just a
> surrogate for the MultipleInput case; it actually means
> MultipleInputTransformation, which fuses multiple physical operators.
> Sorry, the description in the flow was not very clear and caused your
> confusion.
>
> > In the end, I share the same idea with Lincoln about performance
> benchmark.
> Currently flink community's flink-benchmark only covers like schedule,
> state, datastream operator's performance.
> A good benchmark harness for sql operator will benefit the sql optimizer
> topic and observation
>
> For the performance benchmark, I agree with you. As I stated earlier, I
> think this is a new scope of work, we should design it separately, we can
> introduce this improvement in the future.
>
> [1]
>
> https://github.com/apache/flink/blob/77214f138cf759a3ee5466c9b2379e717227a0ae/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMultipleInput.java#L123
>
> Best,
> Ron
>
> Jing Ge  于2023年6月8日周四 04:28写道:
>
> > Hi Ron,
> >
> > Thanks for raising the proposal. It is a very attractive idea! Since the
> > FLIP is a relatively complex one which contains three papers and a design
> > doc. It deserves more time for the discussion to make sure everyone is on
> > the same page. I have a NIT question which will not block your voting
> > process. Previously, it took the community a lot of effort to make Flink
> > kinds of scala free. Since the code base of the table module is too big,
> > instead of porting to Java, all scala code has been hidden. Furthermore,
> > there are ongoing efforts to remove Scala code from Flink. As you can
> see,
> > the community tries to limit (i.e. get rid of) scala code as much as
> > possible. I was wondering if it is possible for you to implement the FLIP
> > with scala free code?
> >
> > Best regards,
> > Jing
> >
> > [1] https://flink.apache.org/2022/02/22/scala-free-in-one-fifteen/
> >
> > On Wed, Jun 7, 2023 at 5:33 PM Aitozi  wrote:
> >
> > > Hi Ron:
> > > Sorry for the late reply after the voting process. I just want to
> ask
> > >
> > > > Traverse the ExecNode DAG and create a FusionExecNode  for physical
> > > operators that can be fused together.
> > > which kind of operators can be fused together ? are the operators in an
> > > operator chain? Is this optimization aligned to spark's whole stage
> > codegen
> > > ?
> > >
> > > > If any member operator does not support codegen, generate a
> > > Transformation DAG based on the topological relationship of member
> > ExecNode
> > >  and jump to step 8.
> > > step8: Generate a FusionTransformation, setting the parallelism and
> > managed
> > > memory for the fused operator.
> > >
> > > does the "support codegen" means fusion codegen? but why we generate a
> > > FusionTransformation when the member operator does not support codegen,
>

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-07 Thread Aitozi
Hi Mason,
Thanks for your input. I think if we support the user-defined async
table function,
users will be able to use it to hold a batch of data and then handle it in
one shot in the customized function.

AsyncSink is meant for the sink operator; I have not figured out how to
integrate it in this case.

Thanks,
Aitozi.


Mason Chen  于2023年6月8日周四 12:40写道:

> Hi Aitozi,
>
> I think it makes sense to make it easier for SQL users to make RPCs. Do you
> think your proposal can extend to the ability to batch data for the RPC?
> This is also another common strategy to increase throughput. Also, have you
> considered solving this a bit differently by leveraging Flink's AsyncSink?
>
> Best,
> Mason
>
> On Mon, Jun 5, 2023 at 1:50 AM Aitozi  wrote:
>
> > One more thing for discussion:
> >
> > In our internal implementation, we reuse the option
> > `table.exec.async-lookup.buffer-capacity` and
> > `table.exec.async-lookup.timeout` to config
> > the async udtf. Do you think we should add two extra option to
> distinguish
> > from the lookup option such as
> >
> > `table.exec.async-udtf.buffer-capacity`
> > `table.exec.async-udtf.timeout`
> >
> >
> > Best,
> > Aitozi.
> >
> >
> >
> > Aitozi  于2023年6月5日周一 12:20写道:
> >
> > > Hi Jing,
> > >
> > > > what is the difference between the RPC call or query you
> mentioned
> > > and the lookup in a very
> > > general way
> > >
> > > I think the RPC call or query service is quite similar to the lookup
> > join.
> > > But lookup join should work
> > > with `LookupTableSource`.
> > >
> > > Let's see how we can perform an async RPC call with lookup join:
> > >
> > > (1) Implement an AsyncTableFunction with RPC call logic.
> > > (2) Implement a `LookupTableSource` connector run with the async udtf
> > > defined in (1).
> > > (3) Then define a DDL of this look up table in SQL
> > >
> > > CREATE TEMPORARY TABLE Customers (
> > >   id INT,
> > >   name STRING,
> > >   country STRING,
> > >   zip STRING
> > > ) WITH (
> > >   'connector' = 'custom'
> > > );
> > >
> > > (4) Run with the query as below:
> > >
> > > SELECT o.order_id, o.total, c.country, c.zip
> > > FROM Orders AS o
> > >   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> > > ON o.customer_id = c.id;
> > >
> > > This example is from doc
> > > <
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/#lookup-join
> > >.You
> > > can image the look up process as an async RPC call process.
> > >
> > > Let's see how we can perform an async RPC call with lateral join:
> > >
> > > (1) Implement an AsyncTableFunction with RPC call logic.
> > > (2) Run query with
> > >
> > > Create function f1 as '...' ;
> > >
> > > SELECT o.order_id, o.total, c.country, c.zip FROM Orders  lateral table
> > > (f1(order_id)) as T(...);
> > >
> > > As you can see, the lateral join version is more simple and intuitive
> to
> > > users. Users do not have to wrap a
> > > LookupTableSource for the purpose of using async udtf.
> > >
> > > In the end, We can also see the user defined async table function is an
> > > enhancement of the current lateral table join
> > > which only supports sync lateral join now.
> > >
> > > Best,
> > > Aitozi.
> > >
> > >
> > > Jing Ge  于2023年6月2日周五 19:37写道:
> > >
> > >> Hi Aitozi,
> > >>
> > >> Thanks for the update. Just out of curiosity, what is the difference
> > >> between the RPC call or query you mentioned and the lookup in a very
> > >> general way? Since Lateral join is used in the FLIP. Is there any
> > special
> > >> thought for that? Sorry for asking so many questions. The FLIP
> contains
> > >> limited information to understand the motivation.
> > >>
> > >> Best regards,
> > >> Jing
> > >>
> > >> On Fri, Jun 2, 2023 at 3:48 AM Aitozi  wrote:
> > >>
> > >> > Hi Jing,
> > >> > I have updated the proposed changes to the FLIP. IMO, lookup has
> > its
> > >> > clear
> > >> > async call requirement is due to its IO heavy operator. In our
> usage,
> > >>

Re: [DISCUSS] FLIP-315: Support Operator Fusion Codegen for Flink SQL

2023-06-07 Thread Aitozi
Hi Ron:
Sorry for the late reply after the voting process. I just want to ask

> Traverse the ExecNode DAG and create a FusionExecNode  for physical
operators that can be fused together.
Which kinds of operators can be fused together? Are they the operators in an
operator chain? Is this optimization aligned with Spark's whole-stage codegen?
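For context, a schematic sketch (hypothetical, neither Flink's nor Spark's
actual generated code) of what whole-stage-style fusion produces: one tight
loop instead of chained per-record operator calls.

final class FusedCalcSketch {
    // Fused filter + projection + consume, with no virtual calls in between.
    static long processBatch(long[] values) {
        long sum = 0L;
        for (long v : values) {
            if (v % 2 == 0) {  // fused filter
                sum += v * 10; // fused projection feeding the consumer
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(processBatch(new long[] {1, 2, 3, 4})); // prints 60
    }
}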

> If any member operator does not support codegen, generate a
Transformation DAG based on the topological relationship of member ExecNode
 and jump to step 8.
step8: Generate a FusionTransformation, setting the parallelism and managed
memory for the fused operator.

does the "support codegen" means fusion codegen? but why we generate a
FusionTransformation when the member operator does not support codegen, IMO
it should
fallback to the current behavior.

Finally, I share the same idea as Lincoln about performance benchmarks.
Currently the Flink community's flink-benchmark only covers things like
scheduling, state, and DataStream operator performance.
A good benchmark harness for SQL operators would benefit the SQL optimizer
work and observability.

Thanks,
Aitozi.


liu ron  于2023年6月6日周二 19:30写道:

> Hi dev
>
> Thanks for all the feedback, it seems that there are no more comments, I
> will
> start a vote on FLIP-315 [1] later. Thanks again.
>
> [1]:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-315+Support+Operator+Fusion+Codegen+for+Flink+SQL
>
> Best,
> Ron
>
> liu ron  于2023年6月5日周一 16:01写道:
>
> > Hi, Yun, Jinsong, Benchao
> >
> > Thanks for your valuable input about this FLIP.
> >
> > First of all, let me emphasize that from the technical implementation
> > point of view, this design is feasible in both stream and batch
> scenarios,
> > so I consider both stream and batch mode in FLIP. In the stream scenario,
> > for stateful operator, according to our business experience, basically
> the
> > bottleneck is on the state access, so the optimization effect of OFCG for
> > the stream will not be particularly obvious, so we will not give priority
> > to support it currently. On the contrary, in the batch scenario, where
> CPU
> > is the bottleneck, this optimization is gainful.
> >
> > Taking the above into account, we are able to support both stream and
> > batch mode optimization in this design, but we will give priority to
> > supporting batch operators. As benchao said, when we find a suitable
> > streaming business scenario in the future, we can consider doing this
> > optimization. Back to Yun issue, the design will break state
> compatibility
> > in stream mode as[1] and the version upgrade will not support this OFCG.
> As
> > mentioned earlier, we will not support this feature in stream mode in the
> > short term.
> >
> > Also thanks to Benchao's suggestion, I will state the current goal of
> that
> > optimization in the FLIP, scoped to batch mode.
> >
> > Best,
> > Ron
> >
> > liu ron  于2023年6月5日周一 15:04写道:
> >
> >> Hi, Lincoln
> >>
> >> Thanks for your appreciation of this design. Regarding your question:
> >>
> >> > do we consider adding a benchmark for the operators to intuitively
> >> understand the improvement brought by each improvement?
> >>
> >> I think it makes sense to add a benchmark, Spark also has this benchmark
> >> framework. But I think it is another story to introduce a benchmark
> >> framework in Flink, we need to start a new discussion to this work.
> >>
> >> > for the implementation plan, mentioned in the FLIP that 1.18 will
> >> support Calc, HashJoin and HashAgg, then what will be the next step? and
> >> which operators do we ultimately expect to cover (all or specific ones)?
> >>
> >> Our ultimate goal is to support all operators in batch mode, but we
> >> prioritize them according to their usage. Operators like Calc, HashJoin,
> >> HashAgg, etc. are more commonly used, so we will support them first.
> Later
> >> we support the rest of the operators step by step. Considering the time
> >> factor and the development workload, so we can only support  Calc,
> >> HashJoin, HashAgg in 1.18. In 1.19 or 1.20, we will complete the rest
> work.
> >> I will make this clear in FLIP
> >>
> >> Best,
> >> Ron
> >>
> >> Jingsong Li  于2023年6月5日周一 14:15写道:
> >>
> >>> > For the state compatibility session, it seems that the checkpoint
> >>> compatibility would be broken just like [1] did. Could FLIP-190 [2]
> still
> >>> be helpful in this case for SQL version upgrades?
> >>>
> >>> I guess this is only for batch processing. Streaming should be another
> >>> story?
> >>>
> >>> Best,
> >>> Jingsong
> >>>
> >>> On Mon, Jun 5, 2023 at 2:07 PM Yun Tang  wrote:
> >>> >
> >>> > Hi Ron,
> >>> >
> >>> > I think this FLIP would help to improve the performance, looking
> >>> forward to its completion in Flink!
> >>> >
> >>> > For the state compatibility session, it seems that the checkpoint
> >>> compatibility would be broken just like [1] did. Could FLIP-190 [2]
> still
> >>> be helpful in this case for SQL version upgrades?
> >>> >
> >>> >
> >>> > [1]
> >>>
> https://docs

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-05 Thread Aitozi
One more thing for discussion:

In our internal implementation, we reuse the options
`table.exec.async-lookup.buffer-capacity` and
`table.exec.async-lookup.timeout` to configure
the async UDTF. Do you think we should add two extra options to distinguish
them from the lookup options, such as

`table.exec.async-udtf.buffer-capacity`
`table.exec.async-udtf.timeout`
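For illustration, assuming such keys were adopted, they could be set like any
other table option (the `table.exec.async-udtf.*` names are only the proposal
under discussion, not a released API):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AsyncUdtfConfigExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Sketch only: proposed keys, mirroring the async-lookup options.
        tEnv.getConfig().set("table.exec.async-udtf.buffer-capacity", "100");
        tEnv.getConfig().set("table.exec.async-udtf.timeout", "3 min");
    }
}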


Best,
Aitozi.



Aitozi  于2023年6月5日周一 12:20写道:

> Hi Jing,
>
> > what is the difference between the RPC call or query you mentioned
> and the lookup in a very
> general way
>
> I think the RPC call or query service is quite similar to the lookup join.
> But lookup join should work
> with `LookupTableSource`.
>
> Let's see how we can perform an async RPC call with lookup join:
>
> (1) Implement an AsyncTableFunction with RPC call logic.
> (2) Implement a `LookupTableSource` connector run with the async udtf
> defined in (1).
> (3) Then define a DDL of this look up table in SQL
>
> CREATE TEMPORARY TABLE Customers (
>   id INT,
>   name STRING,
>   country STRING,
>   zip STRING
> ) WITH (
>   'connector' = 'custom'
> );
>
> (4) Run with the query as below:
>
> SELECT o.order_id, o.total, c.country, c.zip
> FROM Orders AS o
>   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
> ON o.customer_id = c.id;
>
> This example is from doc
> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/#lookup-join>.You
> can image the look up process as an async RPC call process.
>
> Let's see how we can perform an async RPC call with lateral join:
>
> (1) Implement an AsyncTableFunction with RPC call logic.
> (2) Run query with
>
> Create function f1 as '...' ;
>
> SELECT o.order_id, o.total, c.country, c.zip FROM Orders  lateral table
> (f1(order_id)) as T(...);
>
> As you can see, the lateral join version is more simple and intuitive to
> users. Users do not have to wrap a
> LookupTableSource for the purpose of using async udtf.
>
> In the end, We can also see the user defined async table function is an
> enhancement of the current lateral table join
> which only supports sync lateral join now.
>
> Best,
> Aitozi.
>
>
> Jing Ge  于2023年6月2日周五 19:37写道:
>
>> Hi Aitozi,
>>
>> Thanks for the update. Just out of curiosity, what is the difference
>> between the RPC call or query you mentioned and the lookup in a very
>> general way? Since Lateral join is used in the FLIP. Is there any special
>> thought for that? Sorry for asking so many questions. The FLIP contains
>> limited information to understand the motivation.
>>
>> Best regards,
>> Jing
>>
>> On Fri, Jun 2, 2023 at 3:48 AM Aitozi  wrote:
>>
>> > Hi Jing,
>> > I have updated the proposed changes to the FLIP. IMO, lookup has its
>> > clear
>> > async call requirement is due to its IO heavy operator. In our usage,
>> sql
>> > users have
>> > logic to do some RPC call or query the third-party service which is
>> also IO
>> > intensive.
>> > In these case, we'd like to leverage the async function to improve the
>> > throughput.
>> >
>> > Thanks,
>> > Aitozi.
>> >
>> > Jing Ge  于2023年6月1日周四 22:55写道:
>> >
>> > > Hi Aitozi,
>> > >
>> > > Sorry for the late reply. Would you like to update the proposed
>> changes
>> > > with more details into the FLIP too?
>> > > I got your point. It looks like a rational idea. However, since lookup
>> > has
>> > > its clear async call requirement, are there any real use cases that
>> > > need this change? This will help us understand the motivation. After
>> all,
>> > > lateral join and temporal lookup join[1] are quite different.
>> > >
>> > > Best regards,
>> > > Jing
>> > >
>> > >
>> > > [1]
>> > >
>> > >
>> >
>> https://github.com/apache/flink/blob/d90a72da2fd601ca4e2a46700e91ec5b348de2ad/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java#L54
>> > >
>> > > On Wed, May 31, 2023 at 8:53 AM Aitozi  wrote:
>> > >
>> > > > Hi Jing,
>> > > > What do you think about it? Can we move forward this feature?
>> > > >
>> > > > Thanks,
>> > > > Aitozi.
>> > > >
>> > > > Aitozi  于2023年5月29日周一 09:56写道:
>> > > >
>> > > > > Hi Jing,
>> >

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-04 Thread Aitozi
Hi Jing,

> what is the difference between the RPC call or query you mentioned
> and the lookup in a very general way?

I think the RPC call or query service is quite similar to the lookup join,
but the lookup join has to work
with a `LookupTableSource`.

Let's see how we can perform an async RPC call with lookup join:

(1) Implement an AsyncTableFunction with RPC call logic.
(2) Implement a `LookupTableSource` connector run with the async udtf
defined in (1).
(3) Then define the DDL of this lookup table in SQL:

CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'custom'
);

(4) Run with the query as below:

SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

This example is from the doc
<https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/#lookup-join>.
You can imagine the lookup process as an async RPC call.

Let's see how we can perform an async RPC call with lateral join:

(1) Implement an AsyncTableFunction with RPC call logic.
(2) Run query with

Create function f1 as '...' ;

SELECT o.order_id, o.total, c.country, c.zip FROM Orders  lateral table
(f1(order_id)) as T(...);

As you can see, the lateral join version is simpler and more intuitive for
users. Users do not have to wrap a
LookupTableSource just to use an async UDTF.

In the end, we can also see that the user-defined async table function is an
enhancement of the current lateral table join,
which only supports synchronous execution today.
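To make step (1) concrete, a minimal sketch (function name, pool size, and the
RPC stub are made up; the eval-with-future style follows the AsyncTableFunction
javadoc):

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.types.Row;

// Sketch: an async UDTF that completes its result future from a thread pool.
public class RpcEnrichFunction extends AsyncTableFunction<Row> {
    private transient ExecutorService pool;

    @Override
    public void open(FunctionContext context) throws Exception {
        pool = Executors.newFixedThreadPool(10);
    }

    // By convention, the first argument is the future to complete.
    public void eval(CompletableFuture<Collection<Row>> future, Integer orderId) {
        pool.execute(() -> future.complete(
                Collections.singleton(Row.of(orderId, lookupCountry(orderId)))));
    }

    private String lookupCountry(int orderId) {
        return "country-" + orderId; // stand-in for the blocking RPC call
    }

    @Override
    public void close() throws Exception {
        pool.shutdown();
    }
}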

Best,
Aitozi.


Jing Ge  于2023年6月2日周五 19:37写道:

> Hi Aitozi,
>
> Thanks for the update. Just out of curiosity, what is the difference
> between the RPC call or query you mentioned and the lookup in a very
> general way? Since Lateral join is used in the FLIP. Is there any special
> thought for that? Sorry for asking so many questions. The FLIP contains
> limited information to understand the motivation.
>
> Best regards,
> Jing
>
> On Fri, Jun 2, 2023 at 3:48 AM Aitozi  wrote:
>
> > Hi Jing,
> > I have updated the proposed changes to the FLIP. IMO, lookup has its
> > clear
> > async call requirement is due to its IO heavy operator. In our usage, sql
> > users have
> > logic to do some RPC call or query the third-party service which is also
> IO
> > intensive.
> > In these case, we'd like to leverage the async function to improve the
> > throughput.
> >
> > Thanks,
> > Aitozi.
> >
> > Jing Ge  于2023年6月1日周四 22:55写道:
> >
> > > Hi Aitozi,
> > >
> > > Sorry for the late reply. Would you like to update the proposed changes
> > > with more details into the FLIP too?
> > > I got your point. It looks like a rational idea. However, since lookup
> > has
> > > its clear async call requirement, are there any real use cases that
> > > need this change? This will help us understand the motivation. After
> all,
> > > lateral join and temporal lookup join[1] are quite different.
> > >
> > > Best regards,
> > > Jing
> > >
> > >
> > > [1]
> > >
> > >
> >
> https://github.com/apache/flink/blob/d90a72da2fd601ca4e2a46700e91ec5b348de2ad/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java#L54
> > >
> > > On Wed, May 31, 2023 at 8:53 AM Aitozi  wrote:
> > >
> > > > Hi Jing,
> > > > What do you think about it? Can we move forward this feature?
> > > >
> > > > Thanks,
> > > > Aitozi.
> > > >
> > > > Aitozi  于2023年5月29日周一 09:56写道:
> > > >
> > > > > Hi Jing,
> > > > > > "Do you mean to support the AyncTableFunction beyond the
> > > > > LookupTableSource?"
> > > > > Yes, I mean to support the AyncTableFunction beyond the
> > > > LookupTableSource.
> > > > >
> > > > > The "AsyncTableFunction" is the function with ability to be
> executed
> > > > async
> > > > > (with AsyncWaitOperator).
> > > > > The async lookup join is a one of usage of this. So, we don't have
> to
> > > > bind
> > > > > the AyncTableFunction with LookupTableSource.
> > > > > If User-defined AsyncTableFunction is supported, user can directly
> > use
> > > > > lateral table syntax to perform async operation.
> > > > >
> > > > > > "It would be better if you could elaborate the propos

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-01 Thread Aitozi
Hi Jing,
I have updated the proposed changes in the FLIP. IMO, lookup has its
clear async call requirement because it is an IO-heavy operator. In our usage,
SQL users have logic to do RPC calls or query third-party services, which is
also IO intensive.
In these cases, we'd like to leverage the async function to improve
throughput.

Thanks,
Aitozi.

Jing Ge  于2023年6月1日周四 22:55写道:

> Hi Aitozi,
>
> Sorry for the late reply. Would you like to update the proposed changes
> with more details into the FLIP too?
> I got your point. It looks like a rational idea. However, since lookup has
> its clear async call requirement, are there any real use cases that
> need this change? This will help us understand the motivation. After all,
> lateral join and temporal lookup join[1] are quite different.
>
> Best regards,
> Jing
>
>
> [1]
>
> https://github.com/apache/flink/blob/d90a72da2fd601ca4e2a46700e91ec5b348de2ad/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java#L54
>
> On Wed, May 31, 2023 at 8:53 AM Aitozi  wrote:
>
> > Hi Jing,
> > What do you think about it? Can we move forward this feature?
> >
> > Thanks,
> > Aitozi.
> >
> > Aitozi  于2023年5月29日周一 09:56写道:
> >
> > > Hi Jing,
> > > > "Do you mean to support the AyncTableFunction beyond the
> > > LookupTableSource?"
> > > Yes, I mean to support the AyncTableFunction beyond the
> > LookupTableSource.
> > >
> > > The "AsyncTableFunction" is the function with ability to be executed
> > async
> > > (with AsyncWaitOperator).
> > > The async lookup join is a one of usage of this. So, we don't have to
> > bind
> > > the AyncTableFunction with LookupTableSource.
> > > If User-defined AsyncTableFunction is supported, user can directly use
> > > lateral table syntax to perform async operation.
> > >
> > > > "It would be better if you could elaborate the proposed changes wrt
> the
> > > CorrelatedCodeGenerator with more details"
> > >
> > > In the proposal, we use lateral table syntax to support the async table
> > > function. So the planner will also treat this statement to a
> > > CommonExecCorrelate node. So the runtime code should be generated in
> > > CorrelatedCodeGenerator.
> > > In CorrelatedCodeGenerator, we will know the TableFunction's Kind of
> > > `FunctionKind.Table` or `FunctionKind.ASYNC_TABLE`
> > > For  `FunctionKind.ASYNC_TABLE` we can generate a AsyncWaitOperator to
> > > execute the async table function.
> > >
> > >
> > > Thanks,
> > > Aitozi.
> > >
> > >
> > > Jing Ge  于2023年5月29日周一 03:22写道:
> > >
> > >> Hi Aitozi,
> > >>
> > >> Thanks for the clarification. The naming "Lookup" might suggest using
> it
> > >> for table look up. But conceptually what the eval() method will do is
> to
> > >> get a collection of results(Row, RowData) from the given keys. How it
> > will
> > >> be done depends on the implementation, i.e. you can implement your own
> > >> Source[1][2]. The example in the FLIP should be able to be handled in
> > this
> > >> way.
> > >>
> > >> Do you mean to support the AyncTableFunction beyond the
> > LookupTableSource?
> > >> It would be better if you could elaborate the proposed changes wrt the
> > >> CorrelatedCodeGenerator with more details. Thanks!
> > >>
> > >> Best regards,
> > >> Jing
> > >>
> > >> [1]
> > >>
> > >>
> >
> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java#L64
> > >> [2]
> > >>
> > >>
> >
> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java#L49
> > >>
> > >> On Sat, May 27, 2023 at 9:48 AM Aitozi  wrote:
> > >>
> > >> > Hi Jing,
> > >> > Thanks for your response. As stated in the FLIP, the purpose of
> > this
> > >> > FLIP is meant to support
> > >> > user-defined async table function. As described in flink document
> [1]
> > >> >
> > >> > Async table functions are special functions for table sources that
> > >> > perform a lookup.

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-05-30 Thread Aitozi
Hi Jing,
What do you think about it? Can we move this feature forward?

Thanks,
Aitozi.

Aitozi  于2023年5月29日周一 09:56写道:

> Hi Jing,
> > "Do you mean to support the AyncTableFunction beyond the
> LookupTableSource?"
> Yes, I mean to support the AyncTableFunction beyond the LookupTableSource.
>
> The "AsyncTableFunction" is the function with ability to be executed async
> (with AsyncWaitOperator).
> The async lookup join is a one of usage of this. So, we don't have to bind
> the AyncTableFunction with LookupTableSource.
> If User-defined AsyncTableFunction is supported, user can directly use
> lateral table syntax to perform async operation.
>
> > "It would be better if you could elaborate the proposed changes wrt the
> CorrelatedCodeGenerator with more details"
>
> In the proposal, we use lateral table syntax to support the async table
> function. So the planner will also treat this statement to a
> CommonExecCorrelate node. So the runtime code should be generated in
> CorrelatedCodeGenerator.
> In CorrelatedCodeGenerator, we will know the TableFunction's Kind of
> `FunctionKind.Table` or `FunctionKind.ASYNC_TABLE`
> For  `FunctionKind.ASYNC_TABLE` we can generate a AsyncWaitOperator to
> execute the async table function.
>
>
> Thanks,
> Aitozi.
>
>
> Jing Ge  于2023年5月29日周一 03:22写道:
>
>> Hi Aitozi,
>>
>> Thanks for the clarification. The naming "Lookup" might suggest using it
>> for table look up. But conceptually what the eval() method will do is to
>> get a collection of results(Row, RowData) from the given keys. How it will
>> be done depends on the implementation, i.e. you can implement your own
>> Source[1][2]. The example in the FLIP should be able to be handled in this
>> way.
>>
>> Do you mean to support the AyncTableFunction beyond the LookupTableSource?
>> It would be better if you could elaborate the proposed changes wrt the
>> CorrelatedCodeGenerator with more details. Thanks!
>>
>> Best regards,
>> Jing
>>
>> [1]
>>
>> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java#L64
>> [2]
>>
>> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java#L49
>>
>> On Sat, May 27, 2023 at 9:48 AM Aitozi  wrote:
>>
>> > Hi Jing,
>> > Thanks for your response. As stated in the FLIP, the purpose of this
>> > FLIP is meant to support
>> > user-defined async table function. As described in flink document [1]
>> >
>> > Async table functions are special functions for table sources that
>> perform
>> > > a lookup.
>> > >
>> >
>> > So end user can not directly define and use async table function now. An
>> > user case is reported in [2]
>> >
>> > So, in conclusion, no new interface is introduced, but we extend the
>> > ability to support user-defined async table function.
>> >
>> > [1]:
>> >
>> >
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/udfs/
>> > [2]: https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
>> >
>> > Thanks.
>> > Aitozi.
>> >
>> >
>> > Jing Ge  于2023年5月27日周六 06:40写道:
>> >
>> > > Hi Aitozi,
>> > >
>> > > Thanks for your proposal. I am not quite sure if I understood your
>> > thoughts
>> > > correctly. You described a special case implementation of the
>> > > AsyncTableFunction with on public API changes. Would you please
>> elaborate
>> > > your purpose of writing a FLIP according to the FLIP documentation[1]?
>> > > Thanks!
>> > >
>> > > [1]
>> > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>> > >
>> > > Best regards,
>> > > Jing
>> > >
>> > > On Wed, May 24, 2023 at 1:07 PM Aitozi  wrote:
>> > >
>> > > > May I ask for some feedback  :D
>> > > >
>> > > > Thanks,
>> > > > Aitozi
>> > > >
>> > > > Aitozi  于2023年5月23日周二 19:14写道:
>> > > > >
>> > > > > Just catch an user case report from Giann

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-05-28 Thread Aitozi
Hi Jing,
> "Do you mean to support the AyncTableFunction beyond the
LookupTableSource?"
Yes, I mean to support the AyncTableFunction beyond the LookupTableSource.

The "AsyncTableFunction" is the function with ability to be executed async
(with AsyncWaitOperator).
The async lookup join is a one of usage of this. So, we don't have to bind
the AyncTableFunction with LookupTableSource.
If User-defined AsyncTableFunction is supported, user can directly use
lateral table syntax to perform async operation.

> "It would be better if you could elaborate the proposed changes wrt the
CorrelatedCodeGenerator with more details"

In the proposal, we use the lateral table syntax to support the async table
function, so the planner will treat this statement as a
CommonExecCorrelate node, and the runtime code should be generated in
CorrelatedCodeGenerator.
In CorrelatedCodeGenerator, we will know whether the TableFunction's kind is
`FunctionKind.TABLE` or `FunctionKind.ASYNC_TABLE`.
For `FunctionKind.ASYNC_TABLE`, we can generate an AsyncWaitOperator to
execute the async table function.
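Schematically, the dispatch could look like the sketch below (illustration
only, not the real CorrelatedCodeGenerator):

import org.apache.flink.table.functions.FunctionKind;

final class CorrelateCodegenSketch {
    // Sketch: branch on the function kind to pick the runtime operator.
    static String chooseOperator(FunctionKind kind) {
        switch (kind) {
            case TABLE:
                return "sync correlate operator (current behavior)";
            case ASYNC_TABLE:
                return "AsyncWaitOperator (async execution, this proposal)";
            default:
                throw new UnsupportedOperationException("kind: " + kind);
        }
    }

    public static void main(String[] args) {
        System.out.println(chooseOperator(FunctionKind.ASYNC_TABLE));
    }
}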


Thanks,
Aitozi.


Jing Ge  于2023年5月29日周一 03:22写道:

> Hi Aitozi,
>
> Thanks for the clarification. The naming "Lookup" might suggest using it
> for table look up. But conceptually what the eval() method will do is to
> get a collection of results(Row, RowData) from the given keys. How it will
> be done depends on the implementation, i.e. you can implement your own
> Source[1][2]. The example in the FLIP should be able to be handled in this
> way.
>
> Do you mean to support the AyncTableFunction beyond the LookupTableSource?
> It would be better if you could elaborate the proposed changes wrt the
> CorrelatedCodeGenerator with more details. Thanks!
>
> Best regards,
> Jing
>
> [1]
>
> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java#L64
> [2]
>
> https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java#L49
>
> On Sat, May 27, 2023 at 9:48 AM Aitozi  wrote:
>
> > Hi Jing,
> > Thanks for your response. As stated in the FLIP, the purpose of this
> > FLIP is meant to support
> > user-defined async table function. As described in flink document [1]
> >
> > Async table functions are special functions for table sources that
> perform
> > > a lookup.
> > >
> >
> > So end user can not directly define and use async table function now. An
> > user case is reported in [2]
> >
> > So, in conclusion, no new interface is introduced, but we extend the
> > ability to support user-defined async table function.
> >
> > [1]:
> >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/udfs/
> > [2]: https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
> >
> > Thanks.
> > Aitozi.
> >
> >
> > Jing Ge  于2023年5月27日周六 06:40写道:
> >
> > > Hi Aitozi,
> > >
> > > Thanks for your proposal. I am not quite sure if I understood your
> > thoughts
> > > correctly. You described a special case implementation of the
> > > AsyncTableFunction with on public API changes. Would you please
> elaborate
> > > your purpose of writing a FLIP according to the FLIP documentation[1]?
> > > Thanks!
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Wed, May 24, 2023 at 1:07 PM Aitozi  wrote:
> > >
> > > > May I ask for some feedback  :D
> > > >
> > > > Thanks,
> > > > Aitozi
> > > >
> > > > Aitozi  于2023年5月23日周二 19:14写道:
> > > > >
> > > > > Just catch an user case report from Giannis Polyzos for this usage:
> > > > >
> > > > > https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
> > > > >
> > > > > Aitozi  于2023年5月23日周二 17:45写道:
> > > > > >
> > > > > > Hi guys,
> > > > > > I want to bring up a discussion about adding support of User
> > > > > > Defined AsyncTableFunction in Flink.
> > > > > > Currently, async table function are special functions for table
> > > source
> > > > > > to perform
> > > > > > async lookup. However, it's worth to support the user defined
> async
> > > > > > table function.
> > > > > > Because, in this way, the end SQL user can leverage it to perform
> > the
> > > > > > async operation
> > > > > > which is useful to maximum the system throughput especially for
> IO
> > > > > > bottleneck case.
> > > > > >
> > > > > > You can find some more detail in [1].
> > > > > >
> > > > > > Looking forward to feedback
> > > > > >
> > > > > >
> > > > > > [1]:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction
> > > > > >
> > > > > > Thanks,
> > > > > > Aitozi.
> > > >
> > >
> >
>


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-05-27 Thread Aitozi
Hi Jing,
Thanks for your response. As stated in the FLIP, its purpose is to support
user-defined async table functions. As described in the Flink documentation [1]:

> Async table functions are special functions for table sources that perform
> a lookup.

So end users cannot directly define and use async table functions now. A
user case is reported in [2].

In conclusion, no new interface is introduced, but we extend the
ability to support user-defined async table functions.

[1]:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/udfs/
[2]: https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b

Thanks.
Aitozi.


Jing Ge  于2023年5月27日周六 06:40写道:

> Hi Aitozi,
>
> Thanks for your proposal. I am not quite sure if I understood your thoughts
> correctly. You described a special case implementation of the
> AsyncTableFunction with on public API changes. Would you please elaborate
> your purpose of writing a FLIP according to the FLIP documentation[1]?
> Thanks!
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>
> Best regards,
> Jing
>
> On Wed, May 24, 2023 at 1:07 PM Aitozi  wrote:
>
> > May I ask for some feedback  :D
> >
> > Thanks,
> > Aitozi
> >
> > Aitozi  于2023年5月23日周二 19:14写道:
> > >
> > > Just catch an user case report from Giannis Polyzos for this usage:
> > >
> > > https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
> > >
> > > Aitozi  于2023年5月23日周二 17:45写道:
> > > >
> > > > Hi guys,
> > > > I want to bring up a discussion about adding support of User
> > > > Defined AsyncTableFunction in Flink.
> > > > Currently, async table function are special functions for table
> source
> > > > to perform
> > > > async lookup. However, it's worth to support the user defined async
> > > > table function.
> > > > Because, in this way, the end SQL user can leverage it to perform the
> > > > async operation
> > > > which is useful to maximum the system throughput especially for IO
> > > > bottleneck case.
> > > >
> > > > You can find some more detail in [1].
> > > >
> > > > Looking forward to feedback
> > > >
> > > >
> > > > [1]:
> >
> https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction
> > > >
> > > > Thanks,
> > > > Aitozi.
> >
>


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-05-24 Thread Aitozi
May I ask for some feedback  :D

Thanks,
Aitozi

Aitozi  于2023年5月23日周二 19:14写道:
>
> Just catch an user case report from Giannis Polyzos for this usage:
>
> https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b
>
> Aitozi  于2023年5月23日周二 17:45写道:
> >
> > Hi guys,
> > I want to bring up a discussion about adding support of User
> > Defined AsyncTableFunction in Flink.
> > Currently, async table function are special functions for table source
> > to perform
> > async lookup. However, it's worth to support the user defined async
> > table function.
> > Because, in this way, the end SQL user can leverage it to perform the
> > async operation
> > which is useful to maximum the system throughput especially for IO
> > bottleneck case.
> >
> > You can find some more detail in [1].
> >
> > Looking forward to feedback
> >
> >
> > [1]: 
> > https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction
> >
> > Thanks,
> > Aitozi.


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-05-23 Thread Aitozi
Just caught a user case report from Giannis Polyzos for this usage:

https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b

Aitozi  于2023年5月23日周二 17:45写道:
>
> Hi guys,
> I want to bring up a discussion about adding support of User
> Defined AsyncTableFunction in Flink.
> Currently, async table function are special functions for table source
> to perform
> async lookup. However, it's worth to support the user defined async
> table function.
> Because, in this way, the end SQL user can leverage it to perform the
> async operation
> which is useful to maximum the system throughput especially for IO
> bottleneck case.
>
> You can find some more detail in [1].
>
> Looking forward to feedback
>
>
> [1]: 
> https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction
>
> Thanks,
> Aitozi.


[DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-05-23 Thread Aitozi
Hi guys,
I want to bring up a discussion about adding support of User
Defined AsyncTableFunction in Flink.
Currently, async table functions are special functions for table sources
to perform
async lookups. However, it's worth supporting user-defined async
table functions,
because, in this way, end SQL users can leverage them to perform
async operations,
which is useful for maximizing system throughput, especially for IO
bottleneck cases.

You can find some more detail in [1].

Looking forward to feedback


[1]: 
https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction

Thanks,
Aitozi.


Re: Re: [VOTE] FLIP-302: Support TRUNCATE TABLE statement in batch mode

2023-04-17 Thread Aitozi
+1

Best,
Aitozi

ron  于2023年4月18日周二 09:18写道:
>
> +1
>
>
> > -----Original Message-----
> > From: "Lincoln Lee" 
> > Sent: 2023-04-18 09:08:08 (Tuesday)
> > To: dev@flink.apache.org
> > Cc:
> > Subject: Re: [VOTE] FLIP-302: Support TRUNCATE TABLE statement in batch mode
> >
> > +1 (binding)
> >
> > Best,
> > Lincoln Lee
> >
> >
> > yuxia  于2023年4月17日周一 23:54写道:
> >
> > > Hi all.
> > >
> > > Thanks for all the feedback on FLIP-302: Support TRUNCATE TABLE statement
> > > in batch mode [1].
> > > Based on the discussion [2], we have come to a consensus, so I would like
> > > to start a vote.
> > >
> > > The vote will last for at least 72 hours unless there is an objection or
> > > insufficient votes.
> > >
> > > [1]:
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-302%3A+Support+TRUNCATE+TABLE+statement+in+batch+mode
> > > [2]: [ https://lists.apache.org/thread/m4r3wrd7p96wdst3nz3ncqzog6kf51cf |
> > > https://lists.apache.org/thread/m4r3wrd7p96wdst3nz3ncqzog6kf51cf ]
> > >
> > >
> > > Best regards,
> > > Yuxia
> > >
>
>
> --
> Best,
> Ron


Re: [DISCUSS] EncodingFormat and DecondingFormat provide copy API

2023-04-14 Thread Aitozi
Hi tanjialiang,
Thanks for reporting this; I think it's a reasonable requirement.
The problem might be introduced during optimization when
the mutable state in the Source is reused, so DecodingFormat#copy can
avoid this situation.

But after checking the related code for DynamicTableSink, it's a
little different: I think it does not rely on the copy interface now;
even DynamicTableSink#copy is not invoked by the framework. I
guess the reason is that the sink optimizations are all static ability
specs which are all applied before the optimization, so no copy happens.

Anyway, it's unclear to implementers at which stage the
`apply` will happen, so +1 for introducing copy for EncodingFormat and
DecodingFormat to align on deep-copy semantics this time.
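To illustrate the deep-copy intent (hypothetical class, not the FLINK-31686
API itself): pushdown mutates the format instance, so sharing one instance
across two scans without a deep copy leaks one scan's pushdown into the other.

import java.util.ArrayList;
import java.util.List;

class MutableDecodingFormatSketch {
    private final List<String> projectedFields = new ArrayList<>();

    void applyProjection(String field) { // mutates internal state
        projectedFields.add(field);
    }

    // Deep copy: the new instance gets its own mutable list.
    MutableDecodingFormatSketch copy() {
        MutableDecodingFormatSketch other = new MutableDecodingFormatSketch();
        other.projectedFields.addAll(this.projectedFields);
        return other;
    }

    public static void main(String[] args) {
        MutableDecodingFormatSketch shared = new MutableDecodingFormatSketch();
        MutableDecodingFormatSketch scanA = shared.copy(); // isolated state
        scanA.applyProjection("a");
        shared.applyProjection("b"); // without copy(), "a" and "b" would mix
        System.out.println(scanA.projectedFields + " vs " + shared.projectedFields);
    }
}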

Best,
Aitozi.

tanjialiang  于2023年4月13日周四 10:05写道:
>
> Hi, devs.
>
>
> I'd like to start a discussion about having EncodingFormat and DecodingFormat
> provide a copy API, which relates to FLINK-31686 [1].
>
>
> Currently, DecodingFormat doesn't support copy(), which makes the
> DecodingFormat get reused after filter/projection pushdown. The
> EncodingFormat has the same problem if the class implements
> EncodingFormat#applyWritableMetadata(). So I think EncodingFormat and
> DecodingFormat need to provide a copy function, and it should be a deep copy
> if the format implements
> DecodingFormat#applyReadableMetadata/EncodingFormat#applyWritableMetadata/BulkDecodingFormat#applyFilters.
>
>
>
> Looking forwards to your feedback.
>
>
> [1]: [https://issues.apache.org/jira/browse/FLINK-31686]
>
>
> Best regards,
> tanjialiang
>
>
>
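
To illustrate the proposal discussed above, here is a minimal sketch of the
deep-copy semantics, assuming a copy() method is added (MyDecodingFormat is
hypothetical and only models the mutable state; the real interface methods
are omitted):

import java.util.ArrayList;
import java.util.List;

public class MyDecodingFormat {

    // Mutable state set by the planner, e.g. pushed-down metadata keys.
    private List<String> metadataKeys = new ArrayList<>();

    public void applyReadableMetadata(List<String> metadataKeys) {
        this.metadataKeys = metadataKeys;
    }

    // The proposed copy(): deep-copies the mutable state so that two sources
    // sharing this instance cannot observe each other's pushed-down metadata.
    public MyDecodingFormat copy() {
        MyDecodingFormat copied = new MyDecodingFormat();
        copied.metadataKeys = new ArrayList<>(this.metadataKeys);
        return copied;
    }
}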


Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement

2023-04-13 Thread Aitozi
Well, thanks xia for your clarification. Agree with your point, I have
no other concerns.

Best,
Aitozi.

yuxia  wrote on Thu, Apr 13, 2023 at 16:17:
>
> Hi, Aitozi.
> Thanks for your input. I understand your concern. Although the external 
> connector can update the metadata in the method `executeTruncation`,
> the Flink catalog can't be aware of the update in some cases. If the Hive 
> catalog only stores Hive tables, everything will be fine.
> But if the Hive catalog also stores non-Hive tables, and a non-Hive table 
> can't update the underlying Hive metastore, then
> the Hive catalog will still get old metadata.
>
> Since this problem is generic and not limited to the truncate table 
> statement, but also applies to other statements, like insert, update/delete 
> or other statements on the way,
> I think it deserves another dedicated channel to discuss what the Flink 
> catalog is for, or whether we need to introduce some new mechanism for it.
>
>
> Best regards,
> Yuxia
>
> ----- Original Message -----
> From: "Aitozi" 
> To: "dev" 
> Sent: Thursday, Apr 13, 2023 2:37:48 PM
> Subject: Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement
>
> Hi, xia
> > which I think if Flink supports table cache in framework-level,
> > we can also recache in framework-level for truncate table statement.
>
> I think the Flink catalog already stores some stats for the table,
> e.g., after `ANALYZE TABLE` the table's statistics will be stored in
> the catalog, but truncate table will not correct those statistics.
>
> I know it's hard for Flink to do unified follow-up actions after
> truncating a table. But I think we need to keep a clear position for
> the Flink Catalog in mind.
> IMO, since Flink is a compute engine, it's hard for it to maintain the
> catalog for different storage tables itself. So with more and more
> `Executable` commands introduced, the data in the catalog will become
> inconsistent.
> In this case, after truncating, the following parts of the catalog may
> be affected:
>
> - the table/column statistics will no longer be correct
> - the partitions of this table should be cleared
>
>
> Best,
> Aitozi.
>
>
> liu ron  wrote on Thu, Apr 13, 2023 at 11:28:
>
> >
> > Hi, xia
> >
> > Thanks for your explanation, for the first question, given the current
> > status, I think we can provide the generic interface in the future if we
> > need it. For the second question,  it makes sense to me if we can
> > support the table cache at the framework level.
> >
> > Best,
> > Ron
> >
> > yuxia  wrote on Tue, Apr 11, 2023 at 16:12:
> >
> > > Hi, ron.
> > >
> > > 1: Considering that for deleting rows, Flink will also write delete records
> > > to achieve the purpose of deleting data, it may not be so strange for
> > > connector devs to make DynamicTableSink implement SupportsTruncate to
> > > support truncating the table. Based on the assumption that DynamicTableSink
> > > is used for inserting/updating/deleting, I think it's reasonable for
> > > DynamicTableSink to implement SupportsTruncate. It also sounds reasonable
> > > to add a generic interface like DynamicTable to differentiate
> > > DynamicTableSource & DynamicTableSink, but that will definitely require
> > > much design and discussion which deserves a dedicated FLIP. I prefer not
> > > to do that in this FLIP to avoid overdesign, and I think it's not a must
> > > for this FLIP. Maybe we can discuss it some day if we do need the new
> > > generic table interface.
> > >
> > > 2: Considering various catalogs and tables, it's hard for Flink to do the
> > > unified follow-up actions after truncating table. But still the external
> > > connector can do such follow-up actions in method `executeTruncation`.
> > > Btw, in Spark, for the newly truncate table interface[1], Spark only
> > > recaches the table after truncating table[2] which I think if Flink
> > > supports table cache in framework-level,
> > > we can also recache in framework-level for truncate table statement.
> > >
> > > [1]
> > > https://github.com/apache/spark/blob/1a42aa5bd44e7524bb55463bbd85bea782715834/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java
> > > [2]
> > > https://github.com/apache/spark/blob/06c09a79b371c5ac3e4ebad1118ed94b460f48d1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala
> > >
> > >
> > > I think the external catalog can implement such logic in the method
> > > `executeTruncation`.
> &

Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement

2023-04-12 Thread Aitozi
Hi, xia
   > which I think if Flink supports table cache in framework-level,
   > we can also recache in framework-level for truncate table statement.

I think the Flink catalog already stores some stats for the table,
e.g., after `ANALYZE TABLE` the table's statistics will be stored in
the catalog, but truncate table will not correct those statistics.

I know it's hard for Flink to do unified follow-up actions after
truncating a table. But I think we need to keep a clear position for
the Flink Catalog in mind.
IMO, since Flink is a compute engine, it's hard for it to maintain the
catalog for different storage tables itself. So with more and more
`Executable` commands introduced, the data in the catalog will become
inconsistent.
In this case, after truncating, the following parts of the catalog may
be affected:

- the table/column statistics will no longer be correct
- the partitions of this table should be cleared


Best,
Aitozi.


liu ron  wrote on Thu, Apr 13, 2023 at 11:28:

>
> Hi, xia
>
> Thanks for your explanation, for the first question, given the current
> status, I think we can provide the generic interface in the future if we
> need it. For the second question,  it makes sense to me if we can
> support the table cache at the framework level.
>
> Best,
> Ron
>
> yuxia  wrote on Tue, Apr 11, 2023 at 16:12:
>
> > Hi, ron.
> >
> > 1: Considering that for deleting rows, Flink will also write delete records
> > to achieve the purpose of deleting data, it may not be so strange for
> > connector devs to make DynamicTableSink implement SupportsTruncate to
> > support truncating the table. Based on the assumption that DynamicTableSink
> > is used for inserting/updating/deleting, I think it's reasonable for
> > DynamicTableSink to implement SupportsTruncate. It also sounds reasonable
> > to add a generic interface like DynamicTable to differentiate
> > DynamicTableSource & DynamicTableSink, but that will definitely require
> > much design and discussion which deserves a dedicated FLIP. I prefer not to
> > do that in this FLIP to avoid overdesign, and I think it's not a must for
> > this FLIP. Maybe we can discuss it some day if we do need the new generic
> > table interface.
> >
> > 2: Considering various catalogs and tables, it's hard for Flink to do the
> > unified follow-up actions after truncating table. But still the external
> > connector can do such follow-up actions in method `executeTruncation`.
> > Btw, in Spark, for the newly truncate table interface[1], Spark only
> > recaches the table after truncating table[2] which I think if Flink
> > supports table cache in framework-level,
> > we can also recache in framework-level for truncate table statement.
> >
> > [1]
> > https://github.com/apache/spark/blob/1a42aa5bd44e7524bb55463bbd85bea782715834/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TruncatableTable.java
> > [2]
> > https://github.com/apache/spark/blob/06c09a79b371c5ac3e4ebad1118ed94b460f48d1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala
> >
> >
> > I think the external catalog can implement such logic in the method
> > `executeTruncation`.
> >
> > Best regards,
> > Yuxia
> >
> > ----- Original Message -----
> > From: "liu ron" 
> > To: "dev" 
> > Sent: Tuesday, Apr 11, 2023 10:51:36 AM
> > Subject: Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement
> >
> > Hi, xia
> > It's a nice improvement to support TRUNCATE TABLE statement, making Flink
> > more feature-rich.
> > I think the truncate syntax is a command that will be executed in the
> > client's process, rather than pulling up a Flink job to execute on the
> > cluster. So on the user-facing exposed interface, I think we should not let
> > users implement the SupportsTruncate interface on the DynamicTableSink
> > interface. This seems a bit strange and also confuses users, as Hang said,
> > why Source table does not support truncate. It would be nice if we could
> > come up with a generic interface that supports truncate instead of binding
> > it to the DynamicTableSink interface, and maybe in the future we will
> > support more commands like truncate command.
> >
> > In addition, after truncating data, we may also need to update the metadata
> > of the table, such as Hive table, we need to update the statistics, as well
> > as clear the cache in the metastore, I think we should also consider these
> > capabilities, Spark has considered these, refer to
> >
> > https://github.com/apache/spark/blob/69dd20b5e45c7e3533efbfdc1974f59931c1b781/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
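
To ground the API shape being discussed, here is a minimal sketch of a sink
implementing the proposed interface (JdbcLikeSink is hypothetical; the
interface and method names follow the FLIP-302 proposal discussed above):

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsTruncate;

public class JdbcLikeSink implements DynamicTableSink, SupportsTruncate {

    @Override
    public void executeTruncation() {
        // Executed in the client process when TRUNCATE TABLE is issued.
        // Any follow-up work (refreshing statistics, invalidating caches)
        // must happen here, since the framework performs no unified
        // post-truncate actions, as noted in the thread.
        System.out.println("truncating the external table");
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.insertOnly();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        throw new UnsupportedOperationException("not relevant for this sketch");
    }

    @Override
    public DynamicTableSink copy() {
        return new JdbcLikeSink();
    }

    @Override
    public String asSummaryString() {
        return "JdbcLikeSink";
    }
}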

Re: [DISCUSS] Add support for Apache Arrow format

2023-04-12 Thread Aitozi
> Which connectors would be commonly used when reading in Arrow format?
Filesystem?

Currently, yes. Ideally it could be combined with different connectors,
but I have not figured out how to integrate the Arrow format
deserializer with the `DecodingFormat` or `DeserializationSchema`
interface. So, as a first step, I want to introduce it as the file
bulk format.
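
For illustration, this is how the format could be used once such a module
existed; the 'arrow' format identifier is an assumption of this proposal,
not a shipped option:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ArrowFormatExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // 'arrow' is the hypothetical format identifier proposed in this thread.
        tEnv.executeSql(
            "CREATE TABLE events (id BIGINT, name STRING) WITH ("
                + " 'connector' = 'filesystem',"
                + " 'path' = '/tmp/events',"
                + " 'format' = 'arrow')");
        tEnv.executeSql("SELECT * FROM events").print();
    }
}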

Martijn Visser  wrote on Wed, Apr 12, 2023 at 22:53:

>
> Which connectors would be commonly used when reading in Arrow format?
> Filesystem?
>
> On Wed, Apr 12, 2023 at 4:27 AM Jacky Lau  wrote:
>
> > Hi
> >I also think the arrow format will be useful when reading/writing with
> > message queues.
> >Arrow defines a language-independent columnar memory format for flat and
> > hierarchical data, organized for efficient analytic operations on modern
> > hardware like CPUs and GPUs. The Arrow memory format also supports
> > zero-copy reads for lightning-fast data access without serialization
> > overhead. It will bring a lot.
> >And we may do some surveys on what other engines support it, like
> > Spark/Hive/Presto and so on, how they support it and how it is used.
> >
> >Best,
> >Jacky.
> >
> > Aitozi  wrote on Sun, Apr 2, 2023 at 22:22:
> >
> > > Hi all,
> > > Thanks for your input.
> > >
> > > @Ran > However, as mentioned in the issue you listed, it may take a lot
> > of
> > > work
> > > and the community's consideration for integrating Arrow.
> > >
> > > To clarify, this proposal solely aims to introduce flink-arrow as a new
> > > format,
> > > similar to flink-csv and flink-protobuf. It will not impact the internal
> > > data
> > > structure representation in Flink. For proof of concept, please refer to:
> > > https://github.com/Aitozi/flink/commits/arrow-format.
> > >
> > > @Martijn > I'm wondering if there's really much benefit for the Flink
> > > project to
> > > add another file format, over properly supporting the format that we
> > > already
> > > have in the project.
> > >
> > > Maintaining the formats we already have and introducing new formats
> > > should be orthogonal. The requirement of supporting the arrow format was
> > > originally observed in our internal usage: deserializing the data
> > > (VectorSchemaRoot) from other storage systems to Flink's internal RowData,
> > > and serializing the internal RowData back out to VectorSchemaRoot for the
> > > storage system. And the requirement from slack[1] is to support the arrow
> > > file format. Although Arrow is not usually used as the final on-disk
> > > storage format, it tends to be used as the inter-exchange format between
> > > different systems or as temporary storage for analysis, due to its
> > > columnar format and the fact that it can be memory-mapped by other
> > > analysis programs.
> > >
> > > So, I think it's meaningful to support arrow formats in Flink.
> > >
> > > @Jim >  If the Flink format interface is used there, then it may be
> > useful
> > > to
> > > consider Arrow along with other columnar formats.
> > >
> > > I am not well-versed with the formats utilized in Paimon. Upon checking
> > > [2], it
> > > appears that Paimon does not directly employ flink formats. Instead, it
> > > utilizes
> > > FormatWriterFactory and FormatReaderFactory to handle data serialization
> > > and
> > > deserialization. Therefore, I believe that the current work may not be
> > > applicable for reuse in Paimon at this time.
> > >
> > > Best,
> > > Aitozi.
> > >
> > > [1]:
> > https://apache-flink.slack.com/archives/C03GV7L3G2C/p1677915016551629
> > > [2]:
> > >
> > https://github.com/apache/incubator-paimon/tree/master/paimon-format/src/main/java/org/apache/paimon/format
> > >
> > > Jim Hughes  wrote on Fri, Mar 31, 2023 at 00:36:
> > > >
> > > > Hi all,
> > > >
> > > > How do Flink formats relate to or interact with Paimon (formerly
> > > > Flink-Table-Store)?  If the Flink format interface is used there, then
> > it
> > > > may be useful to consider Arrow along with other columnar formats.
> > > >
> > > > Separately, from previous experience, I've seen the Arrow format be
> > > useful
> > > > as an output format for clients to read efficiently.  Arrow do

[jira] [Created] (FLINK-31790) Filesystem batch sink should also respect to the PartitionCommitPolicy

2023-04-12 Thread Aitozi (Jira)
Aitozi created FLINK-31790:
--

 Summary: Filesystem batch sink should also respect to the 
PartitionCommitPolicy
 Key: FLINK-31790
 URL: https://issues.apache.org/jira/browse/FLINK-31790
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Reporter: Aitozi


Currently, the {{PartitionCommitPolicy}} only takes effect in the streaming file 
sink and the hive file sink. The filesystem sink in batch mode should also 
respect the commit policy.
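
For illustration, a table definition that sets a commit policy which the batch 
sink currently ignores (a sketch; tEnv is an existing TableEnvironment, and the 
path/schema are placeholders):

{code:java}
tEnv.executeSql(
    "CREATE TABLE fs_sink (a INT, dt STRING) PARTITIONED BY (dt) WITH ("
        + " 'connector' = 'filesystem',"
        + " 'path' = '/tmp/fs_sink',"
        + " 'format' = 'csv',"
        + " 'sink.partition-commit.policy.kind' = 'success-file')");
{code}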



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31787) Add the explicit ROW constructor to the system function doc

2023-04-12 Thread Aitozi (Jira)
Aitozi created FLINK-31787:
--

 Summary: Add the explicit ROW constructor to the system function 
doc
 Key: FLINK-31787
 URL: https://issues.apache.org/jira/browse/FLINK-31787
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Aitozi






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31774) Add document for delete and update statement

2023-04-11 Thread Aitozi (Jira)
Aitozi created FLINK-31774:
--

 Summary: Add document for delete and update statement
 Key: FLINK-31774
 URL: https://issues.apache.org/jira/browse/FLINK-31774
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Aitozi


I could not find any documentation about the usage of the DELETE and UPDATE 
statements in the SQL section.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31755) ROW function can not work with RewriteIntersectAllRule

2023-04-07 Thread Aitozi (Jira)
Aitozi created FLINK-31755:
--

 Summary: ROW function can not work with RewriteIntersectAllRule
 Key: FLINK-31755
 URL: https://issues.apache.org/jira/browse/FLINK-31755
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Aitozi


Reproduce case:


{code:java}
create table row_sink (
  `b` ROW
) with (
  'connector' = 'values'
)

util.verifyRelPlanInsert(
"INSERT INTO row_sink " +
  "SELECT ROW(a, b) FROM complex_type_src intersect all " +
  "SELECT ROW(c, d) FROM complex_type_src ")

{code}

It fails with:


{code:java}
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
equiv rowtype: RecordType(RecordType(VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL EXPR$0) NOT NULL
Difference:
EXPR$0: RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
EXPR$0, INTEGER EXPR$1) NOT NULL -> RecordType(VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE" EXPR$0, INTEGER EXPR$1) NOT NULL

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
... 68 more
{code}


The reason is:

The ROW function generates the {{FULLY_QUALIFIED}} type, but after the 
{{RewriteIntersectAllRule}} optimization, it produces 
{{PEEK_FIELDS_NO_EXPAND}}. So the Volcano planner complains about a type 
mismatch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Add support for Apache Arrow format

2023-04-02 Thread Aitozi
Hi all,
Thanks for your input.

@Ran > However, as mentioned in the issue you listed, it may take a lot of work
and the community's consideration for integrating Arrow.

To clarify, this proposal solely aims to introduce flink-arrow as a new format,
similar to flink-csv and flink-protobuf. It will not impact the internal data
structure representation in Flink. For proof of concept, please refer to:
https://github.com/Aitozi/flink/commits/arrow-format.

@Martijn > I'm wondering if there's really much benefit for the Flink project to
add another file format, over properly supporting the format that we already
have in the project.

Maintaining the formats we already have and introducing new formats should be
orthogonal. The requirement of supporting the arrow format was originally
observed in our internal usage: deserializing the data (VectorSchemaRoot) from
other storage systems to Flink's internal RowData, and serializing the internal
RowData back out to VectorSchemaRoot for the storage system. And the
requirement from slack[1] is to support the arrow file format. Although Arrow
is not usually used as the final on-disk storage format, it tends to be used as
the inter-exchange format between different systems or as temporary storage for
analysis, due to its columnar format and the fact that it can be memory-mapped
by other analysis programs.

So, I think it's meaningful to support arrow formats in Flink.

@Jim >  If the Flink format interface is used there, then it may be useful to
consider Arrow along with other columnar formats.

I am not well-versed with the formats utilized in Paimon. Upon checking [2], it
appears that Paimon does not directly employ flink formats. Instead, it utilizes
FormatWriterFactory and FormatReaderFactory to handle data serialization and
deserialization. Therefore, I believe that the current work may not be
applicable for reuse in Paimon at this time.

Best,
Aitozi.

[1]: https://apache-flink.slack.com/archives/C03GV7L3G2C/p1677915016551629
[2]: 
https://github.com/apache/incubator-paimon/tree/master/paimon-format/src/main/java/org/apache/paimon/format

Jim Hughes  wrote on Fri, Mar 31, 2023 at 00:36:
>
> Hi all,
>
> How do Flink formats relate to or interact with Paimon (formerly
> Flink-Table-Store)?  If the Flink format interface is used there, then it
> may be useful to consider Arrow along with other columnar formats.
>
> Separately, from previous experience, I've seen the Arrow format be useful
> as an output format for clients to read efficiently.  Arrow does support
> returning batches of records, so there may be some options to use the
> format in a streaming situation where a sufficient collection of records
> can be gathered.
>
> Cheers,
>
> Jim
>
>
>
> On Thu, Mar 30, 2023 at 8:32 AM Martijn Visser 
> wrote:
>
> > Hi,
> >
> > To be honest, I haven't seen that much demand for supporting the Arrow
> > format directly in Flink as a flink-format. I'm wondering if there's really
> > much benefit for the Flink project to add another file format, over
> > properly supporting the format that we already have in the project.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Thu, Mar 30, 2023 at 2:21 PM Ran Tao  wrote:
> >
> > > It is a good point that flink integrates apache arrow as a format.
> > > Arrow can take advantage of SIMD-specific or vectorized optimizations,
> > > which should be of great benefit to batch tasks.
> > > However, as mentioned in the issue you listed, it may take a lot of work
> > > and the community's consideration for integrating Arrow.
> > >
> > > I think you can try to make a simple poc for verification and some
> > specific
> > > plans.
> > >
> > >
> > > Best Regards,
> > > Ran Tao
> > >
> > >
> > > Aitozi  wrote on Wed, Mar 29, 2023 at 19:12:
> > >
> > > > Hi guys
> > > >  I'm opening this thread to discuss supporting the Apache Arrow
> > > format
> > > > in Flink.
> > > >  Arrow is a language-independent columnar memory format that has
> > > become
> > > > widely used in different systems, and It can also serve as an
> > > > inter-exchange format between other systems.
> > > > So, using it directly in the Flink system will be nice. We also
> > received
> > > > some requests from slack[1][2] and jira[3].
> > > > In our company's internal usage, we have used the flink-python
> > > > module's ArrowReader and ArrowWriter to support this work. But it
> > > > still cannot integrate closely with the current flink-formats
> > > > framework.
> > > > So, I'd like to introduce the flink-arrow formats module to support the
> > > > arrow format naturally.
> > > >  Looking forward to some suggestions.
> > > >
> > > >
> > > > Best,
> > > > Aitozi
> > > >
> > > >
> > > > [1]:
> > > https://apache-flink.slack.com/archives/C03GV7L3G2C/p1677915016551629
> > > >
> > > > [2]:
> > > https://apache-flink.slack.com/archives/C03GV7L3G2C/p1666326443152789
> > > >
> > > > [3]: https://issues.apache.org/jira/browse/FLINK-10929
> > > >
> > >
> >


[DISCUSS] Add support for Apache Arrow format

2023-03-29 Thread Aitozi
Hi guys
 I'm opening this thread to discuss supporting the Apache Arrow format
in Flink.
 Arrow is a language-independent columnar memory format that has become
widely used in different systems, and It can also serve as an
inter-exchange format between other systems.
So, using it directly in the Flink system will be nice. We also received
some requests from slack[1][2] and jira[3].
 In our company's internal usage, we have used the flink-python module's
ArrowReader and ArrowWriter to support this work. But it still cannot
integrate closely with the current flink-formats framework.
So, I'd like to introduce the flink-arrow formats module to support the
arrow format naturally.
 Looking forward to some suggestions.


Best,
Aitozi


[1]: https://apache-flink.slack.com/archives/C03GV7L3G2C/p1677915016551629

[2]: https://apache-flink.slack.com/archives/C03GV7L3G2C/p1666326443152789

[3]: https://issues.apache.org/jira/browse/FLINK-10929


[jira] [Created] (FLINK-31497) Drop the deprecated CatalogViewImpl

2023-03-17 Thread Aitozi (Jira)
Aitozi created FLINK-31497:
--

 Summary: Drop the deprecated CatalogViewImpl 
 Key: FLINK-31497
 URL: https://issues.apache.org/jira/browse/FLINK-31497
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Planner
Reporter: Aitozi


After https://issues.apache.org/jira/browse/FLINK-29585

CatalogViewImpl not used in Flink project now, we may can drop it now cc 
[~snuyanzin]

But, we may have to check whether it is used in other connector's system



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] Improve Multi-Query optimization in Flink

2023-03-15 Thread Aitozi
Hi devs,

I want to discuss a new way to optimize multi-sink queries in Flink.

Currently, Flink uses the RelNodeBlock mechanism to optimize
multi-sink queries.

It has the following steps:


   1. The multi-sink query is parsed and validated into multiple RelNode trees
   2. Merge the common nodes of the multiple RelNodes into a single node if
   table.optimizer.reuse-optimize-block-with-digest-enabled is enabled
   3. Split the multiple RelNode trees into multiple RelNodeBlocks
   4. Feed each RelNodeBlock to the Calcite planner to do the optimization
   5. Reconstruct the original structure with the optimized RelNodeBlocks

As far as I know (please correct me if I'm wrong), the main purpose of
the RelNodeBlock is the following two things:


   - Calcite does not support DAG optimization, so the RelNodeBlock can split
   the multi-sink tree into many single trees, and then we can leverage Calcite
   to do the optimization
   - In a multi-sink query, we need to reduce the repeated calculation of
   the same node. So, if
   table.optimizer.reuse-optimize-block-with-digest-enabled is enabled, we
   can prevent the common node from being optimized into different results,
   which would lead to repeated calculation

However, in our production use, we found the ability of the RelNodeBlock
optimization is not enough. As noted in CommonSubGraphBasedOptimizer's
comments, the optimization of a RelNodeBlock is a local optimization; there
is no optimization across RelNodeBlocks. Take a simple example:

Sink                 Sink
  |                    |
Project(a,b)         Project(a,b,c)
  |                    |
Scan(a,b,c,d,e)      Scan(a,b,c,d,e)

Both sinks scan the same table. In the current optimization, we can only
choose whether or not to merge the Scan into one RelNodeBlock before
optimization.

If merged, the Scan cannot leverage optimizations such as projection
push-down.

If not merged, during the optimization the two RelNodeBlocks will generate
two different scans, {a,b} and {a,b,c}.

So I'm proposing a new way to improve the CTE optimization of
multi-queries (or single queries):


   1. Insert a VirtualMultiSink to pack the sink nodes together, as described
   in [2], which is inspired by [3]
   2. Insert a new Spool node (which means: produced once, consumed multiple
   times) on top of every RelNode that has multiple outputs
   3. Implement several rules around the Spool node:
      - PushProjectToSpool, to prune the unused fields from all the
      Spool node's parents
      - PushFilterToSpool, to push down the disjunction (DNF) of the
      conditions of all the Spool node's parents
      - ...
   4. Furthermore, we can implement a rule to discard the Spool, and then
   let the planner decide whether to reuse or not based on the cost of each
   alternative
   5. After the physical rewrite, we can remove the
   PhysicalVirtualMultiSink and the Spool node

The benefits of the new way are:

   1. It can do the optimization in a single tree, so the purely local
   optimization can be avoided
   2. Cost-based CTE optimization is available in the new solution
   3. The new solution can optimize the CTE in both multi-query and
   single-query cases, so the problem of [1] can also be resolved
   4. It avoids the trait propagation between the RelNodeBlocks of the
   current solution, as described in [4]

Looking forward to your inputs.

Best,

Aitozi.

[1]: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-29088

[2]: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-31205

[3]: Efficient and Extensible Algorithms for Multi Query Optimization
https://arxiv.org/pdf/cs/9910021.pdf

[4]: https://issues.apache.org/jira/browse/FLINK-24048


[jira] [Created] (FLINK-31426) Upgrade the deprecated UniqueConstraint to the new one

2023-03-13 Thread Aitozi (Jira)
Aitozi created FLINK-31426:
--

 Summary: Upgrade the deprecated UniqueConstraint to the new one 
 Key: FLINK-31426
 URL: https://issues.apache.org/jira/browse/FLINK-31426
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Hive, Table SQL / Planner
Reporter: Aitozi


https://github.com/apache/flink/pull/21522#discussion_r1133642525



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-300: Add targetColumns to DynamicTableSink#Context to solve the null overwrite problem of partial-insert

2023-03-13 Thread Aitozi
+1 (non-binding)

Best,
Aitozi

Jing Ge  wrote on Mon, Mar 13, 2023 at 22:10:

> +1 (binding)
>
> Best Regards,
> Jing
>
> On Mon, Mar 13, 2023 at 1:57 PM Hang Ruan  wrote:
>
> > +1 (non-binding)
> >
> > Best,
> > Hang
> >
> > yuxia  wrote on Mon, Mar 13, 2023 at 20:52:
> >
> > > +1 (binding)
> > > Thanks Lincoln Lee for driving it.
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > ----- Original Message -----
> > > From: "Lincoln Lee" 
> > > To: "dev" 
> > > Sent: Monday, Mar 13, 2023 8:17:52 PM
> > > Subject: [VOTE] FLIP-300: Add targetColumns to DynamicTableSink#Context to
> > > solve the null overwrite problem of partial-insert
> > >
> > > Dear Flink developers,
> > >
> > > Thanks for all your feedback for FLIP-300: Add targetColumns to
> > > DynamicTableSink#Context to solve the null overwrite problem of
> > > partial-insert[1] on the discussion thread[2].
> > >
> > > I'd like to start a vote for it. The vote will be open for at least 72
> > > hours unless there is an objection or not enough votes.
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240885081
> > > [2] https://lists.apache.org/thread/bk8x0nqg4oc62jqryj9ntzzlpj062wd9
> > >
> > >
> > > Best,
> > > Lincoln Lee
> > >
> >
>


Re: [ANNOUNCE] New Apache Flink Committer - Yuxia Luo

2023-03-12 Thread Aitozi
Congratulations, Yuxia

Best,
Aitozi

Yuxin Tan  wrote on Mon, Mar 13, 2023 at 10:36:

> Congratulations, Yuxia!
>
> Best,
> Yuxin
>
>
> Jark Wu  wrote on Mon, Mar 13, 2023 at 10:26:
>
> > Hi, everyone
> >
> > On behalf of the PMC, I'm very happy to announce Yuxia Luo as a new Flink
> > Committer.
> >
> > Yuxia has been continuously contributing to the Flink project for almost
> > two years, and has authored and reviewed hundreds of PRs over this time.
> > He is currently the core maintainer of the Hive component, where he
> > contributed many valuable features, including the Hive dialect with 95%
> > compatibility and small file compaction.
> > In addition, Yuxia drove FLIP-282 (DELETE & UPDATE API) to better
> > integrate Flink with data lakes. He actively participated in dev
> > discussions and answered many questions on the user mailing list.
> >
> > Please join me in congratulating Yuxia Luo for becoming a Flink
> Committer!
> >
> > Best,
> > Jark Wu (on behalf of the Flink PMC)
> >
>


[jira] [Created] (FLINK-31390) Optimize the FlinkChangelogModeInferenceProgram by avoiding unnecessary traversals.

2023-03-09 Thread Aitozi (Jira)
Aitozi created FLINK-31390:
--

 Summary: Optimize the FlinkChangelogModeInferenceProgram by 
avoiding unnecessary traversals.
 Key: FLINK-31390
 URL: https://issues.apache.org/jira/browse/FLINK-31390
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Aitozi


We can avoid the unnecessary traversals of the RelNode tree, since we are only 
interested in the first satisfied plan.

 

FlinkChangelogModeInferenceProgram
{code:java}
val updateKindTraitVisitor = new SatisfyUpdateKindTraitVisitor(context)
val finalRoot = requiredUpdateKindTraits.flatMap {
  requiredUpdateKindTrait =>
updateKindTraitVisitor.visit(rootWithModifyKindSet, 
requiredUpdateKindTrait)
}
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31362) Add document about how to use C-style escape strings

2023-03-07 Thread Aitozi (Jira)
Aitozi created FLINK-31362:
--

 Summary: Add document about how to use C-style escape strings
 Key: FLINK-31362
 URL: https://issues.apache.org/jira/browse/FLINK-31362
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Aitozi






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-300: Add targetColumns to DynamicTableSink#Context to solve the null overwrite problem of partial-insert

2023-03-06 Thread Aitozi
> we can initiate corresponding support issues for
specific connectors to follow up on support after finalizing the API
changes

Make sense to me!

Best,
Aitozi.

Lincoln Lee  wrote on Tue, Mar 7, 2023 at 15:05:

> Thanks Jingsong & Hang,
>
> Using Optional as the return value is a good idea. Previously, I
> hoped to keep the style of the LookupTableSource.LookupContext#getKeys as
> consistent as possible, but the getKeys is actually non-empty when used, so
> I support updating to Optional.  I'll update the flip doc and poc
> later tonight.
>
> Best,
> Lincoln Lee
>
>
> Lincoln Lee  wrote on Tue, Mar 7, 2023 at 14:53:
>
> > Hi Aitozi,
> >
> > Thanks for your feedback!  Yes, including HBase and JDBC connector, they
> > can be considered for support in the next step (JDBC as a standard
> > protocol supported not only in traditional databases, but also in more
> and
> > more new types of storage). Considering the ongoing externalizing of
> > connectors and the release cycles of the connectors are decoupled with
> the
> > release cycle of Flink, we can initiate corresponding support issues for
> > specific connectors to follow up on support after finalizing the API
> > changes, WDYT?
> >
> > Best,
> > Lincoln Lee
> >
> >
> >> Hang Ruan  wrote on Tue, Mar 7, 2023 at 12:14:
> >
> >> Hi, Lincoln,
> >>
> >> Thanks for bringing this up. It looks good to me. I also agree with
> >> Jingsong's suggestion.
> >>
> >> Best,
> >> Hang
> >>
> >> Jingsong Li  wrote on Tue, Mar 7, 2023 at 11:15:
> >>
> >> > Wow, we have 300 FLIPs...
> >> >
> >> > Thanks Lincoln,
> >> >
> >> > Have you considered returning an Optional?
> >> >
> >> > Empty array looks a little weird to me.
> >> >
> >> > Best,
> >> > Jingsong
> >> >
> >> > On Tue, Mar 7, 2023 at 10:32 AM Aitozi  wrote:
> >> > >
> >> > > Hi Lincoln,
> >> > > Thank you for sharing this FLIP. Overall, it looks good to me. I
> >> have
> >> > > one question: with the introduction of this interface,
> >> > > will any existing Flink connectors need to be updated in order to
> take
> >> > > advantage of its capabilities? For example, HBase.
> >> > >
> >> > > yuxia  wrote on Tue, Mar 7, 2023 at 10:01:
> >> > >
> >> > > > Thanks. It makes sense to me.
> >> > > >
> >> > > > Best regards,
> >> > > > Yuxia
> >> > > >
> >> > > > ----- Original Message -----
> >> > > > From: "Lincoln Lee" 
> >> > > > To: "dev" 
> >> > > > Sent: Monday, Mar 6, 2023 10:26:26 PM
> >> > > > Subject: Re: [DISCUSS] FLIP-300: Add targetColumns to
> >> > DynamicTableSink#Context
> >> > > > to solve the null overwrite problem of partial-insert
> >> > > >
> >> > > > hi yuxia,
> >> > > >
> >> > > > Thanks for your feedback and tracking the issue of update
> statement!
> >> > I've
> >> > > > updated the FLIP[1] and also the poc[2].
> >> > > > Since the bug and flip are orthogonal, we can focus on finalizing
> >> the
> >> > api
> >> > > > changes first, and then work on the flip implementation and bugfix
> >> > > > separately, WDYT?
> >> > > >
> >> > > > [1]
> >> > > >
> >> >
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240885081
> >> > > > [2] https://github.com/apache/flink/pull/22041
> >> > > >
> >> > > > Best,
> >> > > > Lincoln Lee
> >> > > >
> >> > > >
> >> > > > yuxia  wrote on Mon, Mar 6, 2023 at 21:21:
> >> > > >
> >> > > > > Hi, Lincoln.
> >> > > > > Thanks for bringing this up. +1 for this FLIP, it's helpful for
> >> > external
> >> > > > > storage system to implement partial update.
> >> > > > > The FLIP looks good to me. I only want to add one comment,
> update
> >> > > > > statement also doesn't support updating nested column, I have
> >> created
> >> > > > > FLINK-31344[1] to track it.
> >> > > > > Maybe we also need to explain 

Re: [DISCUSS] FLIP-300: Add targetColumns to DynamicTableSink#Context to solve the null overwrite problem of partial-insert

2023-03-06 Thread Aitozi
Hi Lincoln,
Thank you for sharing this FLIP. Overall, it looks good to me. I have
one question: with the introduction of this interface,
will any existing Flink connectors need to be updated in order to take
advantage of its capabilities? For example, HBase.
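
For illustration, a sketch of how a connector like HBase could consume the
proposed information, assuming a getTargetColumns() accessor returning
Optional<int[][]> as discussed in this thread (both the method name and the
exact shape belong to the proposal under discussion, not a released API):

import org.apache.flink.table.connector.sink.DynamicTableSink;

public class PartialUpdateSinkSketch {

    // Each inner int[] is the index path of one column listed in the INSERT
    // statement; an absent value means all columns are targeted, so no
    // partial-update handling is needed.
    public static int[][] resolveTargetColumns(DynamicTableSink.Context context) {
        return context.getTargetColumns().orElse(new int[0][]);
    }
}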

yuxia  wrote on Tue, Mar 7, 2023 at 10:01:

> Thanks. It makes sense to me.
>
> Best regards,
> Yuxia
>
> ----- Original Message -----
> From: "Lincoln Lee" 
> To: "dev" 
> Sent: Monday, Mar 6, 2023 10:26:26 PM
> Subject: Re: [DISCUSS] FLIP-300: Add targetColumns to DynamicTableSink#Context
> to solve the null overwrite problem of partial-insert
>
> hi yuxia,
>
> Thanks for your feedback and tracking the issue of update statement! I've
> updated the FLIP[1] and also the poc[2].
> Since the bug and flip are orthogonal, we can focus on finalizing the api
> changes first, and then work on the flip implementation and bugfix
> separately, WDYT?
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240885081
> [2] https://github.com/apache/flink/pull/22041
>
> Best,
> Lincoln Lee
>
>
> yuxia  wrote on Mon, Mar 6, 2023 at 21:21:
>
> > Hi, Lincoln.
> > Thanks for bringing this up. +1 for this FLIP, it's helpful for external
> > storage system to implement partial update.
> > The FLIP looks good to me. I only want to add one comment, update
> > statement also doesn't support updating nested column, I have created
> > FLINK-31344[1] to track it.
> > Maybe we also need to explain it in this FLIP.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-31344
> >
> > Best regards,
> > Yuxia
> >
> > ----- Original Message -----
> > From: "Lincoln Lee" 
> > To: "dev" 
> > Sent: Friday, Mar 3, 2023 12:22:19 PM
> > Subject: [DISCUSS] FLIP-300: Add targetColumns to DynamicTableSink#Context to
> > solve the null overwrite problem of partial-insert
> >
> > Hi everyone,
> >
> > This FLIP[1] aims to support connectors in avoiding overwriting
> non-target
> > columns with null values when processing partial column updates, we
> propose
> > adding information on the target column list to DynamicTableSink#Context.
> >
> > FLINK-18726[2] supports inserting statements with specified column list,
> it
> > fills null values (or potentially declared default values in the future)
> > for columns not appearing in the column list of insert statement to the
> > target table.
> > But this behavior does not satisfy some partial column update
> requirements
> > of some storage systems which allow storing null values. The problem is
> > that connectors cannot distinguish whether the null value of a column is
> > really from the user's data or whether it is a null value populated
> because
> > of partial insert behavior.
> >
> > Looking forward to your comments or feedback.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240885081
> > [2] https://issues.apache.org/jira/browse/FLINK-18726
> >
> > Best,
> > Lincoln Lee
> >
>


Re: [DISCUSS] String literal behavior in Flink

2023-03-06 Thread Aitozi
Hi, Jark

FYI, this feature has already been supported in Calcite 1.33.0 [1].
Therefore, I believe we can use it directly after upgrading the Calcite
version.

Best,
Aitozi.
[1]: https://issues.apache.org/jira/browse/CALCITE-5305

Aitozi  wrote on Mon, Mar 6, 2023 at 16:48:

> Thanks, will give it a try
>
> Best,
> Aitozi.
>
> Jark Wu  wrote on Mon, Mar 6, 2023 at 15:11:
>
>> Hi Aitozi,
>>
>> I would suggest trying to contribute it to the upstream project Calcite
>> first.
>>
>> Best,
>> Jark
>>
>> > On Mar 6, 2023, at 11:51, Aitozi  wrote:
>> >
>> > Hi Jark,
>> >
>> > Thank you for your helpful suggestion. It appears that 'E'foo\n'' is a
>> more
>> > versatile and widely accepted option. To assess its feasibility, I have
>> > reviewed the relevant Unicode supports and concluded that it may
>> > necessitate modifications to the Parser.jj file to accommodate this new
>> > syntax.
>> >
>> >
>> > I am unsure whether we should initially incorporate this alteration in
>> > Calcite or if we can directly supersede the StringLiteral behavior
>> within
>> > the Flink project. Nevertheless, I believe supporting this change is
>> > achievable.
>> >
>> >
>> >
>> > Thanks,
>> > Aitozi.
>> >
>> > Jark Wu  wrote on Mon, Mar 6, 2023 at 10:16:
>> >
>> >> Hi Aitozi,
>> >>
>> >> I think this is a good idea to improve the backslash escape strings.
>> >> However, I lean a bit more toward the Postgres approach[1],
>> >> which is more standard-compliant. PG allows backslash escape
>> >> string by writing the letter E (upper or lower case) just before the
>> >> opening single quote, e.g., E'foo\n'.
>> >>
>> >> Recognizing backslash escapes in both regular and escape string
>> constants
>> >> is not backward compatible in Flink, and is also deprecated in PG.
>> >>
>> >> In addition, Flink also supports Unicode escape string constants by
>> >> writing the U& before the quote[1] which works in the same way with
>> >> backslash escape string.
>> >>
>> >> Best,
>> >> Jark
>> >>
>> >> [1]:
>> >>
>> >>
>> https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-CONSTANTS
>> >> [2]:
>> >>
>> >>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/overview/
>> >>
>> >> On Sat, 4 Mar 2023 at 23:31, Aitozi  wrote:
>> >>
>> >>> Hi,
>> >>>  I encountered a problem when using string literal in Flink.
>> Currently,
>> >>> Flink will escape the string literal during codegen, so for the query
>> >>> below:
>> >>>
>> >>> SELECT 'a\nb'; it will print => a\nb
>> >>>
>> >>> then for the query
>> >>>
>> >>> SELECT SPLIT_INDEX(col, '\n', 0);
>> >>>
>> >>> The col can not split by the newline. If we want to split by the
>> newline,
>> >>> we should use
>> >>>
>> >>> SELECT SPLIT_INDEX(col, '
>> >>> ', 0)
>> >>>
>> >>> or
>> >>>
>> >>> SELECT SPLIT_INDEX(col, CHR(10), 0)
>> >>>
>> >>> The above way could be more intuitive. Some other databases support
>> these
>> >>> "Special Character Escape Sequences"[1].
>> >>>
>> >>> In this way, we can directly use
>> >>> SELECT SPLIT_INDEX(col, '\n', 0); for the query.
>> >>>
>> >>> I know this is not standard behavior in ANSI SQL. I'm opening this
>> thread
>> >>> for some opinions from the community guys.
>> >>>
>> >>> [1]:
>> >>>
>> >>>
>> >>
>> https://dev.mysql.com/doc/refman/8.0/en/string-literals.html#character-escape-sequences
>> >>>
>> >>> Thanks,
>> >>> Aitozi
>> >>>
>> >>
>>
>>


Re: [DISCUSS] String literal behavior in Flink

2023-03-06 Thread Aitozi
Thanks, will give it a try

Best,
Aitozi.

Jark Wu  wrote on Mon, Mar 6, 2023 at 15:11:

> Hi Aitozi,
>
> I would suggest trying to contribute it to the upstream project Calcite
> first.
>
> Best,
> Jark
>
> > On Mar 6, 2023, at 11:51, Aitozi  wrote:
> >
> > Hi Jark,
> >
> > Thank you for your helpful suggestion. It appears that 'E'foo\n'' is a
> more
> > versatile and widely accepted option. To assess its feasibility, I have
> > reviewed the relevant Unicode supports and concluded that it may
> > necessitate modifications to the Parser.jj file to accommodate this new
> > syntax.
> >
> >
> > I am unsure whether we should initially incorporate this alteration in
> > Calcite or if we can directly supersede the StringLiteral behavior within
> > the Flink project. Nevertheless, I believe supporting this change is
> > achievable.
> >
> >
> >
> > Thanks,
> > Aitozi.
> >
> > Jark Wu  wrote on Mon, Mar 6, 2023 at 10:16:
> >
> >> Hi Aitozi,
> >>
> >> I think this is a good idea to improve the backslash escape strings.
> >> However, I lean a bit more toward the Postgres approach[1],
> >> which is more standard-compliant. PG allows backslash escape
> >> string by writing the letter E (upper or lower case) just before the
> >> opening single quote, e.g., E'foo\n'.
> >>
> >> Recognizing backslash escapes in both regular and escape string
> constants
> >> is not backward compatible in Flink, and is also deprecated in PG.
> >>
> >> In addition, Flink also supports Unicode escape string constants by
> >> writing the U& before the quote[1] which works in the same way with
> >> backslash escape string.
> >>
> >> Best,
> >> Jark
> >>
> >> [1]:
> >>
> >>
> https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-CONSTANTS
> >> [2]:
> >>
> >>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/overview/
> >>
> >> On Sat, 4 Mar 2023 at 23:31, Aitozi  wrote:
> >>
> >>> Hi,
> >>>  I encountered a problem when using string literal in Flink. Currently,
> >>> Flink will escape the string literal during codegen, so for the query
> >>> below:
> >>>
> >>> SELECT 'a\nb'; it will print => a\nb
> >>>
> >>> then for the query
> >>>
> >>> SELECT SPLIT_INDEX(col, '\n', 0);
> >>>
> >>> The col can not split by the newline. If we want to split by the
> newline,
> >>> we should use
> >>>
> >>> SELECT SPLIT_INDEX(col, '
> >>> ', 0)
> >>>
> >>> or
> >>>
> >>> SELECT SPLIT_INDEX(col, CHR(10), 0)
> >>>
> >>> The above way could be more intuitive. Some other databases support
> these
> >>> "Special Character Escape Sequences"[1].
> >>>
> >>> In this way, we can directly use
> >>> SELECT SPLIT_INDEX(col, '\n', 0); for the query.
> >>>
> >>> I know this is not standard behavior in ANSI SQL. I'm opening this
> thread
> >>> for some opinions from the community guys.
> >>>
> >>> [1]:
> >>>
> >>>
> >>
> https://dev.mysql.com/doc/refman/8.0/en/string-literals.html#character-escape-sequences
> >>>
> >>> Thanks,
> >>> Aitozi
> >>>
> >>
>
>


Re: [DISCUSS] String literal behavior in Flink

2023-03-05 Thread Aitozi
Hi Jark,

Thank you for your helpful suggestion. It appears that E'foo\n' is a more
versatile and widely accepted option. To assess its feasibility, I have
reviewed the relevant Unicode support and concluded that it may
necessitate modifications to the Parser.jj file to accommodate this new
syntax.


I am unsure whether we should initially incorporate this alteration in
Calcite or if we can directly supersede the StringLiteral behavior within
the Flink project. Nevertheless, I believe supporting this change is
achievable.



Thanks,
Aitozi.

Jark Wu  wrote on Mon, Mar 6, 2023 at 10:16:

> Hi Aitozi,
>
> I think this is a good idea to improve the backslash escape strings.
> However, I lean a bit more toward the Postgres approach[1],
> which is more standard-compliant. PG allows backslash escape
> string by writing the letter E (upper or lower case) just before the
> opening single quote, e.g., E'foo\n'.
>
> Recognizing backslash escapes in both regular and escape string constants
> is not backward compatible in Flink, and is also deprecated in PG.
>
> In addition, Flink also supports Unicode escape string constants by
> writing the U& before the quote[1] which works in the same way with
> backslash escape string.
>
> Best,
> Jark
>
> [1]:
>
> https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-CONSTANTS
> [2]:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/overview/
>
> On Sat, 4 Mar 2023 at 23:31, Aitozi  wrote:
>
> > Hi,
> >   I encountered a problem when using string literal in Flink. Currently,
> > Flink will escape the string literal during codegen, so for the query
> > below:
> >
> > SELECT 'a\nb'; it will print => a\nb
> >
> > then for the query
> >
> > SELECT SPLIT_INDEX(col, '\n', 0);
> >
> > The col can not split by the newline. If we want to split by the newline,
> > we should use
> >
> > SELECT SPLIT_INDEX(col, '
> > ', 0)
> >
> > or
> >
> > SELECT SPLIT_INDEX(col, CHR(10), 0)
> >
> > The above way could be more intuitive. Some other databases support these
> > "Special Character Escape Sequences"[1].
> >
> > In this way, we can directly use
> > SELECT SPLIT_INDEX(col, '\n', 0); for the query.
> >
> > I know this is not standard behavior in ANSI SQL. I'm opening this thread
> > for some opinions from the community guys.
> >
> > [1]:
> >
> >
> https://dev.mysql.com/doc/refman/8.0/en/string-literals.html#character-escape-sequences
> >
> > Thanks,
> > Aitozi
> >
>


[DISCUSS] String literal behavior in Flink

2023-03-04 Thread Aitozi
Hi,
  I encountered a problem when using string literals in Flink. Currently,
Flink will escape the string literal during codegen, so for the query below:

SELECT 'a\nb'; it will print => a\nb

then for the query

SELECT SPLIT_INDEX(col, '\n', 0);

The col cannot be split by the newline. If we want to split by the newline,
we should use

SELECT SPLIT_INDEX(col, '
', 0)

or

SELECT SPLIT_INDEX(col, CHR(10), 0)

The above ways are not very intuitive. Some other databases support
"Special Character Escape Sequences"[1].

With those, we can directly use
SELECT SPLIT_INDEX(col, '\n', 0); for the query.

I know this is not standard behavior in ANSI SQL. I'm opening this thread
to collect opinions from the community.
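
To make the comparison concrete, a sketch assuming the PG-style E'' prefix
were adopted (E'' is not supported by Flink today; tEnv is an existing
TableEnvironment):

// Today: '\n' inside the SQL literal stays as the two characters '\' and 'n'.
tEnv.executeSql("SELECT SPLIT_INDEX(col, '\\n', 0) FROM t");

// Proposed (hypothetical): the E prefix enables backslash escapes, so
// E'\n' denotes a real newline character.
tEnv.executeSql("SELECT SPLIT_INDEX(col, E'\\n', 0) FROM t");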

[1]:
https://dev.mysql.com/doc/refman/8.0/en/string-literals.html#character-escape-sequences

Thanks,
Aitozi


[jira] [Created] (FLINK-31296) Add JoinConditionEqualityTransferRule to stream optimizer

2023-03-01 Thread Aitozi (Jira)
Aitozi created FLINK-31296:
--

 Summary: Add JoinConditionEqualityTransferRule to stream optimizer
 Key: FLINK-31296
 URL: https://issues.apache.org/jira/browse/FLINK-31296
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Aitozi


I find that {{JoinConditionEqualityTransferRule}} is a common rule for batch 
and stream mode. So it should be added to the stream optimizer as well, which 
will bring performance improvements in some cases.

Maybe other rules also need to be reviewed to see whether they can be aligned 
between the batch and stream cases.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31260) PushLocalHashAggIntoScanRule should also work with union RelNode

2023-02-28 Thread Aitozi (Jira)
Aitozi created FLINK-31260:
--

 Summary: PushLocalHashAggIntoScanRule should also work with union 
RelNode
 Key: FLINK-31260
 URL: https://issues.apache.org/jira/browse/FLINK-31260
 Project: Flink
  Issue Type: Improvement
Reporter: Aitozi


As discussed in the 
[comments|https://github.com/apache/flink/pull/22001#discussion_r1119652784], 
{{PushLocalHashAggIntoScanRule}} currently matches the pattern Exchange -> 
LocalHashAggregate -> Scan. As a result, the following pattern cannot be 
optimized:


{code:java}
  +- Union(all=[true], union=[type, sum$0])
 :- Union(all=[true], union=[type, sum$0])
 :  :- LocalHashAggregate(groupBy=[type], select=[type, 
Partial_SUM(price) AS sum$0])
 :  :  +- TableSourceScan(table=[[default_catalog, default_database, 
table1, project=[type, price], metadata=[]]], fields=[type, price])
 :  +- LocalHashAggregate(groupBy=[type], select=[type, 
Partial_SUM(price) AS sum$0])
 : +- TableSourceScan(table=[[default_catalog, default_database, 
table2, project=[type, price], metadata=[]]], fields=[type, price])
 +- LocalHashAggregate(groupBy=[type], select=[type, Partial_SUM(price) 
AS sum$0])
+- TableSourceScan(table=[[default_catalog, default_database, 
table3, project=[type, price], metadata=[]]], fields=[type, price])

{code}

We should extend the rule to support this pattern.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-297: Improve Auxiliary Sql Statements

2023-02-23 Thread Aitozi
Hi,
Thanks for the nice proposal, Ran.
Regarding this API usage, I had some discussion with @twalthr before,
see here <https://github.com/apache/flink/pull/15137#issuecomment-1356124138>.
Personally, I think leaking the Catalog to the user side is not suitable:
since there are read/write operations in the Catalog,
TableEnvironment#getCatalog will expose all of them to the user side. So I
lean toward adding a new API in TableEnvironment to reduce reliance on the
current TableEnvironment#getCatalog.
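
To make the two shapes concrete, a sketch (the listDatabases(String) overload
is the kind of addition under discussion, not an existing method):

import java.util.Collections;
import java.util.List;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;

public class ListDatabasesSketch {

    // Today: reach through the Catalog object, which also exposes
    // write operations and breaks the Law of Demeter.
    public static List<String> viaCatalog(TableEnvironment tEnv) {
        return tEnv.getCatalog(tEnv.getCurrentCatalog())
                .map(Catalog::listDatabases)
                .orElse(Collections.emptyList());
    }

    // Hypothetical alternative: the environment answers directly and the
    // Catalog object stays hidden from the caller, e.g.
    //   List<String> dbs = tEnv.listDatabases("my_catalog");
}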

Thanks,
Aitozi


Ran Tao  wrote on Thu, Feb 23, 2023 at 23:44:

> Hi, JingSong, Jing.
>
> Thanks for sharing your opinions.
>
> What you say makes sense, both approaches have pros and cons.
>
> If it is a modification of `TableEnvironment`, such as
> listDatabases(catalog), it is actually consistent with the other overloaded
> methods before,
> and defining this method means that TableEnvironment does provide this
> capability (rather than relying on the functionality of another class).
> The disadvantage is that api changes may be required, and may continue to
> be modified in the future.
> But from the TableEnvironment itself, it really doesn't pay attention to
> how the underlying layer is implemented.
> (Although it is actually taken from the catalogManager at present, this is
> another question)
>
> Judging from the current dependencies, flink-table-api-java strongly relies
> on flink-table-common to use various common classes and interfaces,
> especially the catalog interface.
> So we can extract various metadata information in the catalog through
> `tEnv.getCatalog`.
> The advantage is that it will not cause api modification, but this method
> of use breaks LoD.
> In fact, the current flink-table-api-java design is completely bound to the
> catalog interface.
>
> If mandatory modification of the PublicApi is constrained (it may be
> modified again later), I tend to use `tEnv.getCatalog` directly; otherwise
> it would actually be more standard to add overloaded methods to
> `TableEnvironment`.
>
> Another question: can the later capabilities of TableEnvironment be
> implemented through SupportXXX?
> In order to solve the problem that the method needs to be added in the
> future. This kind of usage occurs frequently in flink.
>
> Looking forward to your other considerations,
> and also try to wait to see if there are other relevant API designers or
> committers to provide comments.
>
>
> Best Regards,
> Ran Tao
>
> Jing Ge  wrote on Thu, Feb 23, 2023 at 18:58:
>
> > Hi Jingson,
> >
> > Thanks for sharing your thoughts. Please see my reply below.
> >
> > On Thu, Feb 23, 2023 at 10:16 AM Jingsong Li 
> > wrote:
> >
> > > Hi Jing Ge,
> > >
> > > First, flink-table-common contains all common classes of Flink Table,
> > > I think it is hard to bypass its dependence.
> > >
> >
> > If any time when we use flink-table-api-java, we have to cross through
> > flink-table-api-java and use flink-table-common, we should reconsider the
> > design of these two modules and how interfaces/classes are classified
> into
> > those modules.
> >
> >
> > >
> > > Secondly, almost all methods in Catalog looks useful to me, so if we
> > > are following LoD, we should add all methods again to
> > > TableEnvironment. I think it is redundant.
> > >
> >
> > That is the enlarged issue I mentioned previously. A simple solution is
> to
> > move Catalog to the top level API. The fact is that a catalog package
> > already exists in flink-table-api-java but the Catalog interface is in
> > flink-table-common. I don't know the historical context of this design.
> > Maybe you could share some insight with us? Thanks in advance. Beyond
> that,
> > there should be other AOP options but need more time to figure it out.
> >
> >
> > >
> > > And, this API chain does not look deep.
> > > - "tEnv.getCatalog(tEnv.getCurrentCatalog()).get().listDatabases()"
> > > looks a little complicated. The complex part is ahead.
> > > - If we have a method to get Catalog directly, can be simplify to
> > > "tEnv.catalog().listDatabase()", this is simple.
> > >
> >
> > Commonly, it will need more effort to always follow LoD, but for the top
> > level facade API like TableEnvironment, both the API developer, API
> > consumer and the project itself from a long-term perspective will benefit
> > from sticking to LoD. Since we already have the getCatalog(String
> catalog)
> > method in TableEnvironment, it also makes sense to follow your
> suggestion,
> > if we only w

[jira] [Created] (FLINK-31205) do optimize for multi sink in a single relNode tree

2023-02-23 Thread Aitozi (Jira)
Aitozi created FLINK-31205:
--

 Summary: do optimize for multi sink in a single relNode tree 
 Key: FLINK-31205
 URL: https://issues.apache.org/jira/browse/FLINK-31205
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Aitozi


Flink supports multi-sink usage, but it optimizes each sink in an individual 
RelNode tree. This misses some opportunities to do cross-tree 
optimization, e.g.:


{code:java}
create table newX(
  a int,
  b bigint,
  c varchar,
  d varchar,
  e varchar
) with (
  'connector' = 'values'
  ,'enable-projection-push-down' = 'true'


insert into sink_table select a, b from newX
insert into sink_table select a, 1 from newX
{code}

It will produce the plan below, which causes the source to be consumed twice:


{code:java}
Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, 
b], metadata=[]]], fields=[a, b])

Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- Calc(select=[a, 1 AS b])
   +- TableSourceScan(table=[[default_catalog, default_database, newX, 
project=[a], metadata=[]]], fields=[a])

{code}

In this ticket, I propose to do a global optimization for the multi sink by:
* Merging the multiple sinks (with the same table) into a single RelNode tree 
with an extra union node
* After optimization, splitting the merged union back into the original 
multiple sinks

In my PoC, after step 1, it produces the plan below, which I think is good 
for the global performance:


{code:java}
Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- Union(all=[true], union=[a, b])
   :- TableSourceScan(table=[[default_catalog, default_database, newX, 
project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1])
   +- Calc(select=[a AS $f0, CAST(1 AS BIGINT) AS $f1])
  +- Reused(reference_id=[1])
{code}








[jira] [Created] (FLINK-30570) RexNodeExtractor#isSupportedPartitionPredicate generates unexpected partition predicates

2023-01-05 Thread Aitozi (Jira)
Aitozi created FLINK-30570:
--

 Summary: RexNodeExtractor#isSupportedPartitionPredicate generates 
unexpected partition predicates
 Key: FLINK-30570
 URL: https://issues.apache.org/jira/browse/FLINK-30570
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Aitozi


Currently, the condition {{where rand(1) > 0.0125}} is recognized as a
partition predicate and is evaluated to false when compiling the SQL. This has
two problems. First, it should not be recognized as a partition predicate, and
a nondeterministic function should never pass the partition pruner.





[jira] [Created] (FLINK-30551) Add open method to PartitionCommitPolicy

2023-01-03 Thread Aitozi (Jira)
Aitozi created FLINK-30551:
--

 Summary: Add open method to PartitionCommitPolicy
 Key: FLINK-30551
 URL: https://issues.apache.org/jira/browse/FLINK-30551
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Reporter: Aitozi


Currently, {{PartitionCommitPolicy}} does not have an open hook, so a custom
partition commit policy has no appropriate entry point for its initialization
work. I propose to add an {{open}} method for this; a sketch follows.
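
A minimal sketch of what this could look like, assuming a default no-op so
existing implementations stay source-compatible (the exact signature is an
assumption, not the final API):

{code:java}
public interface PartitionCommitPolicy {

    /** Hypothetical init hook; defaulting to a no-op keeps existing policies compatible. */
    default void open(Context context) throws Exception {}

    /** Existing commit callback. */
    void commit(Context context) throws Exception;
}
{code}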





[Discuss] Discuss about the interface of Catalog

2022-12-17 Thread Aitozi
Hi devs
Recently I have been looking into migrating the deprecated TableSchema to the
ResolvedSchema and Schema pair in FLINK-29585, but I found the Catalog APIs
not quite suitable for this.

There are currently three methods that deal with the CatalogBaseTable:

1. createTable
2. alterTable
3. getTable

Although the comments state that the framework will make sure to pass a
resolved table to the implementer, this is not enforced in the interface, so
the implementer needs an extra cast to reach the resolved information, e.g.
the ResolvedSchema:

 * The framework will make sure to call this method with fully validated
 * {@link ResolvedCatalogTable} or {@link ResolvedCatalogView}. Those
 * instances are easy to serialize for a durable catalog implementation.

So I propose adjusting the `createTable` and `alterTable` to work with
ResolvedCatalogBaseTable directly. For the `getTable`, I have no strong
preference, but I think it could also return a ResolvedCatalogBaseTable,
since the current implementation is already able to do that. A sketch of the
adjusted signatures is below.
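
A sketch of the adjusted signatures under this proposal (exception lists
copied from the current interface; this shows the shape of the change, not a
final API):

{code:java}
public interface Catalog {

    // Proposed: accept the resolved variant directly, so implementers no
    // longer need to cast a CatalogBaseTable to reach the ResolvedSchema.
    void createTable(ObjectPath tablePath, ResolvedCatalogBaseTable<?> table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;

    void alterTable(ObjectPath tablePath, ResolvedCatalogBaseTable<?> newTable, boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException;

    // Optionally: return the resolved variant as well.
    ResolvedCatalogBaseTable<?> getTable(ObjectPath tablePath)
            throws TableNotExistException, CatalogException;
}
{code}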

Thanks,
Aitozi


[jira] [Created] (FLINK-30242) Push localHashAggregate pass the union node

2022-11-29 Thread Aitozi (Jira)
Aitozi created FLINK-30242:
--

 Summary: Push localHashAggregate pass the union node
 Key: FLINK-30242
 URL: https://issues.apache.org/jira/browse/FLINK-30242
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Aitozi


The local hash aggregate after a union has an extra shuffle stage. We can swap
it with the union node so that the local hash aggregate can chain with the
mapper stage, saving the unnecessary shuffle, especially in batch jobs.





[jira] [Created] (FLINK-30198) Support AdaptiveBatchScheduler to set per-task size for reducer task

2022-11-24 Thread Aitozi (Jira)
Aitozi created FLINK-30198:
--

 Summary: Support AdaptiveBatchScheduler to set per-task size for 
reducer task 
 Key: FLINK-30198
 URL: https://issues.apache.org/jira/browse/FLINK-30198
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Aitozi


When we use the AdaptiveBatchScheduler in our case, we found that it works
well in most cases, but there is a limitation: there is only one global
parameter for the per-task data size,
{{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}}.

However, in a map-reduce architecture the reducer tasks usually have more
complex computation logic, such as aggregate/sort/join operators. So I think
it would be nicer if we could set the reducer and mapper tasks' per-task data
size individually.

Then, how do we distinguish the reducer tasks? IMO, we can let the parallelism
decider know whether a vertex has hash edge inputs; if yes, it should be a
reducer task. A sketch of such an option follows.
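
A sketch of a per-role option expressed as a Flink ConfigOption; the key name
and the default value are illustrative assumptions, not a final proposal:

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;

public class ReducerVolumeOptionSketch {

    // Hypothetical per-role knob next to the existing global option
    // jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task.
    public static final ConfigOption<MemorySize> AVG_DATA_VOLUME_PER_REDUCER_TASK =
            ConfigOptions.key("jobmanager.adaptive-batch-scheduler.avg-data-volume-per-reducer-task")
                    .memoryType()
                    .defaultValue(MemorySize.parse("512m"))
                    .withDescription(
                            "Average data volume per reducer task, i.e. a vertex with hash edge inputs.");
}
{code}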





[jira] [Created] (FLINK-30019) Remove the unused HiveTableFileInputFormat in hive connector

2022-11-14 Thread Aitozi (Jira)
Aitozi created FLINK-30019:
--

 Summary: Remove the unused HiveTableFileInputFormat in hive 
connector
 Key: FLINK-30019
 URL: https://issues.apache.org/jira/browse/FLINK-30019
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Hive
Reporter: Aitozi


As I see it, after https://issues.apache.org/jira/browse/FLINK-19888 the hive
connector no longer relies on the HiveTableFileInputFormat, so it can be
safely removed now.





[jira] [Created] (FLINK-29980) Wrap the HiveSource's bulkFormat to handle the partition keys

2022-11-10 Thread Aitozi (Jira)
Aitozi created FLINK-29980:
--

 Summary: Wrap the HiveSource's bulkFormat to handle the partition 
keys
 Key: FLINK-29980
 URL: https://issues.apache.org/jira/browse/FLINK-29980
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Reporter: Aitozi


As described in https://issues.apache.org/jira/browse/FLINK-25113, to clean up
the partition-keys logic in the parquet and orc formats, the hive source
should leverage the {{FileInfoExtractorBulkFormat}} to handle the partition
keys internally.





[jira] [Created] (FLINK-29800) Continuous failover will leak the inprogress output file

2022-10-30 Thread Aitozi (Jira)
Aitozi created FLINK-29800:
--

 Summary: Continuous failover will leak the inprogress output file
 Key: FLINK-29800
 URL: https://issues.apache.org/jira/browse/FLINK-29800
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Reporter: Aitozi


When running a job that sinks to the file system, the in-progress files keep
growing while the job keeps failing over, which does harm to the file system.
I think the file that is currently being written to should be cleaned up when
the job fails.





[jira] [Created] (FLINK-29795) The source.file.stream.io-fetch-size can not be set by with properties

2022-10-29 Thread Aitozi (Jira)
Aitozi created FLINK-29795:
--

 Summary: The source.file.stream.io-fetch-size can not be set by 
with properties
 Key: FLINK-29795
 URL: https://issues.apache.org/jira/browse/FLINK-29795
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Affects Versions: 1.17.0
Reporter: Aitozi


The {{source.file.stream.io-fetch-size}} option is used in the bulk format
mode, but it is not exposed among the filesystem connector options. It can
only be set by adding it to {{flink-conf.yaml}}. That is not convenient; it
should be scoped to the table's properties, as sketched below.
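
A sketch of the intended usage once the option is exposed as a connector
option; the table schema, path, and value here are illustrative assumptions:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FetchSizeOptionSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Hypothetical: scope the fetch size to the table instead of flink-conf.yaml.
        tEnv.executeSql(
                "create table fs_source (a int, b string) with ("
                        + " 'connector' = 'filesystem',"
                        + " 'path' = 'file:///tmp/data',"
                        + " 'format' = 'parquet',"
                        + " 'source.file.stream.io-fetch-size' = '4mb'"
                        + ")");
    }
}
{code}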





[jira] [Created] (FLINK-29748) Expose the optimize phase in the connector context

2022-10-24 Thread Aitozi (Jira)
Aitozi created FLINK-29748:
--

 Summary: Expose the optimize phase in the connector context
 Key: FLINK-29748
 URL: https://issues.apache.org/jira/browse/FLINK-29748
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner, Table SQL / Runtime
Reporter: Aitozi


Currently, a connector cannot know whether the whole optimization has
finished. Once the optimization finishes, all information is static, e.g. the
partitions being read. If I want to validate the final optimized result (such
as whether the partitions being read are too many, or empty), I need the
context of the current phase. I think the {{ScanContext}} is an OK place to
expose this information.






[jira] [Created] (FLINK-29557) The SinkOperator's OutputFormat function is not recognized

2022-10-09 Thread Aitozi (Jira)
Aitozi created FLINK-29557:
--

 Summary: The SinkOperator's OutputFormat function is not recognized
 Key: FLINK-29557
 URL: https://issues.apache.org/jira/browse/FLINK-29557
 Project: Flink
  Issue Type: Bug
  Components: API / Core, Table SQL / API
Reporter: Aitozi








[jira] [Created] (FLINK-29308) NoResourceAvailableException fails the batch job

2022-09-14 Thread Aitozi (Jira)
Aitozi created FLINK-29308:
--

 Summary: NoResourceAvailableException fails the batch job
 Key: FLINK-29308
 URL: https://issues.apache.org/jira/browse/FLINK-29308
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Aitozi


When running batch job configured with the following restart strategy
{code:java}
restart-strategy: fixed-delay
restart-strategy.fixed-delay.delay: 15 s
restart-strategy.fixed-delay.attempts: 10
{code}
If the cluster resources are not enough to run a single stage, it can run part
of the stage, but the job will still fail after 10 occurrences of
{{NoResourceAvailableException}}. IMO, for batch jobs the
{{NoResourceAvailableException}} does not necessarily need to fail the job; or
at least this failure reason should not share the same restart strategy with
other failure reasons.





[jira] [Created] (FLINK-29200) Provide the way to delay the pod deletion for debugging purpose

2022-09-05 Thread Aitozi (Jira)
Aitozi created FLINK-29200:
--

 Summary: Provide the way to delay the pod deletion for debugging 
purpose
 Key: FLINK-29200
 URL: https://issues.apache.org/jira/browse/FLINK-29200
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Reporter: Aitozi


Currently, if the TaskManager heartbeat times out, the pod is deleted
immediately. That is not very convenient for debugging the underlying reason;
e.g. we cannot easily get the core dump files if the process crashed due to
JVM bugs and so on.

So I propose to introduce an option to control the delay of the pod deletion;
it can be enabled to keep the pod alive for debugging. A sketch follows.
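
A sketch of such an option; the key name and default value are illustrative
assumptions, not a final proposal:

{code:java}
import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class PodDeletionDelaySketch {

    // Hypothetical option: how long to keep a failed TaskManager pod around
    // before deleting it, e.g. to collect core dumps.
    public static final ConfigOption<Duration> POD_DELETION_DELAY =
            ConfigOptions.key("kubernetes.pod-deletion.delay")
                    .durationType()
                    .defaultValue(Duration.ZERO)
                    .withDescription("Delay before deleting a terminated TaskManager pod.");
}
{code}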





[jira] [Created] (FLINK-29088) Project push down cause the source reuse can not work

2022-08-23 Thread Aitozi (Jira)
Aitozi created FLINK-29088:
--

 Summary: Project push down cause the source reuse can not work
 Key: FLINK-29088
 URL: https://issues.apache.org/jira/browse/FLINK-29088
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Aitozi








[jira] [Created] (FLINK-29085) Add the name for test as hint for BuiltInFunctionTestBase

2022-08-23 Thread Aitozi (Jira)
Aitozi created FLINK-29085:
--

 Summary: Add the name for test as hint for BuiltInFunctionTestBase
 Key: FLINK-29085
 URL: https://issues.apache.org/jira/browse/FLINK-29085
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Reporter: Aitozi


When running tests that extend the {{BuiltInFunctionTestBase}}, I found it
hard to distinguish the failed tests. I think it would be easy to add a name
prefix for the {{TestItem}}.





[jira] [Created] (FLINK-29066) Reconsider the runtime property of the BuiltInFunctionDefinition

2022-08-22 Thread Aitozi (Jira)
Aitozi created FLINK-29066:
--

 Summary: Reconsider the runtime property of the 
BuiltInFunctionDefinition
 Key: FLINK-29066
 URL: https://issues.apache.org/jira/browse/FLINK-29066
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Aitozi


Currently, there are three types of runtime property:
1) runtimeClass, which means Flink provides a class that defines the runtime
implementation
2) runtimeProvided, which means the runtime class is code generated
3) runtimeDeferred, which means it defers to Calcite's built-in function
handling

First, runtimeDeferred is only a marker with no actual effect, and the JSON_*
functions have already been ported to the Flink internal stack, so I think it
can be removed now.





[jira] [Created] (FLINK-29061) Cleanup dead code in StringCallGen

2022-08-22 Thread Aitozi (Jira)
Aitozi created FLINK-29061:
--

 Summary: Cleanup dead code in StringCallGen
 Key: FLINK-29061
 URL: https://issues.apache.org/jira/browse/FLINK-29061
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Aitozi


In https://issues.apache.org/jira/browse/FLINK-13522, we dropped some
functions in the blink planner. There is still some dead code left for them.





Re: [ANNOUNCE] New Apache Flink Committer - Lijie Wang

2022-08-19 Thread Aitozi
Congrats Lijie!

Best regards,
Aitozi

Jing Ge  于2022年8月19日周五 06:02写道:

> Congrats Lijie!
>
> Best regards,
> Jing
>
> On Thu, Aug 18, 2022 at 8:40 AM Terry Wang  wrote:
>
> > Congratulations, Lijie!
> >
> > On Thu, Aug 18, 2022 at 11:31 AM Leonard Xu  wrote:
> >
> > > Congratulations, Lijie!
> > >
> > > Best,
> > > Leonard
> > >
> > > > 2022年8月18日 上午11:26,Zhipeng Zhang  写道:
> > > >
> > > > Congratulations, Lijie!
> > > >
> > > > Xintong Song  于2022年8月18日周四 11:23写道:
> > > >>
> > > >> Congratulations Lijie, and welcome~!
> > > >>
> > > >> Best,
> > > >>
> > > >> Xintong
> > > >>
> > > >>
> > > >>
> > > >> On Thu, Aug 18, 2022 at 11:12 AM Xingbo Huang 
> > > wrote:
> > > >>
> > > >>> Congrats, Lijie
> > > >>>
> > > >>> Best,
> > > >>> Xingbo
> > > >>>
> > > >>> Lincoln Lee  于2022年8月18日周四 11:01写道:
> > > >>>
> > > >>>> Congratulations, Lijie!
> > > >>>>
> > > >>>> Best,
> > > >>>> Lincoln Lee
> > > >>>>
> > > >>>>
> > > >>>> Benchao Li  于2022年8月18日周四 10:51写道:
> > > >>>>
> > > >>>>> Congratulations Lijie!
> > > >>>>>
> > > >>>>> yanfei lei  于2022年8月18日周四 10:44写道:
> > > >>>>>
> > > >>>>>> Congratulations, Lijie!
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Yanfei
> > > >>>>>>
> > > >>>>>> JunRui Lee  于2022年8月18日周四 10:35写道:
> > > >>>>>>
> > > >>>>>>> Congratulations, Lijie!
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> JunRui
> > > >>>>>>>
> > > >>>>>>> Timo Walther  于2022年8月17日周三 19:30写道:
> > > >>>>>>>
> > > >>>>>>>> Congratulations and welcome to the committer team :-)
> > > >>>>>>>>
> > > >>>>>>>> Regards,
> > > >>>>>>>> Timo
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On 17.08.22 12:50, Yuxin Tan wrote:
> > > >>>>>>>>> Congratulations, Lijie!
> > > >>>>>>>>>
> > > >>>>>>>>> Best,
> > > >>>>>>>>> Yuxin
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> Guowei Ma  于2022年8月17日周三 18:42写道:
> > > >>>>>>>>>
> > > >>>>>>>>>> Congratulations, Lijie. Welcome on board~!
> > > >>>>>>>>>> Best,
> > > >>>>>>>>>> Guowei
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Wed, Aug 17, 2022 at 6:25 PM Zhu Zhu 
> > > >>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Hi everyone,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On behalf of the PMC, I'm very happy to announce Lijie Wang
> > > >>> as
> > > >>>>>>>>>>> a new Flink committer.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Lijie has been contributing to Flink project for more than
> 2
> > > >>>>> years.
> > > >>>>>>>>>>> He mainly works on the runtime/coordination part, doing
> > > >>> feature
> > > >>>>>>>>>>> development, problem debugging and code reviews. He has
> also
> > > >>>>>>>>>>> driven the work of FLIP-187(Adaptive Batch Scheduler) and
> > > >>>>>>>>>>> FLIP-224(Blocklist for Speculative Execution), which are
> > > >>>>> important
> > > >>>>>>>>>>> to run batch jobs.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Please join me in congratulating Lijie for becoming a Flink
> > > >>>>>>> committer!
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Cheers,
> > > >>>>>>>>>>> Zhu
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> --
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Benchao Li
> > > >>>>>
> > > >>>>
> > > >>>
> > > >
> > > >
> > > >
> > > > --
> > > > best,
> > > > Zhipeng
> > >
> > >
> >
> > --
> > Best Regards,
> > Terry Wang
> >
>


Re: [ANNOUNCE] New Apache Flink Committer - Junhan Yang

2022-08-19 Thread Aitozi
Congratulations, Junhan!
Best,
Aitozi

Guowei Ma  于2022年8月19日周五 13:18写道:

> Congratulations, Junhan!
> Best,
> Guowei
>
>
> On Fri, Aug 19, 2022 at 6:01 AM Jing Ge  wrote:
>
> > Congrats Junhan!
> >
> > Best regards,
> > Jing
> >
> > On Thu, Aug 18, 2022 at 12:05 PM Jark Wu  wrote:
> >
> > > Congrats and welcome Junhan!
> > >
> > > Cheers,
> > > Jark
> > >
> > > > 2022年8月18日 17:59,Timo Walther  写道:
> > > >
> > > > Congratulations and welcome to the committer team :-)
> > > >
> > > > Regards,
> > > > Timo
> > > >
> > > > On 18.08.22 07:19, Lijie Wang wrote:
> > > >> Congratulations, Junhan!
> > > >> Best,
> > > >> Lijie
> > > >> Leonard Xu  于2022年8月18日周四 11:31写道:
> > > >>> Congratulations, Junhan!
> > > >>>
> > > >>> Best,
> > > >>>
> > > >>>> 2022年8月18日 上午11:27,Zhipeng Zhang  写道:
> > > >>>>
> > > >>>> Congratulations, Junhan!
> > > >>>>
> > > >>>> Xintong Song  于2022年8月18日周四 11:21写道:
> > > >>>>>
> > > >>>>> Hi everyone,
> > > >>>>>
> > > >>>>> On behalf of the PMC, I'm very happy to announce Junhan Yang as a
> > new
> > > >>> Flink
> > > >>>>> committer.
> > > >>>>>
> > > >>>>> Junhan has been contributing to the Flink project for more than 1
> > > year.
> > > >>> His
> > > >>>>> contributions are mostly identified in the web frontend,
> including
> > > >>>>> FLIP-241, FLIP-249 and various maintenance efforts of Flink's
> > > frontend
> > > >>>>> frameworks.
> > > >>>>>
> > > >>>>> Please join me in congratulating Junhan for becoming a Flink
> > > committer!
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Xintong
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> --
> > > >>>> best,
> > > >>>> Zhipeng
> > > >>>
> > > >>>
> > > >
> > >
> > >
> >
>


[jira] [Created] (FLINK-28838) Avoid to notify the elementQueue consumer when the fetch result is empty

2022-08-05 Thread Aitozi (Jira)
Aitozi created FLINK-28838:
--

 Summary: Avoid to notify the elementQueue consumer when the fetch 
result is empty
 Key: FLINK-28838
 URL: https://issues.apache.org/jira/browse/FLINK-28838
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Affects Versions: 1.15.1, 1.15.0
Reporter: Aitozi








[jira] [Created] (FLINK-28822) Avoid create VectorizedColumnBatch for each read in ArrowReader

2022-08-04 Thread Aitozi (Jira)
Aitozi created FLINK-28822:
--

 Summary: Avoid create VectorizedColumnBatch for each read in 
ArrowReader
 Key: FLINK-28822
 URL: https://issues.apache.org/jira/browse/FLINK-28822
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Reporter: Aitozi








[jira] [Created] (FLINK-28816) Include some metrics for the pod created in operator

2022-08-04 Thread Aitozi (Jira)
Aitozi created FLINK-28816:
--

 Summary: Include some metrics for the pod created in operator
 Key: FLINK-28816
 URL: https://issues.apache.org/jira/browse/FLINK-28816
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Aitozi


Currently, the metrics revolve around the operator's own operations. In our
use case, we also want to measure metrics about the Flink pods, especially the
pod creation time cost and the pod creation failure rate. I think the operator
is the best place to collect and expose these metrics.





[jira] [Created] (FLINK-28751) Poor performance of the built in json_value function

2022-07-30 Thread Aitozi (Jira)
Aitozi created FLINK-28751:
--

 Summary: Poor performance of the built in json_value function
 Key: FLINK-28751
 URL: https://issues.apache.org/jira/browse/FLINK-28751
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Reporter: Aitozi
 Attachments: image-2022-07-30-15-47-34-788.png

When I use the JSON_VALUE function, I found its performance is very poor. It
is mainly affected by the heavy lock contention in the json-path internal
LRUCache, which has also been observed by other systems, e.g.:
https://github.com/apache/pinot/pull/7409

!image-2022-07-30-15-47-34-788.png|width=2346,height=996!

So I propose to use a different cache to replace the current one for better
performance; a sketch follows.
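
A minimal sketch of one possible replacement, assuming a simple unbounded
ConcurrentHashMap is acceptable (a bounded cache such as Caffeine would be a
more production-minded choice):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.jayway.jsonpath.JsonPath;

public class JsonPathCacheSketch {

    private static final Map<String, JsonPath> CACHE = new ConcurrentHashMap<>();

    public static JsonPath get(String path) {
        // computeIfAbsent avoids the coarse lock taken by json-path's built-in LRUCache.
        return CACHE.computeIfAbsent(path, JsonPath::compile);
    }
}
{code}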





[jira] [Created] (FLINK-28531) Shutdown cluster after history server archive finished

2022-07-12 Thread Aitozi (Jira)
Aitozi created FLINK-28531:
--

 Summary: Shutdown cluster after history server archive finished
 Key: FLINK-28531
 URL: https://issues.apache.org/jira/browse/FLINK-28531
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Aitozi


I met a problem where the job cluster may be shut down before the history
server archive file upload has finished.

After some research, it may be caused by two reasons:

First, {{HistoryServerArchivist#archiveExecutionGraph}} is not waited on for
completion.
Second, the deregisterApp in
{{KubernetesResourceManagerDriver#deregisterApplication}} directly removes the
deployment. So the shutdown flow in ClusterEntrypoint first triggers the
deployment deletion, which causes the master pod to be deleted while some
operations/futures have not finished.





[jira] [Created] (FLINK-28478) Session Cluster will lost if it failed between status recorded and deploy

2022-07-09 Thread Aitozi (Jira)
Aitozi created FLINK-28478:
--

 Summary: Session Cluster will lost if it failed between status 
recorded and deploy
 Key: FLINK-28478
 URL: https://issues.apache.org/jira/browse/FLINK-28478
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Reporter: Aitozi


I found that a session cluster deployment cannot recover if it fails between
status recording and deployment, because in the next reconcile loop the spec
is not detected as changed by {{checkNewSpecAlreadyDeployed}}, so it will not
try to start the session cluster again.

Application mode has no such problem, because the SUSPEND state of the
deployed job spec is not equal to the desired state, so it will try to
reconcile the spec change.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28477) Support to scan the history server dir with max depth

2022-07-09 Thread Aitozi (Jira)
Aitozi created FLINK-28477:
--

 Summary: Support to scan the history server dir with max depth
 Key: FLINK-28477
 URL: https://issues.apache.org/jira/browse/FLINK-28477
 Project: Flink
  Issue Type: Bug
Reporter: Aitozi


In the {{HistoryServerArchiveFetcher}}, we only list the archives directly
under historyserver.archive.fs.dir, but we have an extra hierarchy for the
jobs, like:

/base/job_platform_id1/jobid1
/base/job_platform_id1/jobid2
/base/job_platform_id2/jobid3

For this use case, we cannot provide the full dir for the history server to
scan; instead, we want to set the dir {{/base}} to scan. Can we support
letting the HistoryServer list the dir with a max depth? I think it would make
it more flexible for different usages; see the sketch below.

If there are no objections, I would like to open a PR for this.
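
A sketch of such an option; the key name and default are illustrative
assumptions, not a final proposal:

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class HistoryServerScanDepthSketch {

    // Hypothetical option: how many directory levels to descend below
    // historyserver.archive.fs.dir when looking for job archives.
    public static final ConfigOption<Integer> ARCHIVE_SCAN_MAX_DEPTH =
            ConfigOptions.key("historyserver.archive.fs.dir.max-depth")
                    .intType()
                    .defaultValue(1)
                    .withDescription("Maximum depth to scan for job archives.");
}
{code}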






Re: [DISCUSS] FLIP-241: Completed Jobs Information Enhancement

2022-06-16 Thread Aitozi
Thanks Junhan for driving this. It's a great improvement for batch jobs.
I'm looking forward to this feature for our internal use case. +1 for it.

One more question:

Will each attempt of the TaskManager or JobManager pods (if failures occur)
be shown in the UI?

Best,
Aitozi.

Yang Wang  于2022年6月16日周四 19:10写道:

> Thanks Xintong for the explanation.
>
> It makes sense to leave the discussion about job result store in a
> dedicated thread.
>
>
> Best,
> Yang
>
> Xintong Song  于2022年6月16日周四 13:40写道:
>
> > My impression of JobResultStore is more about fault tolerance and high
> > availability. Using it for providing information to users sounds worth
> > exploring. We probably need more time to think it through.
> >
> > Given that it doesn't conflict with what we have proposed in this FLIP,
> I'd
> > suggest considering it as a separate thread and exclude it from the scope
> > of this one.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Thu, Jun 16, 2022 at 11:43 AM Yang Wang 
> wrote:
> >
> > > This is a very useful feature both for finished streaming and batch
> jobs.
> > >
> > > Except for the WebUI & REST API improvements, I am curious whether we
> > could
> > > also integrate some critical information(e.g. latest checkpoint) into
> the
> > > job result store[1].
> > > I am just feeling this is also somehow related with "Completed Jobs
> > > Information Enhancement".
> > > And I think the history server is not necessary for all the scenarios
> > > especially when users only want to check the job execution result.
> > >
> > > [1].
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
> > >
> > >
> > > Best,
> > > Yang
> > >
> > > Xintong Song  于2022年6月15日周三 15:37写道:
> > >
> > > > Thanks Junhan,
> > > >
> > > > +1 for the proposed improvements.
> > > >
> > > > Best,
> > > >
> > > > Xintong
> > > >
> > > >
> > > >
> > > > On Wed, Jun 15, 2022 at 3:16 PM Yangze Guo 
> wrote:
> > > >
> > > > > Thanks for driving this, Junhan.
> > > > >
> > > > > I think it's a valuable usability improvement for both streaming
> and
> > > > > batch users. Looking forward to the community feedback.
> > > > >
> > > > > Best,
> > > > > Yangze Guo
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Jun 15, 2022 at 3:10 PM junhan yang <
> > yangjunhan1...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I would like to open a discussion on FLIP-241: Completed Jobs
> > > > Information
> > > > > > Enhancement.
> > > > > >
> > > > > > As far as we can tell, streaming and batch users have different
> > > > interests
> > > > > > in probing a job. As Flink grows into a unified streaming & batch
> > > > > processor
> > > > > > and is adopted by more and more batch users, the user experience
> of
> > > > > > completed job's inspection has become more and more important.
> > After
> > > > > doing
> > > > > > several market research, there are several potential improvements
> > > > > spotted.
> > > > > >
> > > > > > The main purpose here is due to the involvement of WebUI & REST
> API
> > > > > > changes, which should be openly discussed and voted on as FLIPs.
> > > > > >
> > > > > > You can find more details in FLIP-241 document[1]. Looking
> forward
> > to
> > > > > > your feedback.
> > > > > >
> > > > > > [1] https://cwiki.apache.org/confluence/x/dRD1D
> > > > > >
> > > > > > Best regards,
> > > > > > Junhan
> > > > >
> > > >
> > >
> >
>


Re: [ANNOUNCE] New Apache Flink PMC Member - Jingsong Lee

2022-06-15 Thread Aitozi
Congrats, Jingsong!

Best,
Aitozi

Zhuoluo Yang  于2022年6月16日周四 09:26写道:

> Many congratulations to teacher Lee!
>
> Thanks,
> Zhuoluo
>
>
> Dian Fu  于2022年6月16日周四 08:54写道:
>
> > Congratulations, Jingsong!
> >
> > Regards,
> > Dian
> >
> > On Thu, Jun 16, 2022 at 1:08 AM Yu Li  wrote:
> >
> > > Congrats, Jingsong!
> > >
> > > Best Regards,
> > > Yu
> > >
> > >
> > > On Wed, 15 Jun 2022 at 15:26, Sergey Nuyanzin 
> > wrote:
> > >
> > > > Congratulations, Jingsong!
> > > >
> > > > On Wed, Jun 15, 2022 at 8:45 AM Jingsong Li 
> > > > wrote:
> > > >
> > > > > Thanks everyone.
> > > > >
> > > > > It's great to be with you in the Flink community!
> > > > >
> > > > > Best,
> > > > > Jingsong
> > > > >
> > > > > On Wed, Jun 15, 2022 at 2:11 PM Yun Gao
>  > >
> > > > > wrote:
> > > > > >
> > > > > > Congratulations, Jingsong!
> > > > > >
> > > > > > Best,
> > > > > > Yun Gao
> > > > > >
> > > > > >
> > > > > >
> --
> > > > > > From:Jing Zhang 
> > > > > > Send Time:2022 Jun. 14 (Tue.) 11:05
> > > > > > To:dev 
> > > > > > Subject:Re: [ANNOUNCE] New Apache Flink PMC Member - Jingsong Lee
> > > > > >
> > > > > > Congratulations, Jingsong!
> > > > > >
> > > > > > Best,
> > > > > > Jing Zhang
> > > > > >
> > > > > > Leonard Xu  于2022年6月14日周二 10:54写道:
> > > > > >
> > > > > > > Congratulations, Jingsong!
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Leonard
> > > > > > >
> > > > > > > > 2022年6月13日 下午6:52,刘首维  写道:
> > > > > > > >
> > > > > > > > Congratulations and well deserved, Jingsong!
> > > > > > > >
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Shouwei
> > > > > > > > -- Original message --
> > > > > > > > From: "dev" <luoyu...@alumni.sjtu.edu.cn>
> > > > > > > > Sent: Monday, June 13, 2022, 6:09 PM
> > > > > > > > To: "dev" <dev@flink.apache.org>
> > > > > > > > Subject: Re: [ANNOUNCE] New Apache Flink PMC Member - Jingsong Lee
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > Congratulations, Jingsong!
> > > > > > > >
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Yuxia
> > > > > > > >
> > > > > > > > - Original message -
> > > > > > > > From: "Yun Tang"
> > > > > > > > To: "dev"
> > > > > > > > Sent: Monday, June 13, 2022, 6:12:24 PM
> > > > > > > > Subject: Re: [ANNOUNCE] New Apache Flink PMC Member - Jingsong Lee
> > > > > > > >
> > > > > > > > Congratulations, Jingsong! Well deserved.
> > > > > > > >
> > > > > > > > Best
> > > > > > > > Yun Tang
> > > > > > > > 
> > > > > > > > From: Xingbo Huang
> > > > > > > > Sent: Monday, June 13, 2022 17:39
> > > > > > > > To: dev
> > > > > > > > Subject: Re: [ANNOUNCE] New Apache Flink PMC Member - Jingsong Lee
> > > > > > > >
> > > > > > > > Congratulations, Jingsong!
> > >

[jira] [Created] (FLINK-28008) Can not get secondary resource from after operator restart

2022-06-10 Thread Aitozi (Jira)
Aitozi created FLINK-28008:
--

 Summary: Can not get secondary resource from after operator restart
 Key: FLINK-28008
 URL: https://issues.apache.org/jira/browse/FLINK-28008
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Reporter: Aitozi


How to reproduce:
 * create session and submit session job
 * delete session job
 * restart operator
 * submit session job again

Then it will print the following log:
{noformat}
2022-06-11 01:51:15,645 o.a.f.k.o.r.s.FlinkSessionJobReconciler [WARN 
][default/basic-session-job-example] Session cluster deployment is not found
2022-06-11 01:51:15,645 o.a.f.k.o.r.s.FlinkSessionJobReconciler [WARN 
][default/basic-session-job-example2] Session cluster deployment is not 
found{noformat}
But the session cluster is still there





[jira] [Created] (FLINK-27979) Support to upgrade session cluster in a more fine grained way

2022-06-09 Thread Aitozi (Jira)
Aitozi created FLINK-27979:
--

 Summary: Support to upgrade session cluster in a more fine grained 
way
 Key: FLINK-27979
 URL: https://issues.apache.org/jira/browse/FLINK-27979
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Aitozi


Currently, we upgrade the session cluster by deleting it directly, which does
not respect the upgrade mode of the session jobs. I think we could improve
this by performing the upgrade in two phases.





[jira] [Created] (FLINK-27961) The EventUtils generate event name should take the resource's uid into account

2022-06-08 Thread Aitozi (Jira)
Aitozi created FLINK-27961:
--

 Summary: The EventUtils generate event name should take the 
resource's uid into account
 Key: FLINK-27961
 URL: https://issues.apache.org/jira/browse/FLINK-27961
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Reporter: Aitozi


Currently, the event name does not include the uid of the target resource. If
a resource is recreated, it will be associated with the former object's
events. That is unexpected and confusing, e.g. seeing empty events when
describing the resource.





[jira] [Created] (FLINK-27960) Make the apt-get updating optional

2022-06-08 Thread Aitozi (Jira)
Aitozi created FLINK-27960:
--

 Summary: Make the apt-get updating optional 
 Key: FLINK-27960
 URL: https://issues.apache.org/jira/browse/FLINK-27960
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Aitozi
 Attachments: image-2022-06-08-22-06-44-586.png

I noticed that the apt-get update costs a lot of time and is not necessary
during development. So I think it would be convenient to add an option that
controls the image build and skips unnecessary stages.





[jira] [Created] (FLINK-27930) Format the timestamp in status to make it explicit

2022-06-07 Thread Aitozi (Jira)
Aitozi created FLINK-27930:
--

 Summary: Format the timestamp in status to make it explicit 
 Key: FLINK-27930
 URL: https://issues.apache.org/jira/browse/FLINK-27930
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Aitozi


Currently, the timestamps in the status are shown as unix timestamps. I think
we could improve this by showing an explicit date time.

Current:

{code:java}
startTime: "1654584619845"
updateTime: "1654585571858"
{code}

Expected:

{code:java}
startTime: "2022-06-07T06:10:04Z"
updateTime: "2022-06-07T06:50:04Z"
{code}

This would follow the manner of the k8s object metadata; a conversion sketch
follows.
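
A quick sketch of the conversion, using the startTime value from the example
above (note the "expected" snippet above is illustrative and was not derived
from the same input):

{code:java}
import java.time.Instant;
import java.time.format.DateTimeFormatter;

public class StatusTimeFormatSketch {
    public static void main(String[] args) {
        // Convert the epoch-millis value stored in the status into the
        // ISO-8601 / RFC 3339 form used by Kubernetes object metadata.
        long startTimeMillis = 1654584619845L;
        String formatted = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(startTimeMillis));
        System.out.println(formatted); // prints 2022-06-07T06:50:19.845Z
    }
}
{code}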

 





[jira] [Created] (FLINK-27925) Avoid to create watcher without the resourceVersion

2022-06-06 Thread Aitozi (Jira)
Aitozi created FLINK-27925:
--

 Summary: Avoid to create watcher without the resourceVersion
 Key: FLINK-27925
 URL: https://issues.apache.org/jira/browse/FLINK-27925
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Reporter: Aitozi


Currently, we create the watcher in the KubernetesResourceManager, but it does
not pass the resourceVersion parameter, so the watch reads from etcd. This
puts a burden on etcd in large-scale clusters (which we have seen in our
internal k8s cluster). More detail can be found
[here|https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter].

I think we could use an informer to improve this (it spawns a list-watch and
maintains the resourceVersion internally).





[jira] [Created] (FLINK-27921) Introduce the checkResourceRequirementsWithDelay in DeclarativeSlotManager

2022-06-06 Thread Aitozi (Jira)
Aitozi created FLINK-27921:
--

 Summary: Introduce the checkResourceRequirementsWithDelay in 
DeclarativeSlotManager
 Key: FLINK-27921
 URL: https://issues.apache.org/jira/browse/FLINK-27921
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Aitozi


As discussed in
https://github.com/apache/flink/pull/19840#discussion_r884242067, this ticket
is meant to introduce the same mechanism of waiting for a slight delay before
processing the resource check in the {{FineGrainedSlotManager}}. It will
reduce the frequency of unnecessary re-allocations.





Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.0.0 released

2022-06-05 Thread Aitozi
Thanks Yang, and nice to see this happen.

Best,
Aitozi.

Yang Wang  于2022年6月5日周日 16:14写道:

> The Apache Flink community is very happy to announce the release of Apache
> Flink Kubernetes Operator 1.0.0.
>
> The Flink Kubernetes Operator allows users to manage their Apache Flink
> applications and their lifecycle through native k8s tooling like kubectl.
> This is the first production ready release and brings numerous
> improvements and new features to almost every aspect of the operator.
>
> Please check out the release blog post for an overview of the release:
>
> https://flink.apache.org/news/2022/06/05/release-kubernetes-operator-1.0.0.html
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Flink Kubernetes Operator can be found at:
>
> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
>
> Official Docker image for Flink Kubernetes Operator applications can be
> found at:
> https://hub.docker.com/r/apache/flink-kubernetes-operator
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351500
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Regards,
> Gyula & Yang
>


Re: [VOTE] FLIP-235: Hybrid Shuffle Mode

2022-05-28 Thread Aitozi
+1 (non-binding)

Best,
Aitozi

Yangze Guo  于2022年5月27日周五 11:17写道:

> +1
>
> Best,
> Yangze Guo
>
> On Thu, May 26, 2022 at 4:42 PM Xintong Song 
> wrote:
> >
> > +1
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Thu, May 26, 2022 at 3:47 PM weijie guo 
> > wrote:
> >
> > > Hi everyone,
> > >
> > > Thanks for the feedback for FLIP-235: Hybrid Shuffle Mode[1] on the
> > > discussion thread [2]
> > >
> > > I'd like to start a vote for it. The vote will last for at least 72
> hours
> > > unless there is an objection or insufficient votes.
> > >
> > > [1]
> > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> > > [2] https://lists.apache.org/thread/hfwpcs54sm5gp3mhv7s3lr79jywo3kv4
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
>


[jira] [Created] (FLINK-27737) Clean the outdated comments and unfencedMainExecutor

2022-05-22 Thread Aitozi (Jira)
Aitozi created FLINK-27737:
--

 Summary: Clean the outdated comments and unfencedMainExecutor
 Key: FLINK-27737
 URL: https://issues.apache.org/jira/browse/FLINK-27737
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Coordination
Reporter: Aitozi


This ticket is meant to clean up the outdated and confusing comments about
main executor usage, e.g.:
[link|https://github.com/apache/flink/blob/18a967f8ad7b22c2942e227fb84f08f552660b5a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java#L248],
and also the unfencedMainThreadExecutor stuff in {{FencedRpcEndpoint}}.




