Re: [VOTE] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-12 Thread Lincoln Lee
+1 (binding)

Best,
Lincoln Lee


Jingsong Li wrote on Tue, Jun 13, 2023 at 10:07:

> +1
>
> On Mon, Jun 12, 2023 at 10:25 PM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > +1 (binding)
> >
> > Best,
> > Rui Fan
> >
> > On Mon, Jun 12, 2023 at 19:58 liu ron  wrote:
> >
> > > +1 (no-binding)
> > >
> > > Best,
> > > Ron
> > >
> > > Jing Ge wrote on Mon, Jun 12, 2023 at 19:33:
> > >
> > > > +1(binding) Thanks!
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Mon, Jun 12, 2023 at 12:01 PM yuxia 
> > > > wrote:
> > > >
> > > > > +1 (binding)
> > > > > Thanks Mang driving it.
> > > > >
> > > > > Best regards,
> > > > > Yuxia
> > > > >
> > > > > ----- Original Message -----
> > > > > From: "zhangmang1" 
> > > > > To: "dev" 
> > > > > Sent: Monday, June 12, 2023, 5:31:10 PM
> > > > > Subject: [VOTE] FLIP-305: Support atomic for CREATE TABLE AS
> SELECT(CTAS)
> > > > > statement
> > > > >
> > > > > Hi everyone,
> > > > >
> > > > > Thanks for all the feedback about FLIP-305: Support atomic for
> CREATE
> > > > > TABLE AS SELECT(CTAS) statement[1].
> > > > > [2] is the discussion thread.
> > > > >
> > > > > I'd like to start a vote for it. The vote will be open for at
> least 72
> > > > > hours (until June 15th, 10:00AM GMT) unless there is an objection
> or an
> > > > > insufficient number of votes.[1]
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> > > > > [2]
> https://lists.apache.org/thread/n6nsvbwhs5kwlj5kjgv24by2tk5mh9xd
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Best regards,
> > > > > Mang Zhang
> > > > >
> > > >
> > >
>


Re: [VOTE] FLIP-311: Support Call Stored Procedure

2023-06-12 Thread Lincoln Lee
+1 (binding)

Best,
Lincoln Lee


Jingsong Li wrote on Tue, Jun 13, 2023 at 10:07:

> +1
>
> On Mon, Jun 12, 2023 at 10:32 PM Rui Fan <1996fan...@gmail.com> wrote:
> >
> > +1 (binding)
> >
> > Best,
> > Rui Fan
> >
> > On Mon, Jun 12, 2023 at 22:20 Benchao Li  wrote:
> >
> > > +1 (binding)
> > >
> > > yuxia wrote on Mon, Jun 12, 2023 at 17:58:
> > >
> > > > Hi everyone,
> > > > Thanks for all the feedback about FLIP-311: Support Call Stored
> > > > Procedure[1]. Based on the discussion [2], we have come to a
> consensus,
> > > so
> > > > I would like to start a vote.
> > > > The vote will be open for at least 72 hours (until June 15th, 10:00AM
> > > GMT)
> > > > unless there is an objection or an insufficient number of votes.
> > > >
> > > >
> > > > [1]
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure
> > > > [2] https://lists.apache.org/thread/k6s50gcgznon9v1oylyh396gb5kgrwmd
> > > >
> > > > Best regards,
> > > > Yuxia
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
>


[jira] [Created] (FLINK-32321) Temporal Join job missing condition after “ON”

2023-06-12 Thread macdoor615 (Jira)
macdoor615 created FLINK-32321:
--

 Summary: Temporal Join job missing condition after “ON”
 Key: FLINK-32321
 URL: https://issues.apache.org/jira/browse/FLINK-32321
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Gateway
Affects Versions: 1.17.1
Reporter: macdoor615
 Fix For: 1.17.2


We have a SQL job like this:
{code:java}
select ... from prod_kafka.f_alarm_tag_dev
 /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */ as f 
left join mysql_bnpmp.gem_bnpmp.f_a_alarm_filter
 /*+ OPTIONS('lookup.cache.max-rows' = '5000',
 'lookup.cache.ttl' = '30s') */
FOR SYSTEM_TIME AS OF f.proctime ff  on ff.rule_type = 0 and f.ne_ip = ff.ip 
{code}
We submit it to a Flink 1.17.1 cluster via sql-gateway and found that the job detail 
is missing the lookup condition (rule_type = 0):
{code:java}
  +- [1196]:LookupJoin(table=[mysql_bnpmp.gem_bnpmp.f_a_alarm_filter], 
joinType=[LeftOuterJoin], lookup=[ip=ne_ip], select=[event_id, {code}
We submit the same SQL to a Flink 1.17.0 cluster via sql-gateway, and the 
(rule_type = 0) lookup condition is present:
{code:java}
  +- [3]:LookupJoin(table=[mysql_bnpmp.gem_bnpmp.f_a_alarm_filter], 
joinType=[LeftOuterJoin], lookup=[rule_type=0, ip=ne_ip], where=[(rule_type = 
0)], select=[event_id, severity,{code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-12 Thread Aitozi
Get your meaning now, thanks :)

Best,
Aitozi.

Feng Jin wrote on Tue, Jun 13, 2023 at 11:16:

> Hi Aitozi,
>
> Sorry for the confusing description.
>
> What I meant was that if we need to remind users about thread safety issues,
> we should introduce the new UDTF interface instead of executing the
> original UDTF asynchronously. Therefore, I agree with introducing the
> AsyncTableFunction.
>
> Best,
> Feng
>
> On Tue, Jun 13, 2023 at 10:42 AM Aitozi  wrote:
>
> > Hi Feng,
> > Thanks for your question. We do not provide a way to switch the UDTF
> > between sync and async way,
> > So there should be no thread safety problem here.
> >
> > Best,
> > Aitozi
> >
> > Feng Jin wrote on Tue, Jun 13, 2023 at 10:31:
> >
> > > Hi Aitozi, We do need to remind users about thread safety issues. Thank
> > you
> > > for your efforts on this FLIP. I have no further questions.
> > > Best, Feng
> > >
> > >
> > > On Tue, Jun 13, 2023 at 6:05 AM Jing Ge 
> > > wrote:
> > >
> > > > Hi Aitozi,
> > > >
> > > > Thanks for taking care of that part. I have no other concern.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > >
> > > > On Mon, Jun 12, 2023 at 5:38 PM Aitozi  wrote:
> > > >
> > > > > BTW, If there are no other more blocking issue / comments, I would
> > like
> > > > to
> > > > > start a VOTE in another thread this wednesday 6.14
> > > > >
> > > > > Thanks,
> > > > > Aitozi.
> > > > >
> > > > > Aitozi wrote on Mon, Jun 12, 2023 at 23:34:
> > > > >
> > > > > > Hi, Jing,
> > > > > > Thanks for your explanation. I get your point now.
> > > > > >
> > > > > > For the performance part, I think it's a good idea to run with
> > > > returning
> > > > > a
> > > > > > big table case, the memory consumption
> > > > > > should be a point to be taken care about. Because in the ordered
> > > mode,
> > > > > the
> > > > > > head element in buffer may affect the
> > > > > > total memory consumption.
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > Aitozi.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Jing Ge wrote on Mon, Jun 12, 2023 at 20:28:
> > > > > >
> > > > > >> Hi Aitozi,
> > > > > >>
> > > > > >> Which key will be used for lookup is not an issue, only one row
> > will
> > > > be
> > > > > >> required for each key in order to enrich it. True, it depends on
> > the
> > > > > >> implementation whether multiple rows or single row for each key
> > will
> > > > be
> > > > > >> returned. However, for the lookup & enrichment scenario, one
> > row/key
> > > > is
> > > > > >> recommended, otherwise, like I mentioned previously, enrichment
> > > won't
> > > > > >> work.
> > > > > >>
> > > > > >> I am a little bit concerned about returning a big table for each
> > > key,
> > > > > >> since
> > > > > >> it will take the async call longer to return and need more
> memory.
> > > The
> > > > > >> performance tests should cover this scenario. This is not a
> > blocking
> > > > > issue
> > > > > >> for this FLIP.
> > > > > >>
> > > > > >> Best regards,
> > > > > >> Jing
> > > > > >>
> > > > > >> On Sat, Jun 10, 2023 at 4:11 AM Aitozi 
> > > wrote:
> > > > > >>
> > > > > >> > Hi Jing,
> > > > > >> > I means the join key is not necessary to be the primary
> key
> > or
> > > > > >> unique
> > > > > >> > index of the database.
> > > > > >> > In this situation, we may queried out multi rows for one join
> > > key. I
> > > > > >> think
> > > > > >> > that's why the
> > > > > >> > LookupFunction#lookup will return a collection of RowData.
> > > > > >> >
> > > > > >> > BTW, I think the behavior of lookup join will not affect the
> > > > semantic
> > > > > of
> > > > > >> > the async udtf.
> > > > > >> > We use the Async TableFunction here and the table function can
> > > > collect
> > > > > >> > multiple rows.
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> > Atiozi.
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> > Jing Ge wrote on Sat, Jun 10, 2023 at 00:15:
> > > > > >> >
> > > > > >> > > Hi Aitozi,
> > > > > >> > >
> > > > > >> > > The keyRow used in this case contains all keys[1].
> > > > > >> > >
> > > > > >> > > Best regards,
> > > > > >> > > Jing
> > > > > >> > >
> > > > > >> > > [1]
> > > > > >> > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > On Fri, Jun 9, 2023 at 3:42 PM Aitozi  >
> > > > wrote:
> > > > > >> > >
> > > > > >> > > > Hi Jing,
> > > > > >> > > >
> > > > > >> > > >  The performance test is added to the FLIP.
> > > > > >> > > >
> > > > > >> > > >  As I know, The lookup join can return multi rows, it
> > > > depends
> > > > > on
> > > > > >> > > > whether  the join key
> > > > > >> > > > is the primary key of the external database or not. The
> > > `lookup`
> > > > > [1]
> > > > > >> > will
> > > > > >> > > > return a collection of
> > > > > >> > > > joined result, and each of them will be 
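
To make the multi-row point concrete: `LookupFunction#lookup` (linked above) returns a
`Collection<RowData>` per key, so a non-unique join key can simply produce several rows.
A minimal sketch, with a made-up table and a hypothetical `queryExternalSystem` helper
standing in for the real database call:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.LookupFunction;

/** Sketch: the join key is not unique in the dimension table, so one key can yield many rows. */
public class OrdersByCustomerLookup extends LookupFunction {

    @Override
    public Collection<RowData> lookup(RowData keyRow) throws IOException {
        String customerId = keyRow.getString(0).toString();
        List<RowData> rows = new ArrayList<>();
        // A non-unique key may match several rows, hence the Collection return type.
        for (String orderId : queryExternalSystem(customerId)) {
            rows.add(GenericRowData.of(
                    StringData.fromString(customerId), StringData.fromString(orderId)));
        }
        return rows;
    }

    // Hypothetical helper standing in for the real database/service call.
    private List<String> queryExternalSystem(String customerId) {
        return Arrays.asList("order-1", "order-2");
    }
}
```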

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-12 Thread Feng Jin
Hi Aitozi,

Sorry for the confusing description.

What I meant was that if we need to remind users about thread safety issues,
we should introduce the new UDTF interface instead of executing the
original UDTF asynchronously. Therefore, I agree with introducing the
AsyncTableFunction.

Best,
Feng

On Tue, Jun 13, 2023 at 10:42 AM Aitozi  wrote:

> Hi Feng,
> Thanks for your question. We do not provide a way to switch the UDTF
> between sync and async way,
> So there should be no thread safety problem here.
>
> Best,
> Aitozi
>
> Feng Jin wrote on Tue, Jun 13, 2023 at 10:31:
>
> > Hi Aitozi, We do need to remind users about thread safety issues. Thank
> you
> > for your efforts on this FLIP. I have no further questions.
> > Best, Feng
> >
> >
> > On Tue, Jun 13, 2023 at 6:05 AM Jing Ge 
> > wrote:
> >
> > > Hi Aitozi,
> > >
> > > Thanks for taking care of that part. I have no other concern.
> > >
> > > Best regards,
> > > Jing
> > >
> > >
> > > On Mon, Jun 12, 2023 at 5:38 PM Aitozi  wrote:
> > >
> > > > BTW, If there are no other more blocking issue / comments, I would
> like
> > > to
> > > > start a VOTE in another thread this wednesday 6.14
> > > >
> > > > Thanks,
> > > > Aitozi.
> > > >
> > > > Aitozi wrote on Mon, Jun 12, 2023 at 23:34:
> > > >
> > > > > Hi, Jing,
> > > > > Thanks for your explanation. I get your point now.
> > > > >
> > > > > For the performance part, I think it's a good idea to run with
> > > returning
> > > > a
> > > > > big table case, the memory consumption
> > > > > should be a point to be taken care about. Because in the ordered
> > mode,
> > > > the
> > > > > head element in buffer may affect the
> > > > > total memory consumption.
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Aitozi.
> > > > >
> > > > >
> > > > >
> > > > > Jing Ge wrote on Mon, Jun 12, 2023 at 20:28:
> > > > >
> > > > >> Hi Aitozi,
> > > > >>
> > > > >> Which key will be used for lookup is not an issue, only one row
> will
> > > be
> > > > >> required for each key in order to enrich it. True, it depends on
> the
> > > > >> implementation whether multiple rows or single row for each key
> will
> > > be
> > > > >> returned. However, for the lookup & enrichment scenario, one
> row/key
> > > is
> > > > >> recommended, otherwise, like I mentioned previously, enrichment
> > won't
> > > > >> work.
> > > > >>
> > > > >> I am a little bit concerned about returning a big table for each
> > key,
> > > > >> since
> > > > >> it will take the async call longer to return and need more memory.
> > The
> > > > >> performance tests should cover this scenario. This is not a
> blocking
> > > > issue
> > > > >> for this FLIP.
> > > > >>
> > > > >> Best regards,
> > > > >> Jing
> > > > >>
> > > > >> On Sat, Jun 10, 2023 at 4:11 AM Aitozi 
> > wrote:
> > > > >>
> > > > >> > Hi Jing,
> > > > >> > I means the join key is not necessary to be the primary key
> or
> > > > >> unique
> > > > >> > index of the database.
> > > > >> > In this situation, we may queried out multi rows for one join
> > key. I
> > > > >> think
> > > > >> > that's why the
> > > > >> > LookupFunction#lookup will return a collection of RowData.
> > > > >> >
> > > > >> > BTW, I think the behavior of lookup join will not affect the
> > > semantic
> > > > of
> > > > >> > the async udtf.
> > > > >> > We use the Async TableFunction here and the table function can
> > > collect
> > > > >> > multiple rows.
> > > > >> >
> > > > >> > Thanks,
> > > > >> > Atiozi.
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > Jing Ge wrote on Sat, Jun 10, 2023 at 00:15:
> > > > >> >
> > > > >> > > Hi Aitozi,
> > > > >> > >
> > > > >> > > The keyRow used in this case contains all keys[1].
> > > > >> > >
> > > > >> > > Best regards,
> > > > >> > > Jing
> > > > >> > >
> > > > >> > > [1]
> > > > >> > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> > > > >> > >
> > > > >> > >
> > > > >> > > On Fri, Jun 9, 2023 at 3:42 PM Aitozi 
> > > wrote:
> > > > >> > >
> > > > >> > > > Hi Jing,
> > > > >> > > >
> > > > >> > > >  The performance test is added to the FLIP.
> > > > >> > > >
> > > > >> > > >  As I know, The lookup join can return multi rows, it
> > > depends
> > > > on
> > > > >> > > > whether  the join key
> > > > >> > > > is the primary key of the external database or not. The
> > `lookup`
> > > > [1]
> > > > >> > will
> > > > >> > > > return a collection of
> > > > >> > > > joined result, and each of them will be collected
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > [1]:
> > > > >> > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > Thanks,
> > > > >> > > > Aitozi.
> 

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-12 Thread Aitozi
Hi Feng,
Thanks for your question. We do not provide a way to switch a UDTF
between the sync and async modes,
so there should be no thread safety problem here.

Best,
Aitozi
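
To make the separation concrete, a dedicated async UDTF would be its own class rather than a
mode switch on an existing sync `TableFunction`. A minimal sketch following the existing
`AsyncTableFunction` eval convention (the class name, hint, and exact signature below are
illustrative assumptions; the user-facing API is what FLIP-313 defines):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.types.Row;

/** Sketch: a dedicated async UDTF; results are handed back through a future, not collect(). */
@FunctionHint(output = @DataTypeHint("ROW<part STRING>"))
public class AsyncSplitFunction extends AsyncTableFunction<Row> {

    // One invocation may complete the future with several rows.
    public void eval(CompletableFuture<Collection<Row>> result, String input, String separator) {
        CompletableFuture.runAsync(() -> {
            // The split below stands in for a remote call that would really be asynchronous.
            List<Row> rows = new ArrayList<>();
            for (String part : input.split(separator)) {
                rows.add(Row.of(part));
            }
            result.complete(rows);
        });
    }
}
```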

Feng Jin wrote on Tue, Jun 13, 2023 at 10:31:

> Hi Aitozi, We do need to remind users about thread safety issues. Thank you
> for your efforts on this FLIP. I have no further questions.
> Best, Feng
>
>
> On Tue, Jun 13, 2023 at 6:05 AM Jing Ge 
> wrote:
>
> > Hi Aitozi,
> >
> > Thanks for taking care of that part. I have no other concern.
> >
> > Best regards,
> > Jing
> >
> >
> > On Mon, Jun 12, 2023 at 5:38 PM Aitozi  wrote:
> >
> > > BTW, If there are no other more blocking issue / comments, I would like
> > to
> > > start a VOTE in another thread this wednesday 6.14
> > >
> > > Thanks,
> > > Aitozi.
> > >
> > > Aitozi wrote on Mon, Jun 12, 2023 at 23:34:
> > >
> > > > Hi, Jing,
> > > > Thanks for your explanation. I get your point now.
> > > >
> > > > For the performance part, I think it's a good idea to run with
> > returning
> > > a
> > > > big table case, the memory consumption
> > > > should be a point to be taken care about. Because in the ordered
> mode,
> > > the
> > > > head element in buffer may affect the
> > > > total memory consumption.
> > > >
> > > >
> > > > Thanks,
> > > > Aitozi.
> > > >
> > > >
> > > >
> > > > Jing Ge wrote on Mon, Jun 12, 2023 at 20:28:
> > > >
> > > >> Hi Aitozi,
> > > >>
> > > >> Which key will be used for lookup is not an issue, only one row will
> > be
> > > >> required for each key in order to enrich it. True, it depends on the
> > > >> implementation whether multiple rows or single row for each key will
> > be
> > > >> returned. However, for the lookup & enrichment scenario, one row/key
> > is
> > > >> recommended, otherwise, like I mentioned previously, enrichment
> won't
> > > >> work.
> > > >>
> > > >> I am a little bit concerned about returning a big table for each
> key,
> > > >> since
> > > >> it will take the async call longer to return and need more memory.
> The
> > > >> performance tests should cover this scenario. This is not a blocking
> > > issue
> > > >> for this FLIP.
> > > >>
> > > >> Best regards,
> > > >> Jing
> > > >>
> > > >> On Sat, Jun 10, 2023 at 4:11 AM Aitozi 
> wrote:
> > > >>
> > > >> > Hi Jing,
> > > >> > I means the join key is not necessary to be the primary key or
> > > >> unique
> > > >> > index of the database.
> > > >> > In this situation, we may queried out multi rows for one join
> key. I
> > > >> think
> > > >> > that's why the
> > > >> > LookupFunction#lookup will return a collection of RowData.
> > > >> >
> > > >> > BTW, I think the behavior of lookup join will not affect the
> > semantic
> > > of
> > > >> > the async udtf.
> > > >> > We use the Async TableFunction here and the table function can
> > collect
> > > >> > multiple rows.
> > > >> >
> > > >> > Thanks,
> > > >> > Atiozi.
> > > >> >
> > > >> >
> > > >> >
> > > >> > Jing Ge wrote on Sat, Jun 10, 2023 at 00:15:
> > > >> >
> > > >> > > Hi Aitozi,
> > > >> > >
> > > >> > > The keyRow used in this case contains all keys[1].
> > > >> > >
> > > >> > > Best regards,
> > > >> > > Jing
> > > >> > >
> > > >> > > [1]
> > > >> > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> > > >> > >
> > > >> > >
> > > >> > > On Fri, Jun 9, 2023 at 3:42 PM Aitozi 
> > wrote:
> > > >> > >
> > > >> > > > Hi Jing,
> > > >> > > >
> > > >> > > >  The performance test is added to the FLIP.
> > > >> > > >
> > > >> > > >  As I know, The lookup join can return multi rows, it
> > depends
> > > on
> > > >> > > > whether  the join key
> > > >> > > > is the primary key of the external database or not. The
> `lookup`
> > > [1]
> > > >> > will
> > > >> > > > return a collection of
> > > >> > > > joined result, and each of them will be collected
> > > >> > > >
> > > >> > > >
> > > >> > > > [1]:
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52
> > > >> > > >
> > > >> > > >
> > > >> > > > Thanks,
> > > >> > > > Aitozi.
> > > >> > > >
> > > >> > > > Jing Ge wrote on Fri, Jun 9, 2023 at 17:05:
> > > >> > > >
> > > >> > > > > Hi Aitozi,
> > > >> > > > >
> > > >> > > > > Thanks for the feedback. Looking forward to the performance
> > > tests.
> > > >> > > > >
> > > >> > > > > Afaik, lookup returns one row for each key [1] [2].
> > > Conceptually,
> > > >> the
> > > >> > > > > lookup function is used to enrich column(s) from the
> dimension
> > > >> table.
> > > >> > > If,
> > > >> > > > > for the given key, there will be more than one row, there
> will
> > > be
> > > >> no
> > > >> > > way
> > > >> > > > to
> > > >> > > > > know which row will be used to enrich the key.
> > > >> > > > >
> > > >> > > > > 

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-12 Thread Feng Jin
Hi Aitozi,

We do need to remind users about thread safety issues. Thank you
for your efforts on this FLIP. I have no further questions.

Best,
Feng


On Tue, Jun 13, 2023 at 6:05 AM Jing Ge  wrote:

> Hi Aitozi,
>
> Thanks for taking care of that part. I have no other concern.
>
> Best regards,
> Jing
>
>
> On Mon, Jun 12, 2023 at 5:38 PM Aitozi  wrote:
>
> > BTW, If there are no other more blocking issue / comments, I would like
> to
> > start a VOTE in another thread this wednesday 6.14
> >
> > Thanks,
> > Aitozi.
> >
> > Aitozi wrote on Mon, Jun 12, 2023 at 23:34:
> >
> > > Hi, Jing,
> > > Thanks for your explanation. I get your point now.
> > >
> > > For the performance part, I think it's a good idea to run with
> returning
> > a
> > > big table case, the memory consumption
> > > should be a point to be taken care about. Because in the ordered mode,
> > the
> > > head element in buffer may affect the
> > > total memory consumption.
> > >
> > >
> > > Thanks,
> > > Aitozi.
> > >
> > >
> > >
> > > Jing Ge wrote on Mon, Jun 12, 2023 at 20:28:
> > >
> > >> Hi Aitozi,
> > >>
> > >> Which key will be used for lookup is not an issue, only one row will
> be
> > >> required for each key in order to enrich it. True, it depends on the
> > >> implementation whether multiple rows or single row for each key will
> be
> > >> returned. However, for the lookup & enrichment scenario, one row/key
> is
> > >> recommended, otherwise, like I mentioned previously, enrichment won't
> > >> work.
> > >>
> > >> I am a little bit concerned about returning a big table for each key,
> > >> since
> > >> it will take the async call longer to return and need more memory. The
> > >> performance tests should cover this scenario. This is not a blocking
> > issue
> > >> for this FLIP.
> > >>
> > >> Best regards,
> > >> Jing
> > >>
> > >> On Sat, Jun 10, 2023 at 4:11 AM Aitozi  wrote:
> > >>
> > >> > Hi Jing,
> > >> > I means the join key is not necessary to be the primary key or
> > >> unique
> > >> > index of the database.
> > >> > In this situation, we may queried out multi rows for one join key. I
> > >> think
> > >> > that's why the
> > >> > LookupFunction#lookup will return a collection of RowData.
> > >> >
> > >> > BTW, I think the behavior of lookup join will not affect the
> semantic
> > of
> > >> > the async udtf.
> > >> > We use the Async TableFunction here and the table function can
> collect
> > >> > multiple rows.
> > >> >
> > >> > Thanks,
> > >> > Atiozi.
> > >> >
> > >> >
> > >> >
> > >> > Jing Ge wrote on Sat, Jun 10, 2023 at 00:15:
> > >> >
> > >> > > Hi Aitozi,
> > >> > >
> > >> > > The keyRow used in this case contains all keys[1].
> > >> > >
> > >> > > Best regards,
> > >> > > Jing
> > >> > >
> > >> > > [1]
> > >> > >
> > >> > >
> > >> >
> > >>
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> > >> > >
> > >> > >
> > >> > > On Fri, Jun 9, 2023 at 3:42 PM Aitozi 
> wrote:
> > >> > >
> > >> > > > Hi Jing,
> > >> > > >
> > >> > > >  The performance test is added to the FLIP.
> > >> > > >
> > >> > > >  As I know, The lookup join can return multi rows, it
> depends
> > on
> > >> > > > whether  the join key
> > >> > > > is the primary key of the external database or not. The `lookup`
> > [1]
> > >> > will
> > >> > > > return a collection of
> > >> > > > joined result, and each of them will be collected
> > >> > > >
> > >> > > >
> > >> > > > [1]:
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52
> > >> > > >
> > >> > > >
> > >> > > > Thanks,
> > >> > > > Aitozi.
> > >> > > >
> > >> > > > Jing Ge wrote on Fri, Jun 9, 2023 at 17:05:
> > >> > > >
> > >> > > > > Hi Aitozi,
> > >> > > > >
> > >> > > > > Thanks for the feedback. Looking forward to the performance
> > tests.
> > >> > > > >
> > >> > > > > Afaik, lookup returns one row for each key [1] [2].
> > Conceptually,
> > >> the
> > >> > > > > lookup function is used to enrich column(s) from the dimension
> > >> table.
> > >> > > If,
> > >> > > > > for the given key, there will be more than one row, there will
> > be
> > >> no
> > >> > > way
> > >> > > > to
> > >> > > > > know which row will be used to enrich the key.
> > >> > > > >
> > >> > > > > [1]
> > >> > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> > >> > > > > [2]
> > >> > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196
> > >> > > > >
> > >> 

Re: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

2023-06-12 Thread yuxia
Hi, Etienne.
Thanks for driving it.
I have one question about the mechanism of the cooldown timeout.

>From the Proposed Changes part, if a scaling event is received and it falls 
>during the cooldown period, it'll be stacked to be executed after the period 
>ends.
Also, from the description of FLINK-21883[1], the cooldown timeout is to avoid 
rescaling the job very frequently, because TaskManagers are not all connecting 
at the same time.

So, is it possible that every TaskManager connection produces a scaling event, 
so that many scale-up events get stacked and it takes a long time to finish 
them all?
Can we just take the last event?

[1]: https://issues.apache.org/jira/browse/FLINK-21883

Best regards,
Yuxia

----- Original Message -----
From: "Etienne Chauchot" 
To: "dev" , "Robert Metzger" 
Sent: Monday, June 12, 2023, 11:34:25 PM
Subject: [DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

Hi,

I’d like to start a discussion about FLIP-322 [1] which introduces a 
cooldown period for the adaptive scheduler.

I'd like to get your feedback especially @Robert as you opened the 
related ticket and worked on the reactive mode a lot.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler

Best

Etienne
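
To illustrate the "take only the latest event" idea, a minimal sketch of a cooldown gate that
overwrites rather than queues stacked rescale requests (purely illustrative; the class and
method names are made up and this is not the FLIP's design):

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

/** Sketch: coalesce rescale requests during a cooldown window; only the latest one survives. */
public class RescaleCooldownGate {

    private final Duration cooldown;
    private final Clock clock;
    private Instant lastRescale = Instant.EPOCH;
    private Integer pendingParallelism; // only the most recent request is kept, never a queue

    public RescaleCooldownGate(Duration cooldown, Clock clock) {
        this.cooldown = cooldown;
        this.clock = clock;
    }

    /** Called per scaling trigger; returns the parallelism to apply now, or null to wait. */
    public synchronized Integer onDesiredParallelism(int desiredParallelism) {
        if (clock.instant().isAfter(lastRescale.plus(cooldown))) {
            lastRescale = clock.instant();
            pendingParallelism = null;
            return desiredParallelism; // outside the cooldown: rescale immediately
        }
        pendingParallelism = desiredParallelism; // inside the cooldown: overwrite, don't stack
        return null;
    }

    /** Called when the cooldown expires; applies at most one pending request. */
    public synchronized Integer onCooldownExpired() {
        Integer toApply = pendingParallelism;
        pendingParallelism = null;
        if (toApply != null) {
            lastRescale = clock.instant();
        }
        return toApply;
    }
}
```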


[jira] [Created] (FLINK-32320) Same correlate can not be reused due to the different correlationId

2023-06-12 Thread Aitozi (Jira)
Aitozi created FLINK-32320:
--

 Summary: Same correlate can not be reused due to the different 
correlationId
 Key: FLINK-32320
 URL: https://issues.apache.org/jira/browse/FLINK-32320
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Aitozi


As described in SubplanReuserTest


{code:java}
  @Test
  def testSubplanReuseOnCorrelate(): Unit = {
util.addFunction("str_split", new StringSplit())
val sqlQuery =
  """
|WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) 
AS T(v))
|SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
  """.stripMargin
// TODO the sub-plan of Correlate should be reused,
// however the digests of Correlates are different
util.verifyExecPlan(sqlQuery)
  }
{code}

This will produce the plan 


{code:java}
HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, b0, 
c0, f00], build=[right])
:- Exchange(distribution=[hash[f0]])
:  +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], 
correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[f0]])
   +- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], 
correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER])
  +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}

The Correlate node can not be reused due to the different correlation id.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-12 Thread Jingsong Li
+1

On Mon, Jun 12, 2023 at 10:25 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> +1 (binding)
>
> Best,
> Rui Fan
>
> On Mon, Jun 12, 2023 at 19:58 liu ron  wrote:
>
> > +1 (no-binding)
> >
> > Best,
> > Ron
> >
> > Jing Ge wrote on Mon, Jun 12, 2023 at 19:33:
> >
> > > +1(binding) Thanks!
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Mon, Jun 12, 2023 at 12:01 PM yuxia 
> > > wrote:
> > >
> > > > +1 (binding)
> > > > Thanks Mang driving it.
> > > >
> > > > Best regards,
> > > > Yuxia
> > > >
> > > > ----- Original Message -----
> > > > From: "zhangmang1" 
> > > > To: "dev" 
> > > > Sent: Monday, June 12, 2023, 5:31:10 PM
> > > > Subject: [VOTE] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS)
> > > > statement
> > > >
> > > > Hi everyone,
> > > >
> > > > Thanks for all the feedback about FLIP-305: Support atomic for CREATE
> > > > TABLE AS SELECT(CTAS) statement[1].
> > > > [2] is the discussion thread.
> > > >
> > > > I'd like to start a vote for it. The vote will be open for at least 72
> > > > hours (until June 15th, 10:00AM GMT) unless there is an objection or an
> > > > insufficient number of votes.[1]
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> > > > [2]https://lists.apache.org/thread/n6nsvbwhs5kwlj5kjgv24by2tk5mh9xd
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best regards,
> > > > Mang Zhang
> > > >
> > >
> >


Re: [VOTE] FLIP-311: Support Call Stored Procedure

2023-06-12 Thread Jingsong Li
+1

On Mon, Jun 12, 2023 at 10:32 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> +1 (binding)
>
> Best,
> Rui Fan
>
> On Mon, Jun 12, 2023 at 22:20 Benchao Li  wrote:
>
> > +1 (binding)
> >
> > > yuxia wrote on Mon, Jun 12, 2023 at 17:58:
> >
> > > Hi everyone,
> > > Thanks for all the feedback about FLIP-311: Support Call Stored
> > > Procedure[1]. Based on the discussion [2], we have come to a consensus,
> > so
> > > I would like to start a vote.
> > > The vote will be open for at least 72 hours (until June 15th, 10:00AM
> > GMT)
> > > unless there is an objection or an insufficient number of votes.
> > >
> > >
> > > [1]
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure
> > > [2] https://lists.apache.org/thread/k6s50gcgznon9v1oylyh396gb5kgrwmd
> > >
> > > Best regards,
> > > Yuxia
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >


Re: [DISCUSS] FLIP 295: Support persistence of Catalog configuration and asynchronous registration

2023-06-12 Thread Feng Jin
Hi everyone,

Thank you everyone for your valuable input. If there are no further
questions or concerns about the FLIP-295[1], I would like to start voting
tomorrow (6/14).

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations


Best,
Feng


On Mon, Jun 12, 2023 at 10:54 AM Feng Jin  wrote:

> hi, Hang
>
> Thanks for your reply. Makes sense to me, I updated the FLIP.
>
>
> Best,
>
> Feng
>
>
>
> On Fri, Jun 9, 2023 at 3:34 PM Hang Ruan  wrote:
>
>> Hi, Feng.
>>
>> I reviewed this FLIP again and found some little places that may need to
>> optimize.
>>
>> 1. `CatalogDescriptor` may need a private constructor.
>> 2. The method `initCatalog` in `CatalogManager` is not introduced.
>>
>> Best,
>> Hang
>>
>> Feng Jin wrote on Tue, Jun 6, 2023 at 21:17:
>>
>> > Hi Leonard,
>> >
>> > Thanks for your reply.
>> >
>> > > 1. a  How to construct a CatalogDescriptor ?
>> >
>> > I think it would be helpful to add a method for constructing a
>> > CatalogDescriptor, as you mentioned in 1.c. I will update the
>> documentation
>> > later.
>> >
>> > > 1.b  How to visit the fields ? Could we use Configuration instead of
>> > Map<String, String> ?
>> >
>> > I believe that the use of Map<String, String> options is only intended
>> for
>> > creating a catalog and not for accessing internal parameters.
>> >
>> > Since all of the relevant parameters for CREATE CATALOG are also stored
>> in
>> > Map<String, String> options, my understanding is that using Map<String, String> options should suffice.
>> >
>> > Here is the implementation of execute CREATE CATALOG statement.
>> > ```java
>> > private TableResultInternal createCatalog(CreateCatalogOperation operation) {
>> >     String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
>> >     try {
>> >         String catalogName = operation.getCatalogName();
>> >         Map<String, String> properties = operation.getProperties();
>> >
>> >         Catalog catalog =
>> >                 FactoryUtil.createCatalog(
>> >                         catalogName,
>> >                         properties,
>> >                         tableConfig,
>> >                         resourceManager.getUserClassLoader());
>> >         catalogManager.registerCatalog(catalogName, catalog);
>> >
>> >         return TableResultImpl.TABLE_RESULT_OK;
>> >     } catch (CatalogException e) {
>> >         throw new ValidationException(exMsg, e);
>> >     }
>> > }
>> > ```
>> >
>> >
>> > >  2. Do we have plan to offer a default CatalogStore if user didn’t
>> config
>> > this?
>> >
>> > Yes, the in-memory catalogStore will be used as the default CatalogStore
>> > even if the user has not configured one
>> >
>> >
>> > Best,
>> > Feng
>> >
>> >
>> > On Tue, Jun 6, 2023 at 8:02 PM Leonard Xu  wrote:
>> >
>> > > Hi, Feng
>> > >
>> > > Sorry for reply late, but I’ve some comments about the FLIP
>> > >
>> > >
>> > > 1. The introduced Public class CatalogDescriptor seems missed some
>> > > necessary component
>> > >   a) How to construct a CatalogDescriptor ?
>> > >   b) How to visit the fields ? Could we use Configuration instead of
>> > > Map<String, String> ?
>> > >   c) Could we offer a built-in factory method to build a
>> > CatalogDescriptor
>> > > like
>> > >  public static CatalogDescriptor of(String catalogName,
>> Configuration
>> > > configuration)
>> > >
>> > > 2. The FLIP said “By default, there are two built-in CatalogStores
>> > > available: the In-Memory CatalogStore and the File CatalogStore” ,
>> > > Do we have plan to offer a default CatalogStore if user didn’t config
>> > > this? IIUC, users can obtains the benefits  from lazy catalog
>> > > initialization If
>> > > we have a default catalogstore even it is in-memory.
>> > >
>> > > Best,
>> > > Leonard
>> > >
>> > >
>> > >
>> > > > On Jun 6, 2023, at 7:08 PM, Feng Jin  wrote:
>> > > >
>> > > > Hi everyone,
>> > > >
>> > > > Thank you everyone for your valuable input. If there are no further
>> > > > questions or concerns about the FLIP[1], I would like to start
>> voting
>> > > > tomorrow (6/7).
>> > > >
>> > > >
>> > > > [1]
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations
>> > > >
>> > > >
>> > > > Best,
>> > > > Feng
>> > > >
>> > > >
>> > > > On Sun, Jun 4, 2023 at 3:33 PM Feng Jin 
>> wrote:
>> > > >
>> > > >> Hi Samrat,
>> > > >>
>> > > >> Thanks for your advice.
>> > > >>
>> > > >>> 1. The createCatalog method does not mention any exceptions being
>> > > >> thrown.
>> > > >>
>> > > >> CreateCatalog will throw CatalogException like registerCatalog.  As
>> > > >> CatalogException is a RuntimeException,
>> > > >> there is no explicit declaration of throwing Exceptions in
>> > > CatalogManager
>> > > >> and TableEnvironment.
>> > > >> To avoid misunderstandings, I have added the "throw
>> CatalogException"
>> > > flag
>> > > >> to the createCatalog method definition of CatalogStore.
>> > > >>
>> > > >>> 2. Could you please provide an exhaustive list of the supported
>> > kinds?
>> > > >>
>> > > >> Sure,  the documentation now includes the configuration of the
>> > built-in
>> > > >> 
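
To make points 1.a–1.c concrete, a minimal sketch of what a `CatalogDescriptor` with a private
constructor and an `of(...)` factory could look like (illustrative only; the final shape is
defined in FLIP-295, and only `Configuration` is an existing Flink class here):

```java
import org.apache.flink.configuration.Configuration;

/** Sketch: name + options only, no live Catalog instance, so the catalog can be created lazily. */
public final class CatalogDescriptor {

    private final String catalogName;
    private final Configuration configuration;

    // Private constructor, as suggested earlier in the thread.
    private CatalogDescriptor(String catalogName, Configuration configuration) {
        this.catalogName = catalogName;
        this.configuration = configuration;
    }

    // The built-in factory method from point 1.c.
    public static CatalogDescriptor of(String catalogName, Configuration configuration) {
        return new CatalogDescriptor(catalogName, configuration);
    }

    public String getCatalogName() {
        return catalogName;
    }

    public Configuration getConfiguration() {
        return configuration;
    }
}
```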

[jira] [Created] (FLINK-32319) flink can't the partition of network after restart

2023-06-12 Thread wgcn (Jira)
wgcn created FLINK-32319:


 Summary: flink can't the partition of network after restart
 Key: FLINK-32319
 URL: https://issues.apache.org/jira/browse/FLINK-32319
 Project: Flink
  Issue Type: Bug
Reporter: wgcn






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-12 Thread Jing Ge
Hi Aitozi,

Thanks for taking care of that part. I have no other concern.

Best regards,
Jing


On Mon, Jun 12, 2023 at 5:38 PM Aitozi  wrote:

> BTW, If there are no other more blocking issue / comments, I would like to
> start a VOTE in another thread this wednesday 6.14
>
> Thanks,
> Aitozi.
>
> Aitozi wrote on Mon, Jun 12, 2023 at 23:34:
>
> > Hi, Jing,
> > Thanks for your explanation. I get your point now.
> >
> > For the performance part, I think it's a good idea to run with returning
> a
> > big table case, the memory consumption
> > should be a point to be taken care about. Because in the ordered mode,
> the
> > head element in buffer may affect the
> > total memory consumption.
> >
> >
> > Thanks,
> > Aitozi.
> >
> >
> >
> > Jing Ge wrote on Mon, Jun 12, 2023 at 20:28:
> >
> >> Hi Aitozi,
> >>
> >> Which key will be used for lookup is not an issue, only one row will be
> >> required for each key in order to enrich it. True, it depends on the
> >> implementation whether multiple rows or single row for each key will be
> >> returned. However, for the lookup & enrichment scenario, one row/key is
> >> recommended, otherwise, like I mentioned previously, enrichment won't
> >> work.
> >>
> >> I am a little bit concerned about returning a big table for each key,
> >> since
> >> it will take the async call longer to return and need more memory. The
> >> performance tests should cover this scenario. This is not a blocking
> issue
> >> for this FLIP.
> >>
> >> Best regards,
> >> Jing
> >>
> >> On Sat, Jun 10, 2023 at 4:11 AM Aitozi  wrote:
> >>
> >> > Hi Jing,
> >> > I means the join key is not necessary to be the primary key or
> >> unique
> >> > index of the database.
> >> > In this situation, we may queried out multi rows for one join key. I
> >> think
> >> > that's why the
> >> > LookupFunction#lookup will return a collection of RowData.
> >> >
> >> > BTW, I think the behavior of lookup join will not affect the semantic
> of
> >> > the async udtf.
> >> > We use the Async TableFunction here and the table function can collect
> >> > multiple rows.
> >> >
> >> > Thanks,
> >> > Atiozi.
> >> >
> >> >
> >> >
> >> > Jing Ge wrote on Sat, Jun 10, 2023 at 00:15:
> >> >
> >> > > Hi Aitozi,
> >> > >
> >> > > The keyRow used in this case contains all keys[1].
> >> > >
> >> > > Best regards,
> >> > > Jing
> >> > >
> >> > > [1]
> >> > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> >> > >
> >> > >
> >> > > On Fri, Jun 9, 2023 at 3:42 PM Aitozi  wrote:
> >> > >
> >> > > > Hi Jing,
> >> > > >
> >> > > >  The performance test is added to the FLIP.
> >> > > >
> >> > > >  As I know, The lookup join can return multi rows, it depends
> on
> >> > > > whether  the join key
> >> > > > is the primary key of the external database or not. The `lookup`
> [1]
> >> > will
> >> > > > return a collection of
> >> > > > joined result, and each of them will be collected
> >> > > >
> >> > > >
> >> > > > [1]:
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52
> >> > > >
> >> > > >
> >> > > > Thanks,
> >> > > > Aitozi.
> >> > > >
> >> > > > Jing Ge wrote on Fri, Jun 9, 2023 at 17:05:
> >> > > >
> >> > > > > Hi Aitozi,
> >> > > > >
> >> > > > > Thanks for the feedback. Looking forward to the performance
> tests.
> >> > > > >
> >> > > > > Afaik, lookup returns one row for each key [1] [2].
> Conceptually,
> >> the
> >> > > > > lookup function is used to enrich column(s) from the dimension
> >> table.
> >> > > If,
> >> > > > > for the given key, there will be more than one row, there will
> be
> >> no
> >> > > way
> >> > > > to
> >> > > > > know which row will be used to enrich the key.
> >> > > > >
> >> > > > > [1]
> >> > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> >> > > > > [2]
> >> > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196
> >> > > > >
> >> > > > > Best regards,
> >> > > > > Jing
> >> > > > >
> >> > > > > On Fri, Jun 9, 2023 at 5:18 AM Aitozi 
> >> wrote:
> >> > > > >
> >> > > > > > Hi Jing
> >> > > > > > Thanks for your good questions. I have updated the example
> >> to
> >> > the
> >> > > > > FLIP.
> >> > > > > >
> >> > > > > > > Only one row for each lookup
> >> > > > > > lookup can also return multi rows, based on the query result.
> >> [1]
> >> > > > > >
> >> > > > > > [1]:
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> 

Re: [DISCUSS] FLIP-246: Multi Cluster Kafka Source

2023-06-12 Thread Mason Chen
>
> My main worry for doing this as a later iteration is that this would
> probably be a breaking change for the public interface. If that can be
> avoided and planned ahead, I'm fine with moving forward with how it is
> right now.


Make sense. Considering the public interfaces, I think we still want to
provide clients the ability to pin certain configurations in the
builder--however, cluster specific configurations may not be known upfront
or generalize to all clusters so there would need to be changes in the
`KafkaMetadataService` interface. This could be achieved by exposing via:

1. A separate API (e.g. `Map
getKafkaClusterProperties()`) in KafkaMetadataService
2. In `KafkaClusterIdentifier` as this already contains some configuration
(e.g. Bootstrap server) in which case we should rename the class to
something like `KafkaCluster` as it is no longer just an identifier
3. Reorganize the metadata in a Map in
`KafkaStream` where the String is the proposed
`KafkaClusterIdentifier.name` field.

I am preferring option 3 since this simplifies equals() checks on
KafkaClusterIdentifier (e.g. is it the name, bootstrap, or both?).
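
A rough sketch of option 3, keying cluster-specific properties by the cluster name inside the
stream metadata (the class and field names below are assumptions for illustration, not the
FLIP's final interfaces):

```java
import java.io.Serializable;
import java.util.Map;
import java.util.Properties;

/** Sketch of option 3: cluster-specific properties live in the stream metadata, keyed by name. */
public class KafkaStream implements Serializable {

    private final String streamId;

    // Key: the cluster name (formerly KafkaClusterIdentifier.name); value: that cluster's
    // properties, including bootstrap servers, so equals() no longer has to choose between
    // comparing the name, the bootstrap address, or both.
    private final Map<String, Properties> clusterProperties;

    public KafkaStream(String streamId, Map<String, Properties> clusterProperties) {
        this.streamId = streamId;
        this.clusterProperties = clusterProperties;
    }

    public String getStreamId() {
        return streamId;
    }

    public Map<String, Properties> getClusterProperties() {
        return clusterProperties;
    }
}
```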

> Small correction for the MultiClusterKafkaSourceEnumerator section: "This
> reader is responsible for discovering and assigning splits from 1+ cluster"

Thanks for the catch!

> the defining characteristic is the dynamic discovery vs. the fact that
> multiple clusters [...]



I think the "Table" in the name of those SQL connectors should avoid
> confusion. Perhaps we can also solicit other ideas? I would throw
> "DiscoveringKafkaSource" into the mix.

 Agreed with Gordon's and your suggestions. Right, the only public facing
name for SQL is `kafka` for the SQL connector identifier. Based on your
suggestions:

1. MultiClusterKafkaSource
2. DynamicKafkaSource
3. DiscoveringKafkaSource
4. MutableKafkaSource
5. AdaptiveKafkaSource

I added a few of my own. I do prefer 2. What do others think?

Best,
Mason

On Sun, Jun 11, 2023 at 1:12 PM Thomas Weise  wrote:

> Hi Mason,
>
> Thanks for the iterations on the FLIP, I think this is in a very good shape
> now.
>
> Small correction for the MultiClusterKafkaSourceEnumerator section: "This
> reader is responsible for discovering and assigning splits from 1+ cluster"
>
> Regarding the user facing name of the connector: I agree with Gordon that
> the defining characteristic is the dynamic discovery vs. the fact that
> multiple clusters may be consumed in parallel. (Although, as described in
> the FLIP, lossless consumer migration only works with a strategy that
> involves intermittent parallel consumption of old and new clusters to drain
> and switch.)
>
> I think the "Table" in the name of those SQL connectors should avoid
> confusion. Perhaps we can also solicit other ideas? I would throw
> "DiscoveringKafkaSource" into the mix.
>
> Cheers,
> Thomas
>
>
>
>
> On Fri, Jun 9, 2023 at 3:40 PM Tzu-Li (Gordon) Tai 
> wrote:
>
> > > Regarding (2), definitely. This is something we planned to add later on
> > but
> > so far keeping things common has been working well.
> >
> > My main worry for doing this as a later iteration is that this would
> > probably be a breaking change for the public interface. If that can be
> > avoided and planned ahead, I'm fine with moving forward with how it is
> > right now.
> >
> > > DynamicKafkaSource may be confusing because it is really similar to the
> > KafkaDynamicSource/Sink (table connectors).
> >
> > The table / sql Kafka connectors (KafkaDynamicTableFactory,
> > KafkaDynamicTableSource / KafkaDynamicTableSink) are all internal classes
> > not really meant to be exposed to the user though.
> > It can cause some confusion internally for the code maintainers, but on
> the
> > actual public surface I don't see this being an issue.
> >
> > Thanks,
> > Gordon
> >
> > On Wed, Jun 7, 2023 at 8:55 PM Mason Chen 
> wrote:
> >
> > > Hi Gordon,
> > >
> > > Thanks for taking a look!
> > >
> > > Regarding (1), there is a need from the readers to send this event at
> > > startup because the reader state may reflect outdated metadata. Thus,
> the
> > > reader should not start without fresh metadata. With fresh metadata,
> the
> > > reader can filter splits from state--this filtering capability is
> > > ultimately how we solve the common issue of "I re-configured my Kafka
> > > source and removed some topic, but it refers to the old topic due to
> > state
> > > *[1]*". I did not mention this because I thought this is more of a
> detail
> > > but I'll make a brief note of it.
> > >
> > > Regarding (2), definitely. This is something we planned to add later on
> > but
> > > so far keeping things common has been working well. In that regard, yes
> > the
> > > metadata service should expose these configurations but the source
> should
> > > not check it into state unlike the other metadata. I'm going to add it
> > to a
> > > section called "future enhancements". This is also feedback that Ryan,
> an
> > > interested user, gave earlier 

[jira] [Created] (FLINK-32318) [flink-operator] missing s3 plugin in folder plugins

2023-06-12 Thread Jira
Luís Costa created FLINK-32318:
--

 Summary: [flink-operator] missing s3 plugin in folder plugins
 Key: FLINK-32318
 URL: https://issues.apache.org/jira/browse/FLINK-32318
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Luís Costa


Greetings,

I'm trying to configure [Flink's Kubernetes HA 
services|https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/ha/kubernetes_ha/]
 for flink operator jobs, but got an error regarding s3 plugin: _"Could not 
find a file system implementation for scheme 's3'. The scheme is directly 
supported by Flink through the following plugin(s): flink-s3-fs-hadoop, 
flink-s3-fs-presto"_


{code:java}
2023-06-12 10:05:16,981 INFO  akka.remote.Remoting  
   [] - Starting remoting
2023-06-12 10:05:17,194 INFO  akka.remote.Remoting  
   [] - Remoting started; listening on addresses 
:[akka.tcp://flink@10.4.125.209:6123]
2023-06-12 10:05:17,377 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils[] - Actor system 
started at akka.tcp://flink@10.4.125.209:6123
2023-06-12 10:05:18,175 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting 
KubernetesApplicationClusterEntrypoint down with application status FAILED. 
Diagnostics org.apache.flink.util.FlinkException: Could not create the ha 
services from the instantiated HighAvailabilityServicesFactory 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:299)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:285)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:145)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:439)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:382)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729)
at 
org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86)
Caused by: java.io.IOException: Could not create FileSystem for highly 
available storage path 
(s3://td-infra-stg-us-east-1-s3-flinkoperator/flink-data/ha/flink-basic-example-xpto)
at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102)
at 
org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86)
at 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:41)
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:296)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 's3'. The scheme is directly 
supported by Flink through the following plugin(s): flink-s3-fs-hadoop, 
flink-s3-fs-presto. Please ensure that each plugin resides within its own 
subfolder within the plugins directory. See 
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/
 for more information. If you want to use a Hadoop file system for that scheme, 
please add the scheme to the configuration fs.allowed-fallback-filesystems. For 
a full list of supported file systems, please see 
https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at 
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:99)
... 13 more {code}

Looking into the container, can see that s3 plugins are in folder /opt/flink/ 
instead of s3/plugins as mentioned 

Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-12 Thread Stefan Richter
Hi,

Thanks a lot for bringing up this topic and for the initial proposal. As more 
and more people are looking into running Flink as a continuous service this 
discussion is becoming very relevant.

What I would like to see is a clearer definition for what we understand by 
stability and compatibility. Our current policy only talks about being able to 
“compile” and “run” with a different version. As far as I can see, there is no 
guarantee about the stability of observable behavior. I believe it’s important 
for the community to include this important aspect in the guarantees that we 
give as our policy.

For all changes that we do to the stable parts of the API, we should also 
consider how easy or difficult different types of changes would be for running 
Flink as a service with continuous delivery. For example, introducing a new 
interface to evolve the methods would make it easier to write adapter code than 
changing method signatures in-place on the existing interface. Those concerns 
should be considered in our process for evolving interfaces.

Best,
Stefan



     
Stefan Richter
Principal Engineer II



> On 11. Jun 2023, at 14:30, Becket Qin  wrote:
> 
> Hi folks,
> 
> As one of the release 2.0 efforts, the release managers were discussing our
> API lifecycle policies. There have been FLIP-196[1] and FLIP-197[2] that
> are relevant to this topic. These two FLIPs defined the stability guarantee
> of the programming APIs with various different stability annotations, and
> the promotion process. A recap of the conclusion is following:
> 
> Stability:
> @Internal API: can change between major/minor/patch releases.
> @Experimental API: can change between major/minor/patch releases.
> @PublicEvolving API: can change between major/minor releases.
> @Public API: can only change between major releases.
> 
> Promotion:
> An @Experimental API should be promoted to @PublicEvolving after two
> releases, and a @PublicEvolving API should be promoted to @Public API after
> two releases, unless there is a documented reason not to do so.
> 
> One thing not mentioned in these two FLIPs is the API deprecation process,
> which is in fact critical and fundamental to how the stability guarantee is
> provided in practice, because the stability is all about removing existing
> APIs. For example, if we want to change a method "ResultFoo foo(ArgumentFoo
> arg)" to "ResultBar bar(ArgumentBar arg)", there will be two ways to do
> this:
> 
> 1. Mark method "foo" as deprecated and add the new method "bar". At some
> point later, remove the method "foo".
> 2. Simply change the API in place, that basically means removing method foo
> and adding method bar at the same time.
> 
> In the first option, users are given a period with stability guarantee to
> migrate from "foo" to "bar". For the second option, this migration period
> is effectively 0. A zero migration period is problematic because end users
> may need a feature/bug fix from a new version, but cannot upgrade right
> away due to some backwards compatible changes, even though these changes
> perfectly comply with the API stability guarantees defined above. So the
> migration period is critical to the API stability guarantees for the end
> users.
> 
> The migration period is essentially how long a deprecated API can be
> removed from the source code. So with this FLIP, I'd like to kick off the
> discussion about our deprecation process.
> 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-321%3A+Introduce+an+API+deprecation+process
> 
> Comments are welcome!
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-196%3A+Source+API+stability+guarantees
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process
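
To make option 1 concrete, a small illustrative example of the deprecate-then-delegate pattern,
using the hypothetical foo/bar types from the message above (not an actual Flink interface):

```java
/** Sketch of option 1: keep the deprecated method for a migration period and delegate. */
public class DeprecationExample {

    static class ArgumentFoo {}

    static class ArgumentBar {
        static ArgumentBar fromFoo(ArgumentFoo foo) { return new ArgumentBar(); }
    }

    static class ResultBar {}

    static class ResultFoo {
        static ResultFoo fromBar(ResultBar bar) { return new ResultFoo(); }
    }

    /** @deprecated use {@link #bar(ArgumentBar)}; removed only after the migration period. */
    @Deprecated
    public ResultFoo foo(ArgumentFoo arg) {
        // Delegate so both methods behave identically while users migrate.
        return ResultFoo.fromBar(bar(ArgumentBar.fromFoo(arg)));
    }

    public ResultBar bar(ArgumentBar arg) {
        return new ResultBar();
    }
}
```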



[jira] [Created] (FLINK-32317) Enrich metadata in CR error field

2023-06-12 Thread Daren Wong (Jira)
Daren Wong created FLINK-32317:
--

 Summary: Enrich metadata in CR error field
 Key: FLINK-32317
 URL: https://issues.apache.org/jira/browse/FLINK-32317
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.5.0
Reporter: Daren Wong
 Fix For: kubernetes-operator-1.6.0


CR Error field is improved in https://issues.apache.org/jira/browse/FLINK-29708.

The error field is more structured with exception type, stackTrace, 
additionalMetadata, etc.

 

This ticket is a proposal to expose a config 
("kubernetes.operator.exception.metadata.mapper") to enrich the 
additionalMetadata further.

 

The config consists of key-value pairs, for example:
{code:java}
kubernetes.operator.exception.metadata.mapper: IOException:Found 
IOException,403:Found 403 error code{code}
The key is a REGEX string that will be used to match against the whole stack 
trace and if found, the value will be added to additionalMetadata. For example:
{code:java}
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-example
  namespace: default
  resourceVersion: "70206149"
  uid: 916ea8f5-0821-4839-9953-2db9678c3fc9
spec:
  deploymentName: basic-session-deployment-example
  job:
args: []
jarURI: https://test-s3.s3.amazonaws.com/doubleExecute.jar
parallelism: 4
state: running
upgradeMode: stateless
status:
  error: 
'{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.io.IOException:
Server returned HTTP response code: 403 for URL: 
https://test-s3.s3.amazonaws.com/doubleExecute.jar","additionalMetadata":{"exceptionMapper":["Found
403 error code","Found 
IOException"]},"throwableList":[{"type":"java.io.IOException","message":"Server
returned HTTP response code: 403 for URL: 
https://test-s3.s3.amazonaws.com/doubleExecute.jar","additionalMetadata":{"exceptionMapper":["Found
403 error code"]}}]}'
...
{code}
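
A rough sketch of how such a mapping could be evaluated (this is not the operator's 
implementation; the comma/colon parsing of the option value is an assumption):
{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class ExceptionMetadataMapperSketch {

    // Parses "regex:label,regex:label" pairs from the proposed option value.
    static Map<Pattern, String> parse(String optionValue) {
        Map<Pattern, String> mappers = new LinkedHashMap<>();
        for (String pair : optionValue.split(",")) {
            String[] kv = pair.split(":", 2);
            mappers.put(Pattern.compile(kv[0]), kv[1]);
        }
        return mappers;
    }

    // Collects the labels whose regex matches anywhere in the stack trace string.
    static List<String> labelsFor(Map<Pattern, String> mappers, String stackTrace) {
        List<String> labels = new ArrayList<>();
        for (Map.Entry<Pattern, String> entry : mappers.entrySet()) {
            if (entry.getKey().matcher(stackTrace).find()) {
                labels.add(entry.getValue());
            }
        }
        return labels;
    }

    public static void main(String[] args) {
        Map<Pattern, String> mappers =
                parse("IOException:Found IOException,403:Found 403 error code");
        System.out.println(
                labelsFor(mappers, "java.io.IOException: Server returned HTTP response code: 403"));
        // prints [Found IOException, Found 403 error code]
    }
}
{code}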
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-12 Thread Aitozi
BTW, if there are no other blocking issues or comments, I would like to
start a VOTE in another thread this Wednesday, June 14.

Thanks,
Aitozi.

Aitozi  于2023年6月12日周一 23:34写道:

> Hi, Jing,
> Thanks for your explanation. I get your point now.
>
> For the performance part, I think it's a good idea to run with returning a
> big table case, the memory consumption
> should be a point to be taken care about. Because in the ordered mode, the
> head element in buffer may affect the
> total memory consumption.
>
>
> Thanks,
> Aitozi.
>
>
>
> Jing Ge  于2023年6月12日周一 20:28写道:
>
>> Hi Aitozi,
>>
>> Which key will be used for lookup is not an issue, only one row will be
>> required for each key in order to enrich it. True, it depends on the
>> implementation whether multiple rows or single row for each key will be
>> returned. However, for the lookup & enrichment scenario, one row/key is
>> recommended, otherwise, like I mentioned previously, enrichment won't
>> work.
>>
>> I am a little bit concerned about returning a big table for each key,
>> since
>> it will take the async call longer to return and need more memory. The
>> performance tests should cover this scenario. This is not a blocking issue
>> for this FLIP.
>>
>> Best regards,
>> Jing
>>
>> On Sat, Jun 10, 2023 at 4:11 AM Aitozi  wrote:
>>
>> > Hi Jing,
>> > I means the join key is not necessary to be the primary key or
>> unique
>> > index of the database.
>> > In this situation, we may queried out multi rows for one join key. I
>> think
>> > that's why the
>> > LookupFunction#lookup will return a collection of RowData.
>> >
>> > BTW, I think the behavior of lookup join will not affect the semantic of
>> > the async udtf.
>> > We use the Async TableFunction here and the table function can collect
>> > multiple rows.
>> >
>> > Thanks,
>> > Atiozi.
>> >
>> >
>> >
>> > Jing Ge  于2023年6月10日周六 00:15写道:
>> >
>> > > Hi Aitozi,
>> > >
>> > > The keyRow used in this case contains all keys[1].
>> > >
>> > > Best regards,
>> > > Jing
>> > >
>> > > [1]
>> > >
>> > >
>> >
>> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
>> > >
>> > >
>> > > On Fri, Jun 9, 2023 at 3:42 PM Aitozi  wrote:
>> > >
>> > > > Hi Jing,
>> > > >
>> > > >  The performance test is added to the FLIP.
>> > > >
>> > > >  As I know, The lookup join can return multi rows, it depends on
>> > > > whether  the join key
>> > > > is the primary key of the external database or not. The `lookup` [1]
>> > will
>> > > > return a collection of
>> > > > joined result, and each of them will be collected
>> > > >
>> > > >
>> > > > [1]:
>> > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52
>> > > >
>> > > >
>> > > > Thanks,
>> > > > Aitozi.
>> > > >
>> > > > Jing Ge  于2023年6月9日周五 17:05写道:
>> > > >
>> > > > > Hi Aitozi,
>> > > > >
>> > > > > Thanks for the feedback. Looking forward to the performance tests.
>> > > > >
>> > > > > Afaik, lookup returns one row for each key [1] [2]. Conceptually,
>> the
>> > > > > lookup function is used to enrich column(s) from the dimension
>> table.
>> > > If,
>> > > > > for the given key, there will be more than one row, there will be
>> no
>> > > way
>> > > > to
>> > > > > know which row will be used to enrich the key.
>> > > > >
>> > > > > [1]
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
>> > > > > [2]
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196
>> > > > >
>> > > > > Best regards,
>> > > > > Jing
>> > > > >
>> > > > > On Fri, Jun 9, 2023 at 5:18 AM Aitozi 
>> wrote:
>> > > > >
>> > > > > > Hi Jing
>> > > > > > Thanks for your good questions. I have updated the example
>> to
>> > the
>> > > > > FLIP.
>> > > > > >
>> > > > > > > Only one row for each lookup
>> > > > > > lookup can also return multi rows, based on the query result.
>> [1]
>> > > > > >
>> > > > > > [1]:
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56
>> > > > > >
>> > > > > > > If we use async calls with lateral join, my gut feeling is
>> > > > > > that we might have many more async calls than lookup join. I am
>> not
>> > > > > really
>> > > > > > sure if we will be facing potential issues in this case or not.
>> > > > > >
>> > > > > > IMO, the work pattern is 

Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-12 Thread Aitozi
Hi, Jing,
Thanks for your explanation. I get your point now.

For the performance part, I think it's a good idea to also run a case where a
big table is returned for each key; the memory consumption should be taken
care of, because in the ordered mode a slow head element keeps later results
buffered and thus drives up the total memory consumption.


Thanks,
Aitozi.
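
For reference, a rough sketch of what a user-defined async table function could look like if
the FLIP keeps the existing AsyncTableFunction eval contract (the direct registration path is
an assumption on my side, not a settled part of the FLIP):

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.types.Row;

public class AsyncHttpLookup extends AsyncTableFunction<Row> {
    public void eval(CompletableFuture<Collection<Row>> result, String key) {
        // Pretend this is a non-blocking call to an external service; the async
        // operator's buffer capacity bounds how many of these are in flight.
        CompletableFuture.runAsync(
                () -> result.complete(Collections.singletonList(Row.of(key, key.length()))));
    }
}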



Jing Ge  于2023年6月12日周一 20:28写道:

> Hi Aitozi,
>
> Which key will be used for lookup is not an issue, only one row will be
> required for each key in order to enrich it. True, it depends on the
> implementation whether multiple rows or single row for each key will be
> returned. However, for the lookup & enrichment scenario, one row/key is
> recommended, otherwise, like I mentioned previously, enrichment won't work.
>
> I am a little bit concerned about returning a big table for each key, since
> it will take the async call longer to return and need more memory. The
> performance tests should cover this scenario. This is not a blocking issue
> for this FLIP.
>
> Best regards,
> Jing
>
> On Sat, Jun 10, 2023 at 4:11 AM Aitozi  wrote:
>
> > Hi Jing,
> > I means the join key is not necessary to be the primary key or unique
> > index of the database.
> > In this situation, we may queried out multi rows for one join key. I
> think
> > that's why the
> > LookupFunction#lookup will return a collection of RowData.
> >
> > BTW, I think the behavior of lookup join will not affect the semantic of
> > the async udtf.
> > We use the Async TableFunction here and the table function can collect
> > multiple rows.
> >
> > Thanks,
> > Atiozi.
> >
> >
> >
> > Jing Ge  于2023年6月10日周六 00:15写道:
> >
> > > Hi Aitozi,
> > >
> > > The keyRow used in this case contains all keys[1].
> > >
> > > Best regards,
> > > Jing
> > >
> > > [1]
> > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> > >
> > >
> > > On Fri, Jun 9, 2023 at 3:42 PM Aitozi  wrote:
> > >
> > > > Hi Jing,
> > > >
> > > >  The performance test is added to the FLIP.
> > > >
> > > >  As I know, The lookup join can return multi rows, it depends on
> > > > whether  the join key
> > > > is the primary key of the external database or not. The `lookup` [1]
> > will
> > > > return a collection of
> > > > joined result, and each of them will be collected
> > > >
> > > >
> > > > [1]:
> > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52
> > > >
> > > >
> > > > Thanks,
> > > > Aitozi.
> > > >
> > > > Jing Ge  于2023年6月9日周五 17:05写道:
> > > >
> > > > > Hi Aitozi,
> > > > >
> > > > > Thanks for the feedback. Looking forward to the performance tests.
> > > > >
> > > > > Afaik, lookup returns one row for each key [1] [2]. Conceptually,
> the
> > > > > lookup function is used to enrich column(s) from the dimension
> table.
> > > If,
> > > > > for the given key, there will be more than one row, there will be
> no
> > > way
> > > > to
> > > > > know which row will be used to enrich the key.
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> > > > > [2]
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196
> > > > >
> > > > > Best regards,
> > > > > Jing
> > > > >
> > > > > On Fri, Jun 9, 2023 at 5:18 AM Aitozi 
> wrote:
> > > > >
> > > > > > Hi Jing
> > > > > > Thanks for your good questions. I have updated the example to
> > the
> > > > > FLIP.
> > > > > >
> > > > > > > Only one row for each lookup
> > > > > > lookup can also return multi rows, based on the query result. [1]
> > > > > >
> > > > > > [1]:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56
> > > > > >
> > > > > > > If we use async calls with lateral join, my gut feeling is
> > > > > > that we might have many more async calls than lookup join. I am
> not
> > > > > really
> > > > > > sure if we will be facing potential issues in this case or not.
> > > > > >
> > > > > > IMO, the work pattern is similar to the lookup function, for each
> > row
> > > > > from
> > > > > > the left table,
> > > > > > it will evaluate the eval method once, so the async call numbers
> > will
> > > > not
> > > > > > change.
> > > > > > and the maximum calls in flight is limited by the Async operators
> > > > buffer
> > > > > > capacity
> > > > > > which will be controlled by 

[DISCUSS] FLIP-322 Cooldown period for adaptive scheduler

2023-06-12 Thread Etienne Chauchot

Hi,

I’d like to start a discussion about FLIP-322 [1] which introduces a 
cooldown period for the adaptive scheduler.


I'd like to get your feedback, especially from @Robert, as you opened the 
related ticket and worked a lot on the reactive mode.


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-322+Cooldown+period+for+adaptive+scheduler


Best

Etienne



Re: [VOTE] FLIP-311: Support Call Stored Procedure

2023-06-12 Thread Rui Fan
+1 (binding)

Best,
Rui Fan

On Mon, Jun 12, 2023 at 22:20 Benchao Li  wrote:

> +1 (binding)
>
> yuxia  于2023年6月12日周一 17:58写道:
>
> > Hi everyone,
> > Thanks for all the feedback about FLIP-311: Support Call Stored
> > Procedure[1]. Based on the discussion [2], we have come to a consensus,
> so
> > I would like to start a vote.
> > The vote will be open for at least 72 hours (until June 15th, 10:00AM
> GMT)
> > unless there is an objection or an insufficient number of votes.
> >
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure
> > [2] https://lists.apache.org/thread/k6s50gcgznon9v1oylyh396gb5kgrwmd
> >
> > Best regards,
> > Yuxia
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: [VOTE] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-12 Thread Rui Fan
+1 (binding)

Best,
Rui Fan

On Mon, Jun 12, 2023 at 19:58 liu ron  wrote:

> +1 (no-binding)
>
> Best,
> Ron
>
> Jing Ge  于2023年6月12日周一 19:33写道:
>
> > +1(binding) Thanks!
> >
> > Best regards,
> > Jing
> >
> > On Mon, Jun 12, 2023 at 12:01 PM yuxia 
> > wrote:
> >
> > > +1 (binding)
> > > Thanks Mang driving it.
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > - 原始邮件 -
> > > 发件人: "zhangmang1" 
> > > 收件人: "dev" 
> > > 发送时间: 星期一, 2023年 6 月 12日 下午 5:31:10
> > > 主题: [VOTE] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS)
> > > statement
> > >
> > > Hi everyone,
> > >
> > > Thanks for all the feedback about FLIP-305: Support atomic for CREATE
> > > TABLE AS SELECT(CTAS) statement[1].
> > > [2] is the discussion thread.
> > >
> > > I'd like to start a vote for it. The vote will be open for at least 72
> > > hours (until June 15th, 10:00AM GMT) unless there is an objection or an
> > > insufficient number of votes.[1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> > > [2]https://lists.apache.org/thread/n6nsvbwhs5kwlj5kjgv24by2tk5mh9xd
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > --
> > >
> > > Best regards,
> > > Mang Zhang
> > >
> >
>


Re: [VOTE] FLIP-311: Support Call Stored Procedure

2023-06-12 Thread Benchao Li
+1 (binding)

yuxia  于2023年6月12日周一 17:58写道:

> Hi everyone,
> Thanks for all the feedback about FLIP-311: Support Call Stored
> Procedure[1]. Based on the discussion [2], we have come to a consensus, so
> I would like to start a vote.
> The vote will be open for at least 72 hours (until June 15th, 10:00AM GMT)
> unless there is an objection or an insufficient number of votes.
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure
> [2] https://lists.apache.org/thread/k6s50gcgznon9v1oylyh396gb5kgrwmd
>
> Best regards,
> Yuxia
>


-- 

Best,
Benchao Li


[jira] [Created] (FLINK-32316) Duplicated announceCombinedWatermark task maybe scheduled if jobmanager failover

2023-06-12 Thread Cai Liuyang (Jira)
Cai Liuyang created FLINK-32316:
---

 Summary: Duplicated announceCombinedWatermark task maybe scheduled 
if jobmanager failover
 Key: FLINK-32316
 URL: https://issues.apache.org/jira/browse/FLINK-32316
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.16.0
Reporter: Cai Liuyang


When we tried the source alignment feature, we found that a duplicated 
announceCombinedWatermark task is scheduled after a JobManager failover that 
automatically recovers the job from a checkpoint.

I think the reason is that the announceCombinedWatermark task should be scheduled 
in the SourceCoordinator::start method rather than in the SourceCoordinator 
constructor (see 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L149]
 ), because when the JobManager encounters a failover and automatically recovers 
the job, it creates the SourceCoordinator twice (a simplified sketch of the 
intended pattern follows the list):
 * The first one is created when the JobMaster builds the DefaultExecutionGraph.
 * The second one is created when the JobMaster calls 
restoreLatestCheckpointedStateInternal, which resets the old SourceCoordinator 
and initializes a new one; because the first SourceCoordinator was never started 
(SourceCoordinator is started before SchedulerBase::startScheduling), it is 
never fully closed.
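
A simplified sketch of the intended pattern (not the actual SourceCoordinator code; 
names and the scheduling interval are placeholders):
{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class CoordinatorSketch implements AutoCloseable {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> announceTask;

    CoordinatorSketch() {
        // No scheduling here: the first instance created for the ExecutionGraph
        // may be replaced during checkpoint restore without ever being started.
    }

    void start() {
        // Scheduling only happens for the coordinator that is actually started,
        // so a replaced-but-never-started instance leaves no periodic task behind.
        announceTask =
                executor.scheduleAtFixedRate(
                        this::announceCombinedWatermark, 1, 1, TimeUnit.SECONDS);
    }

    private void announceCombinedWatermark() {
        // announce the combined watermark to the subtasks
    }

    @Override
    public void close() {
        if (announceTask != null) {
            announceTask.cancel(false);
        }
        executor.shutdownNow();
    }
}
{code}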

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-06-12 Thread Paul Lam
Hi Yang,

Thanks a lot for your input!

It’s great that FLINK-28915 has covered the file download part. I’ve created 
a ticket for the file upload part [1]. It's a prerequisite for supporting K8s 
application 
mode for SQL Gateway.

[1] https://issues.apache.org/jira/browse/FLINK-32315

Best,
Paul Lam

> 2023年6月12日 11:40,Yang Wang  写道:
> 
> Sorry for the late reply. I am in favor of introducing such a built-in
> resource localization mechanism
> based on Flink FileSystem. Then FLINK-28915[1] could be the second step
> which will download
> the jars and dependencies to the JobManager/TaskManager local directory
> before working.
> 
> The first step could be done in another ticket in Flink. Or some external
> Flink jobs management system
> could also take care of this.
> 
> [1]. https://issues.apache.org/jira/browse/FLINK-28915
> 
> Best,
> Yang
> 
> Paul Lam  于2023年6月9日周五 17:39写道:
> 
>> Hi Mason,
>> 
>> I get your point. I'm increasingly feeling the need to introduce a
>> built-in
>> file distribution mechanism for flink-kubernetes module, just like Spark
>> does with `spark.kubernetes.file.upload.path` [1].
>> 
>> I’m assuming the workflow is as follows:
>> 
>> - KubernetesClusterDescripter uploads all local resources to a remote
>>  storage via Flink filesystem (skips if the resources are already remote).
>> - KubernetesApplicationClusterEntrypoint downloads the resources
>>  and put them in the classpath during startup.
>> 
>> I wouldn't mind splitting it into another FLIP to ensure that everything is
>> done correctly.
>> 
>> cc'ed @Yang to gather more opinions.
>> 
>> [1]
>> https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management
>> 
>> Best,
>> Paul Lam
>> 
>> 2023年6月8日 12:15,Mason Chen  写道:
>> 
>> Hi Paul,
>> 
>> Thanks for your response!
>> 
>> I agree that utilizing SQL Drivers in Java applications is equally
>> important
>> 
>> as employing them in SQL Gateway. WRT init containers, I think most
>> users use them just as a workaround. For example, wget a jar from the
>> maven repo.
>> 
>> We could implement the functionality in SQL Driver in a more graceful
>> way and the flink-supported filesystem approach seems to be a
>> good choice.
>> 
>> 
>> My main point is: can we solve the problem with a design agnostic of SQL
>> and Stream API? I mentioned a use case where this ability is useful for
>> Java or Stream API applications. Maybe this is even a non-goal to your FLIP
>> since you are focusing on the driver entrypoint.
>> 
>> Jark mentioned some optimizations:
>> 
>> This allows SQLGateway to leverage some metadata caching and UDF JAR
>> caching for better compiling performance.
>> 
>> It would be great to see this even outside the SQLGateway (i.e. UDF JAR
>> caching).
>> 
>> Best,
>> Mason
>> 
>> On Wed, Jun 7, 2023 at 2:26 AM Shengkai Fang  wrote:
>> 
>> Hi. Paul.  Thanks for your update and the update makes me understand the
>> design much better.
>> 
>> But I still have some questions about the FLIP.
>> 
>> For SQL Gateway, only DMLs need to be delegated to the SQL server
>> Driver. I would think about the details and update the FLIP. Do you have
>> 
>> some
>> 
>> ideas already?
>> 
>> 
>> If the applicaiton mode can not support library mode, I think we should
>> only execute INSERT INTO and UPDATE/ DELETE statement in the application
>> mode. AFAIK, we can not support ANALYZE TABLE and CALL PROCEDURE
>> statements. The ANALYZE TABLE syntax need to register the statistic to the
>> catalog after job finishes and the CALL PROCEDURE statement doesn't
>> generate the ExecNodeGraph.
>> 
>> * Introduce storage via option `sql-gateway.application.storage-dir`
>> 
>> If we can not support to submit the jars through web submission, +1 to
>> introduce the options to upload the files. While I think the uploader
>> should be responsible to remove the uploaded jars. Can we remove the jars
>> if the job is running or gateway exits?
>> 
>> * JobID is not avaliable
>> 
>> Can we use the returned rest client by ApplicationDeployer to query the job
>> id? I am concerned that users don't know which job is related to the
>> submitted SQL.
>> 
>> * Do we need to introduce a new module named flink-table-sql-runner?
>> 
>> It seems we need to introduce a new module. Will the new module is
>> available in the distribution package? I agree with Jark that we don't need
>> to introduce this for table-API users and these users have their main
>> class. If we want to make users write the k8s operator more easily, I think
>> we should modify the k8s operator repo. If we don't need to support SQL
>> files, can we make this jar only visible in the sql-gateway like we do in
>> the planner loader?[1]
>> 
>> [1]
>> 
>> 
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java#L95
>> 
>> Best,
>> Shengkai
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> Weihua Hu  于2023年6月7日周三 10:52写道:
>> 
>> Hi,
>> 
>> Thanks 

[jira] [Created] (FLINK-32315) Support local file upload in K8s mode

2023-06-12 Thread Paul Lin (Jira)
Paul Lin created FLINK-32315:


 Summary: Support local file upload in K8s mode
 Key: FLINK-32315
 URL: https://issues.apache.org/jira/browse/FLINK-32315
 Project: Flink
  Issue Type: New Feature
  Components: Client / Job Submission, Deployment / Kubernetes
Reporter: Paul Lin


Currently, Flink assumes all resources are locally accessible in the pods, which 
requires users to prepare the resources by mounting storage volumes, downloading 
resources with init containers, or rebuilding images for each execution.

We could make things much easier by introducing a built-in file distribution 
mechanism based on Flink-supported filesystems. It would be implemented in two steps:

 
1. KubernetesClusterDescriptor uploads all local resources to remote storage 
via a Flink filesystem (skipped if the resources are already remote).
2. KubernetesApplicationClusterEntrypoint and KubernetesTaskExecutorRunner 
download the resources and put them in the classpath during startup.
 
The 2nd step is mostly done by 
[FLINK-28915|https://issues.apache.org/jira/browse/FLINK-28915], thus this 
issue is focused on the upload part.
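
A rough sketch of what the upload step could look like using only the existing Flink 
FileSystem API (the upload directory and naming scheme are assumptions, not a settled design):
{code:java}
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class LocalResourceUploaderSketch {

    /** Copies a local file into the remote upload directory and returns its remote path. */
    public static Path upload(java.io.File localFile, URI remoteUploadDir) throws Exception {
        Path target = new Path(new Path(remoteUploadDir), localFile.getName());
        FileSystem fs = target.getFileSystem();
        try (InputStream in = new FileInputStream(localFile);
                OutputStream out = fs.create(target, FileSystem.WriteMode.NO_OVERWRITE)) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        }
        return target;
    }
}
{code}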
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-12 Thread Jing Ge
Hi Aitozi,

Which key will be used for the lookup is not an issue; only one row is required
for each key in order to enrich it. True, it depends on the implementation
whether multiple rows or a single row will be returned for each key. However,
for the lookup & enrichment scenario, one row per key is recommended;
otherwise, like I mentioned previously, enrichment won't work.

I am a little bit concerned about returning a big table for each key, since
it will take the async call longer to return and need more memory. The
performance tests should cover this scenario. This is not a blocking issue
for this FLIP.

Best regards,
Jing

On Sat, Jun 10, 2023 at 4:11 AM Aitozi  wrote:

> Hi Jing,
> I means the join key is not necessary to be the primary key or unique
> index of the database.
> In this situation, we may queried out multi rows for one join key. I think
> that's why the
> LookupFunction#lookup will return a collection of RowData.
>
> BTW, I think the behavior of lookup join will not affect the semantic of
> the async udtf.
> We use the Async TableFunction here and the table function can collect
> multiple rows.
>
> Thanks,
> Atiozi.
>
>
>
> Jing Ge  于2023年6月10日周六 00:15写道:
>
> > Hi Aitozi,
> >
> > The keyRow used in this case contains all keys[1].
> >
> > Best regards,
> > Jing
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> >
> >
> > On Fri, Jun 9, 2023 at 3:42 PM Aitozi  wrote:
> >
> > > Hi Jing,
> > >
> > >  The performance test is added to the FLIP.
> > >
> > >  As I know, The lookup join can return multi rows, it depends on
> > > whether  the join key
> > > is the primary key of the external database or not. The `lookup` [1]
> will
> > > return a collection of
> > > joined result, and each of them will be collected
> > >
> > >
> > > [1]:
> > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52
> > >
> > >
> > > Thanks,
> > > Aitozi.
> > >
> > > Jing Ge  于2023年6月9日周五 17:05写道:
> > >
> > > > Hi Aitozi,
> > > >
> > > > Thanks for the feedback. Looking forward to the performance tests.
> > > >
> > > > Afaik, lookup returns one row for each key [1] [2]. Conceptually, the
> > > > lookup function is used to enrich column(s) from the dimension table.
> > If,
> > > > for the given key, there will be more than one row, there will be no
> > way
> > > to
> > > > know which row will be used to enrich the key.
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
> > > > [2]
> > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Fri, Jun 9, 2023 at 5:18 AM Aitozi  wrote:
> > > >
> > > > > Hi Jing
> > > > > Thanks for your good questions. I have updated the example to
> the
> > > > FLIP.
> > > > >
> > > > > > Only one row for each lookup
> > > > > lookup can also return multi rows, based on the query result. [1]
> > > > >
> > > > > [1]:
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56
> > > > >
> > > > > > If we use async calls with lateral join, my gut feeling is
> > > > > that we might have many more async calls than lookup join. I am not
> > > > really
> > > > > sure if we will be facing potential issues in this case or not.
> > > > >
> > > > > IMO, the work pattern is similar to the lookup function, for each
> row
> > > > from
> > > > > the left table,
> > > > > it will evaluate the eval method once, so the async call numbers
> will
> > > not
> > > > > change.
> > > > > and the maximum calls in flight is limited by the Async operators
> > > buffer
> > > > > capacity
> > > > > which will be controlled by the option.
> > > > >
> > > > > BTW, for the naming of these option, I updated the FLIP about this
> > you
> > > > can
> > > > > refer to
> > > > > the section of "ConfigOption" and "Rejected Alternatives"
> > > > >
> > > > > In the end, for the performance evaluation, I'd like to do some
> tests
> > > and
> > > > > will update it to the FLIP doc
> > > > >
> > > > > Thanks,
> > > > > Aitozi.
> > > > >
> > > > >
> > > > > Jing Ge  于2023年6月9日周五 07:23写道:
> > > > >
> > > > > > Hi Aitozi,
> > > > > >
> > > > > > Thanks for the clarification. The code example looks
> interesting. I
> > > > would
> > > > > > suggest adding them into the FLIP. The description with 

[jira] [Created] (FLINK-32314) Ignore class-loading errors after RPC system shutdown

2023-06-12 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32314:


 Summary: Ignore class-loading errors after RPC system shutdown
 Key: FLINK-32314
 URL: https://issues.apache.org/jira/browse/FLINK-32314
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / RPC, Tests
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


In tests we occasionally see the akka rpc service throwing class loading errors 
_after_ it was shut down.
AFAICT our shutdown procedure is correct, and it's just akka shutting down some 
things asynchronously.
I couldn't figure out why/what is still running, so as a band-aid I suggest ignoring 
class-loading errors after the RPC service shutdown has completed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-12 Thread liu ron
+1 (no-binding)

Best,
Ron

Jing Ge  于2023年6月12日周一 19:33写道:

> +1(binding) Thanks!
>
> Best regards,
> Jing
>
> On Mon, Jun 12, 2023 at 12:01 PM yuxia 
> wrote:
>
> > +1 (binding)
> > Thanks Mang driving it.
> >
> > Best regards,
> > Yuxia
> >
> > - 原始邮件 -
> > 发件人: "zhangmang1" 
> > 收件人: "dev" 
> > 发送时间: 星期一, 2023年 6 月 12日 下午 5:31:10
> > 主题: [VOTE] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS)
> > statement
> >
> > Hi everyone,
> >
> > Thanks for all the feedback about FLIP-305: Support atomic for CREATE
> > TABLE AS SELECT(CTAS) statement[1].
> > [2] is the discussion thread.
> >
> > I'd like to start a vote for it. The vote will be open for at least 72
> > hours (until June 15th, 10:00AM GMT) unless there is an objection or an
> > insufficient number of votes.[1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> > [2]https://lists.apache.org/thread/n6nsvbwhs5kwlj5kjgv24by2tk5mh9xd
> >
> >
> >
> >
> >
> >
> >
> > --
> >
> > Best regards,
> > Mang Zhang
> >
>


[jira] [Created] (FLINK-32313) CrateDB relies on flink-shaded in flink-connector-jdbc

2023-06-12 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-32313:
--

 Summary: CrateDB relies on flink-shaded in flink-connector-jdbc
 Key: FLINK-32313
 URL: https://issues.apache.org/jira/browse/FLINK-32313
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Reporter: Martijn Visser


See 
https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java#L27
 - JDBC shouldn't rely on flink-shaded. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32312) SSLConnectionSocketFactory produced no output for 900 seconds

2023-06-12 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-32312:
---

 Summary: SSLConnectionSocketFactory produced no output for 900 
seconds
 Key: FLINK-32312
 URL: https://issues.apache.org/jira/browse/FLINK-32312
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.18.0
Reporter: Sergey Nuyanzin


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49688=logs=fc5181b0-e452-5c8f-68de-1097947f6483=995c650b-6573-581c-9ce6-7ad4cc038461=15175
{noformat}
"main" #1 prio=5 os_prio=0 tid=0x7fb46c00b800 nid=0x184 runnable 
[0x7fb473251000]
Jun 06 09:53:49java.lang.Thread.State: RUNNABLE
Jun 06 09:53:49 at java.net.PlainSocketImpl.socketConnect(Native Method)
Jun 06 09:53:49 at 
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
Jun 06 09:53:49 at 
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
Jun 06 09:53:49 at 
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
Jun 06 09:53:49 at 
java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
Jun 06 09:53:49 at java.net.Socket.connect(Socket.java:607)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:368)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.execchain.MainClientExec.execute(MainClientExec.java:236)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.execchain.RetryExec.execute(RetryExec.java:89)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.httpclient.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.wagon.shared.AbstractHttpClientWagon.execute(AbstractHttpClientWagon.java:1005)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.wagon.shared.AbstractHttpClientWagon.fillInputData(AbstractHttpClientWagon.java:1162)
Jun 06 09:53:49 at 
org.apache.maven.wagon.providers.http.wagon.shared.AbstractHttpClientWagon.fillInputData(AbstractHttpClientWagon.java:1140)
Jun 06 09:53:49 at 
org.apache.maven.wagon.StreamWagon.getInputStream(StreamWagon.java:126)
Jun 06 09:53:49 at 
org.apache.maven.wagon.StreamWagon.getIfNewer(StreamWagon.java:88)

...
{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-12 Thread Jing Ge
+1(binding) Thanks!

Best regards,
Jing

On Mon, Jun 12, 2023 at 12:01 PM yuxia  wrote:

> +1 (binding)
> Thanks Mang driving it.
>
> Best regards,
> Yuxia
>
> - 原始邮件 -
> 发件人: "zhangmang1" 
> 收件人: "dev" 
> 发送时间: 星期一, 2023年 6 月 12日 下午 5:31:10
> 主题: [VOTE] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS)
> statement
>
> Hi everyone,
>
> Thanks for all the feedback about FLIP-305: Support atomic for CREATE
> TABLE AS SELECT(CTAS) statement[1].
> [2] is the discussion thread.
>
> I'd like to start a vote for it. The vote will be open for at least 72
> hours (until June 15th, 10:00AM GMT) unless there is an objection or an
> insufficient number of votes.[1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
> [2]https://lists.apache.org/thread/n6nsvbwhs5kwlj5kjgv24by2tk5mh9xd
>
>
>
>
>
>
>
> --
>
> Best regards,
> Mang Zhang
>


[jira] [Created] (FLINK-32311) ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and DefaultLeaderElectionService.onGrantLeadership fell into dead lock

2023-06-12 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-32311:
---

 Summary: 
ZooKeeperLeaderElectionTest.testZooKeeperReelectionWithReplacement and 
DefaultLeaderElectionService.onGrantLeadership fell into dead lock
 Key: FLINK-32311
 URL: https://issues.apache.org/jira/browse/FLINK-32311
 Project: Flink
  Issue Type: Bug
Reporter: Sergey Nuyanzin
 Fix For: 1.18.0


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=49750=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8]

 

There are two threads: one has locked {{0xe3a8a1e8}} and is waiting for 
{{0xe3a89c18}}

{noformat}

2023-06-08T01:18:54.5609123Z Jun 08 01:18:54 
"ForkJoinPool-50-worker-25-EventThread" #956 daemon prio=5 os_prio=0 
tid=0x7f9374253800 nid=0x6a4e waiting for monitor entry [0x7f94b63e1000]
2023-06-08T01:18:54.5609820Z Jun 08 01:18:54java.lang.Thread.State: BLOCKED 
(on object monitor)
2023-06-08T01:18:54.5610557Z Jun 08 01:18:54at 
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.runInLeaderEventThread(DefaultLeaderElectionService.java:425)
2023-06-08T01:18:54.5611459Z Jun 08 01:18:54- waiting to lock 
<0xe3a89c18> (a java.lang.Object)
2023-06-08T01:18:54.5612198Z Jun 08 01:18:54at 
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService.onGrantLeadership(DefaultLeaderElectionService.java:300)
2023-06-08T01:18:54.5613110Z Jun 08 01:18:54at 
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver.isLeader(ZooKeeperLeaderElectionDriver.java:153)
2023-06-08T01:18:54.5614070Z Jun 08 01:18:54at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch$$Lambda$1649/586959400.accept(Unknown
 Source)
2023-06-08T01:18:54.5615014Z Jun 08 01:18:54at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager.lambda$forEach$0(MappingListenerManager.java:92)
2023-06-08T01:18:54.5616259Z Jun 08 01:18:54at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager$$Lambda$1640/1393625763.run(Unknown
 Source)
2023-06-08T01:18:54.5617137Z Jun 08 01:18:54at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager$$Lambda$1633/2012730699.execute(Unknown
 Source)
2023-06-08T01:18:54.5618047Z Jun 08 01:18:54at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.MappingListenerManager.forEach(MappingListenerManager.java:89)
2023-06-08T01:18:54.5618994Z Jun 08 01:18:54at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.listen.StandardListenerManager.forEach(StandardListenerManager.java:89)
2023-06-08T01:18:54.5620071Z Jun 08 01:18:54at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch.setLeadership(LeaderLatch.java:711)
2023-06-08T01:18:54.5621198Z Jun 08 01:18:54- locked <0xe3a8a1e8> 
(a 
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch)
2023-06-08T01:18:54.5622072Z Jun 08 01:18:54at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch.checkLeadership(LeaderLatch.java:597)
2023-06-08T01:18:54.5622991Z Jun 08 01:18:54at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch.access$600(LeaderLatch.java:64)
2023-06-08T01:18:54.5623988Z Jun 08 01:18:54at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch$7.processResult(LeaderLatch.java:648)
2023-06-08T01:18:54.5624965Z Jun 08 01:18:54at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:926)
2023-06-08T01:18:54.5626218Z Jun 08 01:18:54at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:683)
2023-06-08T01:18:54.5627369Z Jun 08 01:18:54at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.WatcherRemovalFacade.processBackgroundOperation(WatcherRemovalFacade.java:152)
2023-06-08T01:18:54.5628353Z Jun 08 01:18:54at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.GetChildrenBuilderImpl$2.processResult(GetChildrenBuilderImpl.java:187)
2023-06-08T01:18:54.5629281Z Jun 08 01:18:54at 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:666)
2023-06-08T01:18:54.5630124Z Jun 08 01:18:54at 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:553)
{noformat}
and the other has locked {{0xe3a89c18}} and is waiting for {{0xe3a8a1e8}}
{noformat}
2023-06-08T01:18:54.5738286Z Jun 08 01:18:54 "ForkJoinPool-50-worker-25" #620 
daemon prio=5 os_prio=0 

[jira] [Created] (FLINK-32310) Support enhanced show functions syntax

2023-06-12 Thread Ran Tao (Jira)
Ran Tao created FLINK-32310:
---

 Summary: Support enhanced show functions syntax
 Key: FLINK-32310
 URL: https://issues.apache.org/jira/browse/FLINK-32310
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API, Table SQL / Planner
Reporter: Ran Tao


As discussed in the FLIP, we will support new syntax for showing functions.

The syntax:
| |SHOW [USER] FUNCTIONS [ ( FROM \| IN ) [catalog_name.]database_name ] [ 
[NOT] (LIKE \| ILIKE)  ]| |



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-12 Thread yuxia
+1 (binding)
Thanks Mang driving it.

Best regards,
Yuxia

- 原始邮件 -
发件人: "zhangmang1" 
收件人: "dev" 
发送时间: 星期一, 2023年 6 月 12日 下午 5:31:10
主题: [VOTE] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

Hi everyone,

Thanks for all the feedback about FLIP-305: Support atomic for CREATE TABLE AS 
SELECT(CTAS) statement[1].
[2] is the discussion thread.

I'd like to start a vote for it. The vote will be open for at least 72 hours 
(until June 15th, 10:00AM GMT) unless there is an objection or an insufficient 
number of 
votes.[1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
[2]https://lists.apache.org/thread/n6nsvbwhs5kwlj5kjgv24by2tk5mh9xd







--

Best regards,
Mang Zhang


[VOTE] FLIP-311: Support Call Stored Procedure

2023-06-12 Thread yuxia
Hi everyone, 
Thanks for all the feedback about FLIP-311: Support Call Stored Procedure[1]. 
Based on the discussion [2], we have come to a consensus, so I would like to 
start a vote. 
The vote will be open for at least 72 hours (until June 15th, 10:00AM GMT) 
unless there is an objection or an insufficient number of votes. 


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-311%3A+Support+Call+Stored+Procedure
 
[2] https://lists.apache.org/thread/k6s50gcgznon9v1oylyh396gb5kgrwmd 

Best regards, 
Yuxia 


[VOTE] FLIP-305: Support atomic for CREATE TABLE AS SELECT(CTAS) statement

2023-06-12 Thread Mang Zhang
Hi everyone,

Thanks for all the feedback about FLIP-305: Support atomic for CREATE TABLE AS 
SELECT(CTAS) statement[1].
[2] is the discussion thread.

I'd like to start a vote for it. The vote will be open for at least 72 hours 
(until June 15th, 10:00AM GMT) unless there is an objection or an insufficient 
number of 
votes.[1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-305%3A+Support+atomic+for+CREATE+TABLE+AS+SELECT%28CTAS%29+statement
[2]https://lists.apache.org/thread/n6nsvbwhs5kwlj5kjgv24by2tk5mh9xd







--

Best regards,
Mang Zhang

[jira] [Created] (FLINK-32309) Shared classpaths and jars manager for jobs in sql gateway cause confliction

2023-06-12 Thread Fang Yong (Jira)
Fang Yong created FLINK-32309:
-

 Summary: Shared classpaths and jars manager for jobs in sql 
gateway cause confliction
 Key: FLINK-32309
 URL: https://issues.apache.org/jira/browse/FLINK-32309
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Gateway
Affects Versions: 1.18.0
Reporter: Fang Yong


Currently, all jobs in the same SQL Gateway session share the resource manager that 
provides the classpath for the jobs. After a job is executed, its classpath and jars 
remain in the shared resource manager and are used by the next jobs. This may put 
many unnecessary jars on a job's classpath or even cause conflicts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32308) RestClusterClient submit job to remote cluster

2023-06-12 Thread Spongebob (Jira)
Spongebob created FLINK-32308:
-

 Summary: RestClusterClient submit job to remote cluster
 Key: FLINK-32308
 URL: https://issues.apache.org/jira/browse/FLINK-32308
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST
Reporter: Spongebob


I used `RestClusterClient` to submit a job to a remote cluster, but unexpectedly it 
was submitted to the local cluster instead. Could you help me with this?
{code:java}
String host = "x.x.x.x";
int port = 8081;
Configuration flinkConfiguration = new Configuration();
flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(JobManagerOptions.PORT, 6123);
flinkConfiguration.setInteger(RestOptions.PORT, port);

RestClusterClient<StandaloneClusterId> clusterClient = new 
RestClusterClient<>(flinkConfiguration, StandaloneClusterId.getInstance());
String s = clusterClient.getWebInterfaceURL();
List<URL> extraJars = new ArrayList<>();
extraJars.add(new File("C:\\Users\\extend.jar").toURI().toURL());
PackagedProgram packagedProgram = PackagedProgram.newBuilder()
.setConfiguration(flinkConfiguration)
.setJarFile(new File("F:\\data.jar"))
.setEntryPointClassName("MyApplication")
.setUserClassPaths(extraJars)
.build();
JobID jobID = JobID.generate();
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, 
flinkConfiguration, 2, jobID, false);
clusterClient.submitJob(jobGraph);
System.out.println(jobID); {code}
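
One thing that might be worth checking (an assumption, not a confirmed diagnosis): the REST 
client resolves its endpoint from the rest.address / rest.port options, so, continuing the 
snippet above, setting the REST address explicitly should rule out a localhost fallback:
{code:java}
// import org.apache.flink.configuration.RestOptions;
flinkConfiguration.setString(RestOptions.ADDRESS, host); // rest.address
flinkConfiguration.setInteger(RestOptions.PORT, port);   // rest.port (already set above)
{code}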



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32307) Incorrect docs for default behavior of "scan.startup.mode" option

2023-06-12 Thread Gunnar Morling (Jira)
Gunnar Morling created FLINK-32307:
--

 Summary: Incorrect docs for default behavior of 
"scan.startup.mode" option
 Key: FLINK-32307
 URL: https://issues.apache.org/jira/browse/FLINK-32307
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Kafka
Reporter: Gunnar Morling


The [Kafka connector 
docs|https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#start-reading-position]
 say this in regards to the {{scan.startup.mode}} option:

{quote}
The default option value is group-offsets which indicates to consume from last 
committed offsets in ZK / Kafka brokers.
{quote}

What I actually observe, however, is that the "earliest-offset" mode is used when 
no value is specified for this option. From a quick glance, this matches the 
implementation in 
[{{KafkaSourceBuilder}}|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L106].
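
Until the documentation and the default behavior are reconciled, one way to avoid relying on 
the default is to set the option explicitly in the DDL; a minimal sketch (topic, servers, and 
format are placeholders):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExplicitStartupModeExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE orders (order_id STRING, amount DOUBLE) WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'orders',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'properties.group.id' = 'orders-consumer',\n"
                        + "  'scan.startup.mode' = 'group-offsets',\n"
                        + "  'format' = 'json'\n"
                        + ")");
    }
}
{code}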



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-06-12 Thread Shengkai Fang
> If it’s the case, I’m good with introducing a new module and making SQL
Driver
> an internal class and accepts JSON plans only.

I have thought about this again and again. I think it's better to move the SqlDriver
into the sql-gateway module, because the SQL client relies on the sql-gateway to
submit the SQL and the sql-gateway now has the ability to generate the
ExecNodeGraph. +1 to support accepting JSON plans only.

* Upload configuration through command line parameter

The ExecNodeGraph only contains the job's information; it doesn't contain the
checkpoint directory, checkpoint interval, execution mode, and so on. So I
think we should also upload the configuration.
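
To make this concrete, a hypothetical sketch of such a driver entrypoint based only on the
existing compiled-plan API (the class name and argument handling are my assumptions, not the
FLIP's final design):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;

public class SqlDriverSketch {
    // args[0]: path of the compiled ExecNodeGraph JSON plan
    // args[1]: directory containing flink-conf.yaml with checkpointing/execution options
    public static void main(String[] args) throws Exception {
        Configuration config = GlobalConfiguration.loadConfiguration(args[1]);
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance()
                                .inStreamingMode()
                                .withConfiguration(config)
                                .build());
        tEnv.loadPlan(PlanReference.fromFile(args[0])).execute().await();
    }
}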

* KubernetesClusterDescriptor and
KubernetesApplicationClusterEntrypoint are responsible for the jar
upload/download

+1 for the change.

Could you update the FLIP about the current discussion?

Best,
Shengkai






Yang Wang  于2023年6月12日周一 11:41写道:

> Sorry for the late reply. I am in favor of introducing such a built-in
> resource localization mechanism
> based on Flink FileSystem. Then FLINK-28915[1] could be the second step
> which will download
> the jars and dependencies to the JobManager/TaskManager local directory
> before working.
>
> The first step could be done in another ticket in Flink. Or some external
> Flink jobs management system
> could also take care of this.
>
> [1]. https://issues.apache.org/jira/browse/FLINK-28915
>
> Best,
> Yang
>
> Paul Lam  于2023年6月9日周五 17:39写道:
>
> > Hi Mason,
> >
> > I get your point. I'm increasingly feeling the need to introduce a
> > built-in
> > file distribution mechanism for flink-kubernetes module, just like Spark
> > does with `spark.kubernetes.file.upload.path` [1].
> >
> > I’m assuming the workflow is as follows:
> >
> > - KubernetesClusterDescripter uploads all local resources to a remote
> >   storage via Flink filesystem (skips if the resources are already
> remote).
> > - KubernetesApplicationClusterEntrypoint downloads the resources
> >   and put them in the classpath during startup.
> >
> > I wouldn't mind splitting it into another FLIP to ensure that everything
> is
> > done correctly.
> >
> > cc'ed @Yang to gather more opinions.
> >
> > [1]
> >
> https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management
> >
> > Best,
> > Paul Lam
> >
> > 2023年6月8日 12:15,Mason Chen  写道:
> >
> > Hi Paul,
> >
> > Thanks for your response!
> >
> > I agree that utilizing SQL Drivers in Java applications is equally
> > important
> >
> > as employing them in SQL Gateway. WRT init containers, I think most
> > users use them just as a workaround. For example, wget a jar from the
> > maven repo.
> >
> > We could implement the functionality in SQL Driver in a more graceful
> > way and the flink-supported filesystem approach seems to be a
> > good choice.
> >
> >
> > My main point is: can we solve the problem with a design agnostic of SQL
> > and Stream API? I mentioned a use case where this ability is useful for
> > Java or Stream API applications. Maybe this is even a non-goal to your
> FLIP
> > since you are focusing on the driver entrypoint.
> >
> > Jark mentioned some optimizations:
> >
> > This allows SQLGateway to leverage some metadata caching and UDF JAR
> > caching for better compiling performance.
> >
> > It would be great to see this even outside the SQLGateway (i.e. UDF JAR
> > caching).
> >
> > Best,
> > Mason
> >
> > On Wed, Jun 7, 2023 at 2:26 AM Shengkai Fang  wrote:
> >
> > Hi. Paul.  Thanks for your update and the update makes me understand the
> > design much better.
> >
> > But I still have some questions about the FLIP.
> >
> > For SQL Gateway, only DMLs need to be delegated to the SQL server
> > Driver. I would think about the details and update the FLIP. Do you have
> >
> > some
> >
> > ideas already?
> >
> >
> > If the applicaiton mode can not support library mode, I think we should
> > only execute INSERT INTO and UPDATE/ DELETE statement in the application
> > mode. AFAIK, we can not support ANALYZE TABLE and CALL PROCEDURE
> > statements. The ANALYZE TABLE syntax need to register the statistic to
> the
> > catalog after job finishes and the CALL PROCEDURE statement doesn't
> > generate the ExecNodeGraph.
> >
> > * Introduce storage via option `sql-gateway.application.storage-dir`
> >
> > If we can not support to submit the jars through web submission, +1 to
> > introduce the options to upload the files. While I think the uploader
> > should be responsible to remove the uploaded jars. Can we remove the jars
> > if the job is running or gateway exits?
> >
> > * JobID is not avaliable
> >
> > Can we use the returned rest client by ApplicationDeployer to query the
> job
> > id? I am concerned that users don't know which job is related to the
> > submitted SQL.
> >
> > * Do we need to introduce a new module named flink-table-sql-runner?
> >
> > It seems we need to introduce a new module. Will the new module is
> > available in the distribution package? I agree with Jark 

Re: [VOTE] FLIP-315: Support Operator Fusion Codegen for Flink SQL

2023-06-12 Thread liu ron
Hi, all.
FLIP-315 [1] has been accepted.
There are 5 binding votes and 1 non-binding vote:
- Jark Wu(binding)
- Jingsong Li (binding)
- Benchao Li (binding)
- Weijie Guo(binding)
- Jing Ge(binding)

- Aitozi (non-binding)
Thanks everyone.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-315+Support+Operator+Fusion+Codegen+for+Flink+SQL

Best,
Ron

Aitozi  于2023年6月8日周四 13:30写道:

> +1
>
> Looking forward to this feature.
>
> Best,
> Aitozi.
>
> Jing Ge  于2023年6月8日周四 04:44写道:
>
> > +1
> >
> > Best Regards,
> > Jing
> >
> > On Wed, Jun 7, 2023 at 10:52 AM weijie guo 
> > wrote:
> >
> > > +1 (binding)
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > Jingsong Li  于2023年6月7日周三 15:59写道:
> > >
> > > > +1
> > > >
> > > > On Wed, Jun 7, 2023 at 3:03 PM Benchao Li 
> > wrote:
> > > > >
> > > > > +1, binding
> > > > >
> > > > > Jark Wu  于2023年6月7日周三 14:44写道:
> > > > >
> > > > > > +1 (binding)
> > > > > >
> > > > > > Best,
> > > > > > Jark
> > > > > >
> > > > > > > 2023年6月7日 14:20,liu ron  写道:
> > > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > Thanks for all the feedback about FLIP-315: Support Operator
> > Fusion
> > > > > > Codegen
> > > > > > > for Flink SQL[1].
> > > > > > > [2] is the discussion thread.
> > > > > > >
> > > > > > > I'd like to start a vote for it. The vote will be open for at
> > least
> > > > 72
> > > > > > > hours (until June 12th, 12:00AM GMT) unless there is an
> objection
> > > or
> > > > an
> > > > > > > insufficient number of votes.
> > > > > > >
> > > > > > > [1]:
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-315+Support+Operator+Fusion+Codegen+for+Flink+SQL
> > > > > > > [2]:
> > > > https://lists.apache.org/thread/9cnqhsld4nzdr77s2fwf00o9cb2g9fmw
> > > > > > >
> > > > > > > Best,
> > > > > > > Ron
> > > > > >
> > > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Best,
> > > > > Benchao Li
> > > >
> > >
> >
>