[jira] [Created] (FLINK-27712) Job failed to start due to "Time should be non negative"

2022-05-19 Thread Sharon Xie (Jira)
Sharon Xie created FLINK-27712:
--

 Summary: Job failed to start due to "Time should be non negative"
 Key: FLINK-27712
 URL: https://issues.apache.org/jira/browse/FLINK-27712
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.14.4
Reporter: Sharon Xie
 Attachments: flink_error.log.txt

Happened intermittently. A restart made the issue go away.

Stack trace:
Time should be non negative
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
at org.apache.flink.runtime.throughput.ThroughputEMA.calculateThroughput(ThroughputEMA.java:44)
at org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:80)
at org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:792)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$scheduleBufferDebloater$4(StreamTask.java:784)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)

JobManager error log is attached.

Maybe related to FLINK-25465
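
The failing check in the trace above is a precondition on the elapsed time handed to the throughput calculation. The following is a minimal sketch, not Flink's actual code, of how such a guard can trip: it assumes the elapsed time is the difference of two clock readings, so a clock that steps backwards between readings yields a negative value. The class name, the method names, and the simplified bytes-per-second formula are all hypothetical.

```java
final class ThroughputSketch {
    private long lastThroughput;

    /**
     * Returns bytes per second over the measured interval.
     * Rejects a negative interval with the same message seen in the trace.
     */
    long calculateThroughput(long dataSizeBytes, long elapsedMillis) {
        if (elapsedMillis < 0) {
            throw new IllegalArgumentException("Time should be non negative");
        }
        if (elapsedMillis == 0) {
            // Nothing measurable happened; keep the previous estimate.
            return lastThroughput;
        }
        lastThroughput = dataSizeBytes * 1000 / elapsedMillis;
        return lastThroughput;
    }

    /** Elapsed time from two clock readings; negative if the clock went backwards. */
    static long elapsed(long startMillis, long endMillis) {
        return endMillis - startMillis;
    }

    public static void main(String[] args) {
        ThroughputSketch sketch = new ThroughputSketch();
        System.out.println(sketch.calculateThroughput(2000, elapsed(1000, 2000)));
        try {
            // Simulated backwards clock step: the end reading is before the start reading.
            sketch.calculateThroughput(100, elapsed(5000, 4000));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this assumption holds, the intermittent nature of the failure would be consistent with an occasional clock adjustment, but the attached log would be needed to confirm it.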



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27711) Align setTopicPattern for Pulsar Connector

2022-05-19 Thread LuNng Wang (Jira)
LuNng Wang created FLINK-27711:
--

 Summary: Align setTopicPattern for Pulsar Connector
 Key: FLINK-27711
 URL: https://issues.apache.org/jira/browse/FLINK-27711
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream, API / Python
Affects Versions: 1.15.0
Reporter: LuNng Wang


Update set_topics_pattern to set_topic_pattern





Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-19 Thread Aitozi
Hi Xintong
Thanks for your detailed explanation. I misunderstood the spill
behavior at first glance, but
I get your point now. I think it will be a good addition to the current
execution modes.
Looking forward to it :)

Best,
Aitozi

On Fri, May 20, 2022 at 12:26, Xintong Song wrote:

> Hi Aitozi,
>
> > In which case we can use the hybrid shuffle mode
>
> Just to directly answer this question, in addition to
> Weijie's explanations: for a batch workload, if you want the workload to take
> advantage of as many resources as are available, anywhere from a single
> slot to as many slots as there are tasks, you may consider hybrid shuffle
> mode. Admittedly, this may not always be wanted, e.g., users may not want
> to execute a job if there are too few resources available, or may not want a
> job to take too many of the cluster resources. That's why we propose hybrid
> shuffle as an additional option for batch users, rather than a replacement
> for Pipelined or Blocking mode.
>
> > So you mean the hybrid shuffle mode will limit its usage to the bounded
> > source, Right ?
>
> Yes.
>
> > One more question, with the bounded data and partly of the stage is running
> > in the Pipelined shuffle mode, what will be the behavior of the task
> > failure, Is the checkpoint enabled for these running stages or will it
> > re-run after the failure?
>
> There are no checkpoints. The failover behavior depends on the spilling
> strategy.
> - In the first version, we only consider a selective spilling strategy,
> which spills as little data as possible to disk. In case of failover,
> upstream tasks therefore need to be restarted to reproduce the
> complete intermediate results.
> - An alternative strategy we may introduce in the future, if needed, is to spill
> the complete intermediate results. That avoids restarting upstream tasks in
> case of failover, because the produced intermediate results can be
> re-consumed, at the cost of more disk IO load.
> With both strategies available, the trade-off between failover cost and IO load is
> for the user to decide. This is also discussed in the MemoryDataManager
> section of the FLIP.
>
> Best,
>
> Xintong
>
>
>
> On Fri, May 20, 2022 at 12:10 PM Aitozi  wrote:
>
> > Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
> > limit
> > its usage to the bounded source, Right ?
> > One more question, with the bounded data and partly of the stage is
> running
> > in the Pipelined shuffle mode, what will be the behavior of the task
> > failure, Is the
> > checkpoint enabled for these running stages or will it re-run after the
> > failure?
> >
> > Best,
> > Aitozi
> >
> > On Fri, May 20, 2022 at 10:45, weijie guo wrote:
> >
> > > Hi, Aitozi:
> > >
> > > Thank you for the feedback!
> > > Here are some of my thoughts on your question
> > >
> > > >>> 1.If there is an unbounded data source, but only have resource to
> > > schedule the first stage, will it bring the big burden to the
> > disk/shuffle
> > > service which will occupy all the resource I think.
> > > First of all, Hybrid Shuffle Mode is oriented to the batch job
> scenario,
> > so
> > > there is no problem of unbounded data sources. Secondly, if you
> consider
> > > the stream scenario, I think Pipelined Shuffle should still be the best
> > > choice at present. For an unbounded data stream, it is not meaningful
> to
> > > only run some stages.
> > >
> > > >>> 2. Which kind of job will benefit from the hybrid shuffle mode. In
> > > other words, In which case we can use the hybrid shuffle mode:
> > > Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> > > shuffle mode can effectively utilize cluster resources and avoid some
> > > unnecessary disk IO overhead. For OLAP scenarios, which are
> characterized
> > > by a large number of concurrently submitted short batch jobs, hybrid
> > > shuffle can solve the scheduling deadlock problem of pipelined shuffle
> > and
> > > achieve similar performance.
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > On Fri, May 20, 2022 at 08:05, Aitozi wrote:
> > >
> > > > Hi Weijie:
> > > >
> > > >  Thanks for the nice FLIP, I have couple questions about this:
> > > >
> > > > 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> > > resource.
> > > > If there
> > > > is an unbounded data source, but only have resource to schedule the
> > first
> > > > stage, will it
> > > > bring the big burden to the disk/shuffle service which will occupy
> all
> > > the
> > > > resource I think.
> > > >
> > > > 2) Which kind of job will benefit from the hybrid shuffle mode. In
> > other
> > > > words, In which
> > > > case we can use the hybrid shuffle mode:
> > > > - For batch job want to use more resource to reduce the e2e time ?
> > > > - Or for streaming job which may lack of resource temporarily ?
> > > > - Or for OLAP job which will try to make best use of available
> > resources
> > > as
> > > > you mentioned to finish the query?
> > > > Just want to know the typical use case for the Hybrid shuffle 

Re: [DISCUSS] FLIP-224: Blacklist Mechanism

2022-05-19 Thread Lijie Wang
Hi everyone,

I have started a vote for this FLIP [1]. Please cast your vote there or ask
additional questions here.

[1] https://lists.apache.org/thread/3416vks1j35co9608gkmsoplvcjjz7bg

Best, Lijie
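
For readers skimming the thread: the quoted message below settles on a single POST endpoint, with the "id" and the "merge" flag carried in the request body. A minimal sketch of building such a request with the JDK HTTP client types follows. The host and port, the class name, and the exact JSON layout are assumptions for illustration; the authoritative schema is the one in the FLIP's "Public Interface" section.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class BlocklistRequest {
    // Hypothetical JobManager REST address; replace with the real host and port.
    static final String BASE = "http://localhost:8081";

    /**
     * Builds a JSON body with the "id" and "merge" fields named in the thread.
     * No JSON escaping is done, so the id must not contain quotes or backslashes.
     */
    static String body(String taskManagerId, boolean merge) {
        return String.format("{\"id\":\"%s\",\"merge\":%b}", taskManagerId, merge);
    }

    /** Builds (but does not send) the POST request to the blocklist endpoint. */
    static HttpRequest build(String taskManagerId, boolean merge) {
        return HttpRequest.newBuilder(URI.create(BASE + "/blocklist/taskmanagers"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(taskManagerId, merge)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("tm-1", true);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body("tm-1", true));
    }
}
```

Keeping the flag in the body avoids the ":merge" path suffix that the REST layer would misread.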

On Thu, May 19, 2022 at 17:34, Lijie Wang wrote:

> Hi Konstantin,
>
> We found that Flink REST URLs do not support the ":merge" suffix: it
> would be recognized as a URL parameter (because it starts with a colon).
>
> We will keep the previous way, i.e.
>
> POST: http://{jm_rest_address:port}/blocklist/taskmanagers
> and the "id" and "merge" flag are put into the request body.
>
> Best,
> Lijie
>
> On Wed, May 18, 2022 at 09:35, Lijie Wang wrote:
>
>> Hi Weihua,
>> thanks for feedback.
>>
>> 1. Yes, only *Manually* is supported in this FLIP, but it's the first
>> step towards auto-detection.
>> 2. We will print the blocked nodes in the logs, and may also include them
>> in the insufficient-resources exception.
>> 3. No. This FLIP won't change the WebUI. The blocklist information can be
>> obtained through REST API and metrics.
>>
>> Best,
>> Lijie
>>
>> On Tue, May 17, 2022 at 21:41, Weihua Hu wrote:
>>
>>> Hi,
>>> Thanks for creating this FLIP.
>>> We have implemented an automatic blocklist detection mechanism
>>> internally, which is indeed very effective for handling node failures.
>>> Due to the large number of nodes, although SREs already support
>>> automatic offline failure nodes, the detection is not 100% accurate and
>>> there is some delay.
>>> So the blocklist mechanism can make flink job recover from failure much
>>> faster.
>>>
>>> Here are some of my thoughts:
>>> 1. In this FLIP, it needs users to locate machine failure manually,
>>> there is a certain cost of use
>>> 2. What happens if too many nodes are blocked, resulting in insufficient
>>> resources? Will there be a special Exception for the user?
>>> 3. Will we display the blocklist information in the WebUI? The blocklist
>>> is for cluster level, and if multiple users share a cluster, some users may
>>> be a little confused when resources are not enough, or when resources are
>>> applied for more.
>>>
>>> Also, Looking forward to the next FLIP on auto-detection.
>>>
>>> Best,
>>> Weihua
>>>
>>> > On May 16, 2022, at 11:22 PM, Lijie Wang wrote:
>>> >
>>> > Hi Konstantin,
>>> >
>>> > Maybe change it to the following:
>>> >
>>> > 1. POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}
>>> > Merge is not allowed. If the {id} already exists, return error.
>>> Otherwise,
>>> > create a new item.
>>> >
>>> > 2. POST: http://
>>> {jm_rest_address:port}/blocklist/taskmanagers/{id}:merge
>>> > Merge is allowed. If the {id} already exists, merge. Otherwise, create
>>> a
>>> > new item.
>>> >
>>> > WDYT?
>>> >
>>> > Best,
>>> > Lijie
>>> >
>>> > On Mon, May 16, 2022 at 20:07, Konstantin Knauf wrote:
>>> >
>>> >> Hi Lijie,
>>> >>
>>> >> hm, maybe the following is more appropriate in that case
>>> >>
>>> >> POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}:merge
>>> >>
>>> >> Best,
>>> >>
>>> >> Konstantin
>>> >>
>>> >> On Mon, May 16, 2022 at 07:05, Lijie Wang <wangdachui9...@gmail.com> wrote:
>>> >>
>>> >>> Hi Konstantin,
>>> >>> thanks for your feedback.
>>> >>>
>>> >>> From what I understand, PUT should be idempotent. However, we have a
>>> >>> *timeout* field in the request. This means that initiating the same
>>> >> request
>>> >>> at two different times will lead to different resource status
>>> (timestamps
>>> >>> of the items to be removed will be different).
>>> >>>
>>> >>> Should we use PUT in this case? WDYT?
>>> >>>
>>> >>> Best,
>>> >>> Lijie
>>> >>>
>>> >>> On Fri, May 13, 2022 at 17:20, Konstantin Knauf wrote:
>>> >>>
>>>  Hi Lijie,
>>> 
>>>  wouldn't the REST API-idiomatic way for an update/replace be a PUT on
>>>  the resource?
>>> 
>>>  PUT: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}
>>> 
>>>  Best,
>>> 
>>>  Konstantin
>>> 
>>> 
>>> 
>>>  On Fri, May 13, 2022 at 11:01, Lijie Wang <wangdachui9...@gmail.com> wrote:
>>> 
>>> > Hi everyone,
>>> >
>>> > I've had an offline discussion with Becket Qin and Zhu Zhu, and
>>> made
>>> >>> the
>>> > following changes on REST API:
>>> > 1. To avoid ambiguity, *timeout* and *endTimestamp* can only choose
>>> >>> one.
>>>  If
>>> > both are specified, will return error.
>>> > 2.  If the specified item is already there, the *ADD* operation has
>>> >> two
>>> > behaviors:  *return error*(default value) or *merge/update*, and we
>>> >>> add a
>>> > flag to the request body to control it. You can find more details
>>> >>> "Public
>>> > Interface" section.
>>> >
>>> > If there is no more feedback, we will start the vote thread next
>>> >> week.
>>> >
>>> > Best,
>>> > Lijie
>>> >
>>> > On Tue, May 10, 2022 at 17:14, Lijie Wang wrote:
>>> >
>>> >> Hi Becket Qin,
>>> >>
>>> >> Thanks for your suggestions.  I have moved the description of
>>> >> configurations, metrics and 

[VOTE] FLIP-224: Blocklist Mechanism

2022-05-19 Thread Lijie Wang
Hi everyone,

Thanks for the feedback for FLIP-224: Blocklist Mechanism [1] on the
discussion thread [2]

I'd like to start a vote for it. The vote will last for at least 72 hours
unless there is an objection or insufficient votes.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blocklist+Mechanism
[2] https://lists.apache.org/thread/fngkk52kjbc6b6v9nn0lkfq6hhsbgb1h

Best,
Lijie


Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-19 Thread Xintong Song
Hi Aitozi,

> In which case we can use the hybrid shuffle mode

Just to directly answer this question, in addition to
Weijie's explanations: for a batch workload, if you want the workload to take
advantage of as many resources as are available, anywhere from a single
slot to as many slots as there are tasks, you may consider hybrid shuffle
mode. Admittedly, this may not always be wanted, e.g., users may not want
to execute a job if there are too few resources available, or may not want a
job to take too many of the cluster resources. That's why we propose hybrid
shuffle as an additional option for batch users, rather than a replacement
for Pipelined or Blocking mode.

> So you mean the hybrid shuffle mode will limit its usage to the bounded
> source, Right ?

Yes.

> One more question, with the bounded data and partly of the stage is running
> in the Pipelined shuffle mode, what will be the behavior of the task
> failure, Is the checkpoint enabled for these running stages or will it
> re-run after the failure?

There are no checkpoints. The failover behavior depends on the spilling
strategy.
- In the first version, we only consider a selective spilling strategy,
which spills as little data as possible to disk. In case of failover,
upstream tasks therefore need to be restarted to reproduce the
complete intermediate results.
- An alternative strategy we may introduce in the future, if needed, is to spill
the complete intermediate results. That avoids restarting upstream tasks in
case of failover, because the produced intermediate results can be
re-consumed, at the cost of more disk IO load.
With both strategies available, the trade-off between failover cost and IO load is
for the user to decide. This is also discussed in the MemoryDataManager
section of the FLIP.

Best,

Xintong



On Fri, May 20, 2022 at 12:10 PM Aitozi  wrote:

> Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
> limit
> its usage to the bounded source, Right ?
> One more question, with the bounded data and partly of the stage is running
> in the Pipelined shuffle mode, what will be the behavior of the task
> failure, Is the
> checkpoint enabled for these running stages or will it re-run after the
> failure?
>
> Best,
> Aitozi
>
> On Fri, May 20, 2022 at 10:45, weijie guo wrote:
>
> > Hi, Aitozi:
> >
> > Thank you for the feedback!
> > Here are some of my thoughts on your question
> >
> > >>> 1.If there is an unbounded data source, but only have resource to
> > schedule the first stage, will it bring the big burden to the
> disk/shuffle
> > service which will occupy all the resource I think.
> > First of all, Hybrid Shuffle Mode is oriented to the batch job scenario,
> so
> > there is no problem of unbounded data sources. Secondly, if you consider
> > the stream scenario, I think Pipelined Shuffle should still be the best
> > choice at present. For an unbounded data stream, it is not meaningful to
> > only run some stages.
> >
> > >>> 2. Which kind of job will benefit from the hybrid shuffle mode. In
> > other words, In which case we can use the hybrid shuffle mode:
> > Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> > shuffle mode can effectively utilize cluster resources and avoid some
> > unnecessary disk IO overhead. For OLAP scenarios, which are characterized
> > by a large number of concurrently submitted short batch jobs, hybrid
> > shuffle can solve the scheduling deadlock problem of pipelined shuffle
> and
> > achieve similar performance.
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > On Fri, May 20, 2022 at 08:05, Aitozi wrote:
> >
> > > Hi Weijie:
> > >
> > >  Thanks for the nice FLIP, I have couple questions about this:
> > >
> > > 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> > resource.
> > > If there
> > > is an unbounded data source, but only have resource to schedule the
> first
> > > stage, will it
> > > bring the big burden to the disk/shuffle service which will occupy all
> > the
> > > resource I think.
> > >
> > > 2) Which kind of job will benefit from the hybrid shuffle mode. In
> other
> > > words, In which
> > > case we can use the hybrid shuffle mode:
> > > - For batch job want to use more resource to reduce the e2e time ?
> > > - Or for streaming job which may lack of resource temporarily ?
> > > - Or for OLAP job which will try to make best use of available
> resources
> > as
> > > you mentioned to finish the query?
> > > Just want to know the typical use case for the Hybrid shuffle mode :)
> > >
> > >
> > > Best,
> > > Aitozi.
> > >
> > > On Thu, May 19, 2022 at 18:33, weijie guo wrote:
> > >
> > > > Yangze, Thank you for the feedback!
> > > > Here's my thoughts for your questions:
> > > >
> > > > >>> How do we decide the size of the buffer pool in MemoryDataManager
> > and
> > > > the read buffers in FileDataManager?
> > > > The BufferPool in MemoryDataManager is the LocalBufferPool used by
> > > > ResultPartition, and the size is the same as the current
> implementation
> > > of
> > > > 

[jira] [Created] (FLINK-27710) Improve logs to better display Execution

2022-05-19 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-27710:
---

 Summary: Improve logs to better display Execution
 Key: FLINK-27710
 URL: https://issues.apache.org/jira/browse/FLINK-27710
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Runtime / Task
Affects Versions: 1.16.0
Reporter: Zhu Zhu
 Fix For: 1.16.0


Currently, an execution is usually represented as "{{job vertex name}}
({{subtaskIndex+1}}/{{vertex parallelism}}) ({{attemptId}})" in
logs. With the change of FLINK-17295, this representation becomes
redundant, e.g. the subtask index is displayed twice.

Therefore, I'm proposing to change the format to "{{job vertex name}}
({{short ExecutionGraphID}}:{{JobVertexID}})
({{subtaskIndex+1}}/{{vertex parallelism}}) ({{#attemptNumber}})"
and to avoid displaying the {{ExecutionAttemptID}} directly. This can improve
log readability.

Besides that, the displayed {{JobVertexID}} can also help to distinguish job
vertices of the same name, which is common in DataStream jobs (e.g. multiple
{{Map}}).
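
The proposed format can be sketched as a small formatting helper. This is only an illustration of the string layout described above, not the code from the eventual change; the class name, parameter names, and the sample IDs in `main` are hypothetical.

```java
final class ExecutionLogName {
    /**
     * Renders "name (shortGraphId:jobVertexId) (subtaskIndex+1/parallelism) (#attemptNumber)".
     * The subtask index is zero-based on input and shown one-based, as in the proposal.
     */
    static String format(
            String vertexName,
            String shortExecutionGraphId,
            String jobVertexId,
            int subtaskIndex,
            int parallelism,
            int attemptNumber) {
        return String.format(
                "%s (%s:%s) (%d/%d) (#%d)",
                vertexName,
                shortExecutionGraphId,
                jobVertexId,
                subtaskIndex + 1,
                parallelism,
                attemptNumber);
    }

    public static void main(String[] args) {
        // Two vertices named "Map" stay distinguishable through their JobVertexID.
        System.out.println(format("Map", "a1b2", "v42", 0, 4, 0));
        System.out.println(format("Map", "a1b2", "v43", 0, 4, 0));
    }
}
```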





Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-19 Thread Aitozi
Thanks Weijie for your answer. So you mean hybrid shuffle mode is limited
to bounded sources, right?
One more question: with bounded data and part of the stages running
in Pipelined shuffle mode, what is the behavior on task failure? Is
checkpointing enabled for these running stages, or will they re-run after the
failure?

Best,
Aitozi

On Fri, May 20, 2022 at 10:45, weijie guo wrote:

> Hi, Aitozi:
>
> Thank you for the feedback!
> Here are some of my thoughts on your question
>
> >>> 1.If there is an unbounded data source, but only have resource to
> schedule the first stage, will it bring the big burden to the disk/shuffle
> service which will occupy all the resource I think.
> First of all, Hybrid Shuffle Mode is oriented to the batch job scenario, so
> there is no problem of unbounded data sources. Secondly, if you consider
> the stream scenario, I think Pipelined Shuffle should still be the best
> choice at present. For an unbounded data stream, it is not meaningful to
> only run some stages.
>
> >>> 2. Which kind of job will benefit from the hybrid shuffle mode. In
> other words, In which case we can use the hybrid shuffle mode:
> Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> shuffle mode can effectively utilize cluster resources and avoid some
> unnecessary disk IO overhead. For OLAP scenarios, which are characterized
> by a large number of concurrently submitted short batch jobs, hybrid
> shuffle can solve the scheduling deadlock problem of pipelined shuffle and
> achieve similar performance.
>
> Best regards,
>
> Weijie
>
>
> On Fri, May 20, 2022 at 08:05, Aitozi wrote:
>
> > Hi Weijie:
> >
> >  Thanks for the nice FLIP, I have couple questions about this:
> >
> > 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> resource.
> > If there
> > is an unbounded data source, but only have resource to schedule the first
> > stage, will it
> > bring the big burden to the disk/shuffle service which will occupy all
> the
> > resource I think.
> >
> > 2) Which kind of job will benefit from the hybrid shuffle mode. In other
> > words, In which
> > case we can use the hybrid shuffle mode:
> > - For batch job want to use more resource to reduce the e2e time ?
> > - Or for streaming job which may lack of resource temporarily ?
> > - Or for OLAP job which will try to make best use of available resources
> as
> > you mentioned to finish the query?
> > Just want to know the typical use case for the Hybrid shuffle mode :)
> >
> >
> > Best,
> > Aitozi.
> >
> > On Thu, May 19, 2022 at 18:33, weijie guo wrote:
> >
> > > Yangze, Thank you for the feedback!
> > > Here's my thoughts for your questions:
> > >
> > > >>> How do we decide the size of the buffer pool in MemoryDataManager
> and
> > > the read buffers in FileDataManager?
> > > The BufferPool in MemoryDataManager is the LocalBufferPool used by
> > > ResultPartition, and the size is the same as the current implementation
> > of
> > > sort-merge shuffle. In other words, the minimum value of BufferPool is
> a
> > > configurable fixed value, and the maximum value is Math.max(min, 4 *
> > > numSubpartitions). The default value can be determined by running the
> > > TPC-DS tests.
> > > Read buffers in FileDataManager are requested from the
> > > BatchShuffleReadBufferPool shared by TaskManager, it's size controlled
> by
> > > *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the default
> > > value is 32M, which is consistent with the current sort-merge shuffle
> > > logic.
> > >
> > > >>> Is there an upper limit for the sum of them? If there is, how does
> > > MemoryDataManager and FileDataManager sync the memory usage?
> > > The buffers of the MemoryDataManager are limited by the size of the
> > > LocalBufferPool, and the upper limit is the size of the Network Memory.
> > The
> > > buffers of the FileDataManager are directly requested from
> > > UnpooledOffHeapMemory, and are also limited by the size of the
> framework
> > > off-heap memory. I think there should be no need for additional
> > > synchronization mechanisms.
> > >
> > > >>> How do you disable the slot sharing? If user configures both the
> slot
> > > sharing group and hybrid shuffle, what will happen to that job?
> > > I think we can print a warning log when Hybrid Shuffle is enabled and
> SSG
> > > is configured during the JobGraph compilation stage, and fallback to
> the
> > > region slot sharing group by default. Of course, it will be emphasized
> in
> > > the document that we do not currently support SSG, If configured, it
> will
> > > fall back to the default.
> > >
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > On Thu, May 19, 2022 at 16:25, Yangze Guo wrote:
> > >
> > > > Thanks for driving this. Xintong and Weijie.
> > > >
> > > > I believe this feature will make Flink a better batch/OLAP engine. +1
> > > > for the overall design.
> > > >
> > > > Some questions:
> > > > 1. How do we decide the size of the buffer pool in MemoryDataManager
> > 

Re: [DISCUSS] FLIP-231: Introduce SupportStatisticReport to support reporting statistics from source connectors

2022-05-19 Thread Jark Wu
Hi Godfrey,

I just left some comments here:

1) SupportStatisticReport => SupportsStatisticReport
All the ability interfaces begin with "Supports" instead of "Support".

2) table.optimizer.source.connect-statistics-enabled
The "connect" word should be "collect"?

3) CatalogStatistics
I was a little confused when I first saw the name. I thought it reports
stats for a catalog...
Why not use "TableStats" which already wraps "ColumnStats" in it and is a
public API as well?

4) source.statistics-type
vs table.optimizer.source.collect-statistics-enabled
What's the difference between them? It seems that they are both used to
enable or disable reporting stats.

5) "Which connectors and formats will be supported by default?"
IMO, we should also support the Hive source in this FLIP.
The Hive source is more widely used than the Filesystem connector.

Best,
Jark
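
Comments 1) to 3) above can be sketched together as follows. This is a hedged illustration only: `TableStats` here is a simplified stand-in for Flink's class of the same name, the single-method shape of the interface is an assumption about the proposal, and `FixedStatsSource` and `collect` are hypothetical names.

```java
final class StatisticReportSketch {
    /** Simplified stand-in for Flink's TableStats; a row count of -1 means "unknown". */
    static final class TableStats {
        static final TableStats UNKNOWN = new TableStats(-1);
        final long rowCount;

        TableStats(long rowCount) {
            this.rowCount = rowCount;
        }
    }

    /** Ability interface using the "Supports" prefix suggested in comment 1. */
    interface SupportsStatisticReport {
        TableStats reportStatistics();
    }

    /** Hypothetical source that reports fixed statistics. */
    static final class FixedStatsSource implements SupportsStatisticReport {
        @Override
        public TableStats reportStatistics() {
            return new TableStats(1000);
        }
    }

    /**
     * Planner-side gate. The flag mirrors an option spelled
     * table.optimizer.source.collect-statistics-enabled, per comment 2.
     */
    static TableStats collect(Object source, boolean collectStatisticsEnabled) {
        if (collectStatisticsEnabled && source instanceof SupportsStatisticReport) {
            return ((SupportsStatisticReport) source).reportStatistics();
        }
        return TableStats.UNKNOWN;
    }

    public static void main(String[] args) {
        System.out.println(collect(new FixedStatsSource(), true).rowCount);
        System.out.println(collect(new FixedStatsSource(), false).rowCount);
    }
}
```

Returning the existing stats type directly, rather than a new wrapper, is the design point behind comment 3.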




On Tue, 17 May 2022 at 10:52, Jingsong Li  wrote:

> Hi Godfrey,
>
> Thanks for your reply.
>
> Sounds good to me.
>
> > I think we should also introduce a config option
>
> We can add this option to the FLIP. I prefer an option for
> FileSystemConnector, maybe an enum.
>
> Best,
> Jingsong
>
> On Tue, May 17, 2022 at 10:31 AM godfrey he  wrote:
>
> > Hi Jingsong,
> >
> > Thanks for the feedback.
> >
> >
> > >One concern I have is that we read the footer for each file, and this
> may
> > >be a bit costly in some cases. Is it possible for us to have some
> > > hierarchical way
> > Yes, if there are thousands of orc/parquet files, it may take a long
> > time.
> > So we can introduce a config option to let the user choose the
> > granularity of the statistics.
> > But SIZE will not be introduced, because the planner does not use
> > file size statistics now.
> > We can introduce it once file size statistics are supported in the future.
> > I think we should also introduce a config option to enable/disable
> > SupportStatisticReport,
> > because it's a heavy operation for some connectors in some cases.
> >
> > > is the filter pushdown already happening at
> > > this time?
> > That's a good point. Currently, the filter push down is after partition
> > pruning
> > to prevent the filter push down rule from consuming the partition
> > predicates.
> > The statistics will be set to unknown if filter is pushed down now.
> > To combine them all, we can create an optimization program after filter
> > push
> > down program to collect the statistics. This could avoid collecting
> > statistics multiple times.
> >
> >
> > Best,
> > Godfrey
> >
> > On Fri, May 13, 2022 at 22:44, Jingsong Li wrote:
> > >
> > > Thank Godfrey for driving.
> > >
> > > Looks very good~ This will undoubtedly greatly enhance the various
> batch
> > > mode connectors.
> > >
> > > I left some comments:
> > >
> > > ## FileBasedStatisticsReportableDecodingFormat
> > >
> > > One concern I have is that we read the footer for each file, and this
> may
> > > be a bit costly in some cases. Is it possible for us to have some
> > > hierarchical way, e.g.
> > > - No statistics are collected for files by default.
> > > - SIZE: Generate statistics based on file Size, get the size of the
> file
> > > only with access to the master of the FileSystem.
> > > - DETAILED: Get the complete statistics by format, possibly by
> accessing
> > > the footer of the file.
> > >
> > > ## When use the statistics reported by connector
> > >
> > > > When partitions are pruned by PushPartitionIntoTableSourceScanRule,
> the
> > > statistics should also be updated.
> > >
> > > I understand that we definitely need to use reporter after the
> partition
> > > prune, but another question: is the filter pushdown already happening
> at
> > > this time?
> > > Can we make sure that in the following three cases, both the filter
> > > pushdown and the partition prune happen before the stats reporting.
> > > - only partition prune happens
> > > - only filter pushdown happens
> > > - both filter pushdown and partition prune happen
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Fri, May 13, 2022 at 6:57 PM godfrey he 
> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I would like to open a discussion on FLIP-231:  Introduce
> > > > SupportStatisticReport
> > > > to support reporting statistics from source connectors.
> > > >
> > > > Statistics are one of the most important inputs to the optimizer.
> > > > Accurate and complete statistics allows the optimizer to be more
> > powerful.
> > > > Currently, the statistics of Flink SQL come from Catalog only,
> > > > while many Connectors have the ability to provide statistics, e.g.
> > > > FileSystem.
> > > > In production, we find many tables in Catalog do not have any
> > statistics.
> > > > As a result, the optimizer can't generate better execution plans,
> > > > especially for Batch jobs.
> > > >
> > > > There are two approaches to enhance statistics for the planner,
> > > > one is to introduce the "ANALYZE TABLE" syntax which will write
> > > > the analyzed result to the catalog, another is to introduce a new
> > 

[jira] [Created] (FLINK-27709) Add comment to schema

2022-05-19 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-27709:


 Summary: Add comment to schema
 Key: FLINK-27709
 URL: https://issues.apache.org/jira/browse/FLINK-27709
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: table-store-0.2.0


We can add a comment to the schema of a table.





[jira] [Created] (FLINK-27708) Add background compaction task for append-only table when ingesting.

2022-05-19 Thread Zheng Hu (Jira)
Zheng Hu created FLINK-27708:


 Summary: Add background compaction task for append-only table when 
ingesting.
 Key: FLINK-27708
 URL: https://issues.apache.org/jira/browse/FLINK-27708
 Project: Flink
  Issue Type: Sub-task
Reporter: Zheng Hu


We could still execute a compaction task in the background to merge small files
for append-only tables, although, unlike for merge-tree tables, it won't
remove any delete markers.





Re: [DISCUSS] FLIP-222: Support full query lifecycle statements in SQL client

2022-05-19 Thread Paul Lam
Hi Jark,

WRT “DROP QUERY”, I agree that it’s not very intuitive, and that’s
part of the reason why I proposed “STOP/CANCEL QUERY” at the
beginning. The downside of it is that it’s not ANSI-SQL compatible.

Another question is, what should be the syntax for ungracefully
canceling a query? As ShengKai pointed out in an offline discussion,
“STOP QUERY” and “CANCEL QUERY” might confuse SQL users.
Flink CLI has both stop and cancel, mostly for historical reasons.

WRT “SHOW SAVEPOINT”, I agree it’s a missing part. My concern is
that savepoints are owned by users and beyond the lifecycle of a Flink
cluster. For example, a user might take a savepoint at a custom path
that’s different than the default savepoint path, I think jobmanager would
not remember that, not to mention the jobmanager may be a fresh new
one after a cluster restart. Thus if we support “SHOW SAVEPOINT”, it's 
probably a best-effort one.

WRT savepoint syntax, I’m thinking of the semantics of savepoints.
Savepoints are an alias for nested transactions in the database area[1], and there are
correspondingly global transactions. If we consider Flink jobs as
global transactions and Flink checkpoints as nested transactions,
then the savepoint semantics are close, thus I think the savepoint syntax
in the SQL standard could be considered. But again, I don’t have a very
strong preference.

Ping @Timo to get more inputs.

[1] https://en.wikipedia.org/wiki/Nested_transaction 


Best,
Paul Lam

> On May 18, 2022, at 17:48, Jark Wu wrote:
> 
> Hi Paul,
> 
> 1) SHOW QUERIES
> +1 to add finished time, but it would be better to call it "end_time" to
> keep aligned with names in Web UI.
> 
> 2) DROP QUERY
> I think we shouldn't throw exceptions for batch jobs, otherwise, how to
> stop batch queries?
> At present, I don't think "DROP" is a suitable keyword for this statement.
> From the perspective of users, "DROP" sounds like the query should be
> removed from the
> list of "SHOW QUERIES". However, it doesn't. Maybe "STOP QUERY" is more
> suitable and
> compliant with commands of Flink CLI.
> 
> 3) SHOW SAVEPOINTS
> I think this statement is needed, otherwise, savepoints are lost after the
> SAVEPOINT
> command is executed. Savepoints can be retrieved from REST API
> "/jobs/:jobid/checkpoints"
> with filtering "checkpoint_type"="savepoint". It's also worth considering
> providing "SHOW CHECKPOINTS"
> to list all checkpoints.
> 
> 4) SAVEPOINT & RELEASE SAVEPOINT
> I'm a little concerned with the SAVEPOINT and RELEASE SAVEPOINT statements
> now.
> In the vendors, the parameters of SAVEPOINT and RELEASE SAVEPOINT are both
> the same savepoint id.
> However, in our syntax, the first one is query id, and the second one is
> savepoint path, which is confusing and
> not consistent. When I came across SHOW SAVEPOINT, I thought maybe they
> should be in the same syntax set.
> For example, CREATE SAVEPOINT FOR [QUERY]  & DROP SAVEPOINT
> .
> That means we don't follow the majority of vendors in SAVEPOINT commands. I
> would say the purpose is different in Flink.
> What are others' opinions on this?
> 
> Best,
> Jark
> 
> [1]:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-checkpoints
> 
> 
> On Wed, 18 May 2022 at 14:43, Paul Lam  wrote:
> 
>> Hi Godfrey,
>> 
>> Thanks a lot for your inputs!
>> 
>> 'SHOW QUERIES' lists all jobs in the cluster, no limit on APIs (DataStream
>> or SQL) or
>> clients (SQL client or CLI). Under the hood, it’s based on
>> ClusterClient#listJobs, the
>> same as Flink CLI. I think it’s okay to have non-SQL jobs listed in SQL
>> client, because
>> these jobs can be managed via SQL client too.
>> 
>> WRT finished time, I think you’re right. Adding it to the FLIP. But I’m a
>> bit afraid that the
>> rows would be too long.
>> 
>> WRT ‘DROP QUERY’,
>>> What's the behavior for batch jobs and the non-running jobs?
>> 
>> 
>> In general, the behavior would be aligned with Flink CLI. Triggering a
>> savepoint for
>> a non-running job would cause errors, and the error message would be
>> printed to
>> the SQL client. Triggering a savepoint for batch(unbounded) jobs in
>> streaming
>> execution mode would be the same with streaming jobs. However, for batch
>> jobs in
>> batch execution mode, I think there would be an error, because batch
>> execution
>> doesn’t support checkpoints currently (please correct me if I’m wrong).
>> 
>> WRT ’SHOW SAVEPOINTS’, I’ve thought about it, but Flink clusterClient/
>> jobClient doesn’t have such functionality at the moment, and neither does
>> Flink CLI.
>> Maybe we could make it a follow-up FLIP, which includes the modifications
>> to
>> clusterClient/jobClient and Flink CLI. WDYT?
>> 
>> Best,
>> Paul Lam
>> 
>>> On May 17, 2022, at 20:34, godfrey he wrote:
>>> 
>>> Godfrey
>> 
>> 



Re: [VOTE] FLIP-91: Support SQL Gateway

2022-05-19 Thread Paul Lam
+1 (non-binding)

Best,
Paul Lam

> On May 20, 2022, at 10:48, Jark Wu wrote:
> 
> +1 (binding)
> 
> Best,
> Jark
> 
> On Fri, 20 May 2022 at 10:39, Shengkai Fang  wrote:
> 
>> Hi, everyone.
>> 
>> Thanks for your feedback for FLIP-91: Support SQL Gateway[1] on the
>> discussion thread[2]. I'd like to start a vote for it. The vote will be
>> open for at least 72 hours unless there is an objection or not enough
>> votes.
>> 
>> Best,
>> Shengkai
>> 
>> [1]
>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-91%3A+Support+SQL+Gateway
>> [2]https://lists.apache.org/thread/gr7soo29z884r1scnz77r2hwr2xmd9b0
>> 



[jira] [Created] (FLINK-27707) Implement TableStoreFactory#onCompactTable

2022-05-19 Thread Jane Chan (Jira)
Jane Chan created FLINK-27707:
-

 Summary: Implement TableStoreFactory#onCompactTable
 Key: FLINK-27707
 URL: https://issues.apache.org/jira/browse/FLINK-27707
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.2.0
Reporter: Jane Chan
 Fix For: table-store-0.2.0


Perform scan and pick data files to compact





[jira] [Created] (FLINK-27706) Refactor all subclasses of FileStoreTableITCase to use the batchSql.

2022-05-19 Thread Zheng Hu (Jira)
Zheng Hu created FLINK-27706:


 Summary: Refactor all subclasses of FileStoreTableITCase to use the batchSql.
 Key: FLINK-27706
 URL: https://issues.apache.org/jira/browse/FLINK-27706
 Project: Flink
  Issue Type: Sub-task
Reporter: Zheng Hu


Since we've introduced batchSql in FileStoreTableITCase to execute Flink batch 
queries, all the subclasses can simply use batchSql to submit their Flink 
SQL.

It's a minor issue.





Re: Job Logs - Yarn Application Mode

2022-05-19 Thread Biao Geng
Hi there,
@Zain, Weihua's suggestion should be able to fulfill the request to check
JM logs. If you do want to use YARN cli for running Flink applications, it
is possible to check JM's log with the YARN command like:
*yarn logs -applicationId application_xxx_yyy -am -1 -logFiles
jobmanager.log*
For TM log, command would be like:
* yarn logs -applicationId  -containerId   -logFiles
taskmanager.log*
Note that it is not easy to find the container id of a TM. One workaround
is to check the JM's log first and get the TM's container id from there.
You can also learn more about the above commands from *yarn logs
-help*

@Shengkai, yes, you are right: the actual JM address is managed by YARN. To
access the JM launched by YARN, users need to open the YARN web UI, find
the YARN application by applicationId, and then click the 'application master
url' of that application to be redirected to the Flink web UI.

Best,
Biao Geng

Shengkai Fang wrote on Fri, May 20, 2022 at 10:59:

> Hi.
>
> I am not familiar with the YARN application mode. The job manager is
> started when the job is submitted, so how can users know the address of the
> JM? Do we need to look up the submitted job in the YARN UI by its
> JobID?
>
> Best,
> Shengkai
>
> Weihua Hu wrote on Fri, May 20, 2022 at 10:23:
>
>> Hi,
>> You can get the logs from Flink Web UI if job is running.
>> Best,
>> Weihua
>>
>> On May 19, 2022, at 10:56 PM, Zain Haider Nemati wrote:
>>
>> Hey All,
>> How can I check logs for my job when it is running in application mode
>> via yarn
>>
>>
>>


Re: Job Logs - Yarn Application Mode

2022-05-19 Thread Shengkai Fang
Hi.

I am not familiar with the YARN application mode. The job manager is
started when the job is submitted, so how can users know the address of the
JM? Do we need to look up the submitted job in the YARN UI by its
JobID?

Best,
Shengkai

Weihua Hu wrote on Fri, May 20, 2022 at 10:23:

> Hi,
> You can get the logs from Flink Web UI if job is running.
> Best,
> Weihua
>
> On May 19, 2022, at 10:56 PM, Zain Haider Nemati wrote:
>
> Hey All,
> How can I check logs for my job when it is running in application mode via
> yarn
>
>
>


Re: [VOTE] FLIP-91: Support SQL Gateway

2022-05-19 Thread Jark Wu
+1 (binding)

Best,
Jark

On Fri, 20 May 2022 at 10:39, Shengkai Fang  wrote:

> Hi, everyone.
>
> Thanks for your feedback for FLIP-91: Support SQL Gateway[1] on the
> discussion thread[2]. I'd like to start a vote for it. The vote will be
> open for at least 72 hours unless there is an objection or not enough
> votes.
>
> Best,
> Shengkai
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-91%3A+Support+SQL+Gateway
> [2]https://lists.apache.org/thread/gr7soo29z884r1scnz77r2hwr2xmd9b0
>


Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-19 Thread weijie guo
Hi, Aitozi:

Thank you for the feedback!
Here are some of my thoughts on your questions.

>>> 1. If there is an unbounded data source but only enough resources to
schedule the first stage, will it put a big burden on the disk/shuffle
service, which I think would occupy all the resources?
First of all, Hybrid Shuffle Mode is oriented to the batch job scenario, so
there is no problem of unbounded data sources. Secondly, if you consider
the stream scenario, I think Pipelined Shuffle should still be the best
choice at present. For an unbounded data stream, it is not meaningful to
only run some stages.

>>> 2. Which kinds of jobs will benefit from the hybrid shuffle mode? In
other words, in which cases can we use the hybrid shuffle mode?
Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
shuffle mode can effectively utilize cluster resources and avoid some
unnecessary disk IO overhead. For OLAP scenarios, which are characterized
by a large number of concurrently submitted short batch jobs, hybrid
shuffle can solve the scheduling deadlock problem of pipelined shuffle and
achieve similar performance.

Best regards,

Weijie


Aitozi wrote on Fri, May 20, 2022 at 08:05:

> Hi Weijie:
>
>  Thanks for the nice FLIP, I have couple questions about this:
>
> 1) In the hybrid shuffle mode, the shuffle mode is decided by the resource.
> If there
> is an unbounded data source, but only have resource to schedule the first
> stage, will it
> bring the big burden to the disk/shuffle service which will occupy all the
> resource I think.
>
> 2) Which kind of job will benefit from the hybrid shuffle mode. In other
> words, In which
> case we can use the hybrid shuffle mode:
> - For batch job want to use more resource to reduce the e2e time ?
> - Or for streaming job which may lack of resource temporarily ?
> - Or for OLAP job which will try to make best use of available resources as
> you mentioned to finish the query?
> Just want to know the typical use case for the Hybrid shuffle mode :)
>
>
> Best,
> Aitozi.
>
> weijie guo wrote on Thu, May 19, 2022 at 18:33:
>
> > Yangze, Thank you for the feedback!
> > Here's my thoughts for your questions:
> >
> > >>> How do we decide the size of the buffer pool in MemoryDataManager and
> > the read buffers in FileDataManager?
> > The BufferPool in MemoryDataManager is the LocalBufferPool used by
> > ResultPartition, and the size is the same as the current implementation
> of
> > sort-merge shuffle. In other words, the minimum value of BufferPool is a
> > configurable fixed value, and the maximum value is Math.max(min, 4 *
> > numSubpartitions). The default value can be determined by running the
> > TPC-DS tests.
> > Read buffers in FileDataManager are requested from the
> > BatchShuffleReadBufferPool shared by TaskManager, it's size controlled by
> > *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the default
> > value is 32M, which is consistent with the current sort-merge shuffle
> > logic.
> >
> > >>> Is there an upper limit for the sum of them? If there is, how does
> > MemoryDataManager and FileDataManager sync the memory usage?
> > The buffers of the MemoryDataManager are limited by the size of the
> > LocalBufferPool, and the upper limit is the size of the Network Memory.
> The
> > buffers of the FileDataManager are directly requested from
> > UnpooledOffHeapMemory, and are also limited by the size of the framework
> > off-heap memory. I think there should be no need for additional
> > synchronization mechanisms.
> >
> > >>> How do you disable the slot sharing? If user configures both the slot
> > sharing group and hybrid shuffle, what will happen to that job?
> > I think we can print a warning log when Hybrid Shuffle is enabled and SSG
> > is configured during the JobGraph compilation stage, and fallback to the
> > region slot sharing group by default. Of course, it will be emphasized in
> > the document that we do not currently support SSG, If configured, it will
> > fall back to the default.
> >
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > Yangze Guo wrote on Thu, May 19, 2022 at 16:25:
> >
> > > Thanks for driving this. Xintong and Weijie.
> > >
> > > I believe this feature will make Flink a better batch/OLAP engine. +1
> > > for the overall design.
> > >
> > > Some questions:
> > > 1. How do we decide the size of the buffer pool in MemoryDataManager
> > > and the read buffers in FileDataManager?
> > > 2. Is there an upper limit for the sum of them? If there is, how does
> > > MemoryDataManager and FileDataManager sync the memory usage?
> > > 3. How do you disable the slot sharing? If user configures both the
> > > slot sharing group and hybrid shuffle, what will happen to that job?
> > >
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Thu, May 19, 2022 at 2:41 PM Xintong Song 
> > > wrote:
> > > >
> > > > Thanks for preparing this FLIP, Weijie.
> > > >
> > > > I think this is a good improvement on batch resource elasticity.
> > Looking
> > > > forward to the community feedback.
> 

[VOTE] FLIP-91: Support SQL Gateway

2022-05-19 Thread Shengkai Fang
Hi, everyone.

Thanks for your feedback for FLIP-91: Support SQL Gateway[1] on the
discussion thread[2]. I'd like to start a vote for it. The vote will be
open for at least 72 hours unless there is an objection or not enough votes.

Best,
Shengkai

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-91%3A+Support+SQL+Gateway
[2]https://lists.apache.org/thread/gr7soo29z884r1scnz77r2hwr2xmd9b0


[jira] [Created] (FLINK-27705) num-sorted-run.compaction-trigger should not interfere the num-levels

2022-05-19 Thread Jane Chan (Jira)
Jane Chan created FLINK-27705:
-

 Summary: num-sorted-run.compaction-trigger should not interfere the num-levels
 Key: FLINK-27705
 URL: https://issues.apache.org/jira/browse/FLINK-27705
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Affects Versions: table-store-0.2.0
Reporter: Jane Chan
 Fix For: table-store-0.2.0


h3. Issue Description
The default value for MergeTreeOptions.NUM_LEVELS is not a constant; it 
varies with MergeTreeOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER. If users do 
not specify MergeTreeOptions.NUM_LEVELS at first and later change 
MergeTreeOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER, subsequent write tasks 
may fail (specifically, when the compaction trigger shrinks and previous 
writes have already triggered compaction).
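
The failure mode described above can be illustrated with a small, self-contained sketch. It is not the Table Store code: the class and method names are hypothetical, and the rule "default num-levels = compaction trigger + 1" as well as the trigger value 5 are assumptions made only for illustration.

```java
import java.util.OptionalInt;

// Hypothetical stand-in for the option resolution described in this issue;
// it is NOT the actual MergeTreeOptions class.
final class NumLevelsSketch {

    // Assumed rule: when num-levels is not configured, derive it from the
    // compaction trigger (trigger + 1). The real default may differ.
    static int resolveNumLevels(OptionalInt configuredNumLevels, int compactionTrigger) {
        return configuredNumLevels.orElse(compactionTrigger + 1);
    }

    // A file that was written at `level` is only addressable if level < numLevels.
    static boolean levelFits(int level, int numLevels) {
        return level >= 0 && level < numLevels;
    }

    public static void main(String[] args) {
        // First writes: num-levels unset, trigger = 5 (illustrative value).
        int levelsBefore = resolveNumLevels(OptionalInt.empty(), 5);
        int highestLevelWritten = levelsBefore - 1;

        // The trigger shrinks to 2, so the derived num-levels shrinks as well.
        int levelsAfter = resolveNumLevels(OptionalInt.empty(), 2);
        System.out.println(levelFits(highestLevelWritten, levelsAfter)); // false

        // With num-levels pinned explicitly, changing the trigger has no effect.
        int pinned = resolveNumLevels(OptionalInt.of(levelsBefore), 2);
        System.out.println(levelFits(highestLevelWritten, pinned)); // true
    }
}
```

Under these assumptions, pinning num-levels explicitly decouples it from later changes to the trigger, which is the behavior this issue asks for.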


h3. How to Reproduce

Add a test under ForceCompactionITCase:
{code:java}
@Override
protected List<String> ddl() {
    return Arrays.asList(
            "CREATE TABLE IF NOT EXISTS T (\n"
                    + "  f0 INT\n, "
                    + "  f1 STRING\n, "
                    + "  f2 STRING\n"
                    + ") PARTITIONED BY (f1)",
            "CREATE TABLE IF NOT EXISTS T1 (\n"
                    + "  f0 INT\n, "
                    + "  f1 STRING\n, "
                    + "  f2 STRING\n"
                    + ")");
}

@Test
public void test() throws Exception {
    bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' = 'true')");
    bEnv.executeSql(
                    "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is Coming'),"
                            + "(2, 'Winter', 'The First Snowflake'), "
                            + "(2, 'Spring', 'The First Rose in Spring'), "
                            + "(7, 'Summer', 'Summertime Sadness')")
            .await();
    bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')").await();
    bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')").await();
    bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')").await();
    bEnv.executeSql(
                    "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon Sugar'), "
                            + "(4, 'Spring', 'Spring Water')")
            .await();
    bEnv.executeSql(
                    "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'),"
                            + " (9, 'Autumn', 'Wake Me Up When September Ends')")
            .await();
    bEnv.executeSql(
                    "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'),"
                            + " (9, 'Autumn', 'Wake Me Up When September Ends')")
            .await();
    // Shrink the compaction trigger after compaction has already happened.
    bEnv.executeSql("ALTER TABLE T1 SET ('num-sorted-run.compaction-trigger' = '2')");
    bEnv.executeSql(
                    "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'),"
                            + " (9, 'Autumn', 'Wake Me Up When September Ends')")
            .await();
}
{code}





Re: Job Logs - Yarn Application Mode

2022-05-19 Thread Weihua Hu
Hi,
You can get the logs from the Flink Web UI if the job is running.
Best,
Weihua

> On May 19, 2022, at 10:56 PM, Zain Haider Nemati wrote:
> 
> Hey All,
> How can I check logs for my job when it is running in application mode via 
> yarn



[RESULT] [VOTE] FLIP-229: Introduces Join Hint for Flink SQL Batch Job

2022-05-19 Thread Xuyang
Hi everyone,

FLIP-229 [1] has been accepted. 

There [2] were 6 binding votes in favor and none against. Votes are listed in
the order of arrival:
- Binding: Godfrey He
- Binding: Jingsong Li
- Binding: Jark Wu
- Binding: Yun Tang
- Binding: Leonard Xu
- Binding: Timo Walther

Thanks again to everyone who took an interest in this FLIP.
[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job
[2] https://lists.apache.org/thread/s70cjbbr5565m44f4mfqo9w7xdq09cf1



--

Best!
Xuyang

Job Logs - Yarn Application Mode

2022-05-19 Thread Zain Haider Nemati
Hey All,
How can I check the logs for my job when it is running in application mode via
YARN?


Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-19 Thread Aitozi
Hi Weijie:

 Thanks for the nice FLIP. I have a couple of questions about this:

1) In the hybrid shuffle mode, the shuffle mode is decided by the available
resources. If there is an unbounded data source but only enough resources to
schedule the first stage, will it put a big burden on the disk/shuffle service,
which I think would occupy all the resources?

2) Which kinds of jobs will benefit from the hybrid shuffle mode? In other
words, in which cases can we use the hybrid shuffle mode:
- For batch jobs that want to use more resources to reduce the e2e time?
- Or for streaming jobs that may temporarily lack resources?
- Or for OLAP jobs that will try to make the best use of available resources,
as you mentioned, to finish the query?
Just want to know the typical use case for the hybrid shuffle mode :)


Best,
Aitozi.

weijie guo wrote on Thu, May 19, 2022 at 18:33:

> Yangze, thank you for the feedback!
> Here are my thoughts on your questions:
>
> >>> How do we decide the size of the buffer pool in MemoryDataManager and
> the read buffers in FileDataManager?
> The BufferPool in MemoryDataManager is the LocalBufferPool used by
> ResultPartition, and the size is the same as the current implementation of
> sort-merge shuffle. In other words, the minimum value of BufferPool is a
> configurable fixed value, and the maximum value is Math.max(min, 4 *
> numSubpartitions). The default value can be determined by running the
> TPC-DS tests.
> Read buffers in FileDataManager are requested from the
> BatchShuffleReadBufferPool shared by the TaskManager; its size is controlled by
> *taskmanager.memory.framework.off-heap.batch-shuffle.size*, and the default
> value is 32M, which is consistent with the current sort-merge shuffle
> logic.
>
> >>> Is there an upper limit for the sum of them? If there is, how does
> MemoryDataManager and FileDataManager sync the memory usage?
> The buffers of the MemoryDataManager are limited by the size of the
> LocalBufferPool, and the upper limit is the size of the Network Memory. The
> buffers of the FileDataManager are directly requested from
> UnpooledOffHeapMemory, and are also limited by the size of the framework
> off-heap memory. I think there should be no need for additional
> synchronization mechanisms.
>
> >>> How do you disable the slot sharing? If user configures both the slot
> sharing group and hybrid shuffle, what will happen to that job?
> I think we can print a warning log when Hybrid Shuffle is enabled and SSG
> is configured during the JobGraph compilation stage, and fall back to the
> region slot sharing group by default. Of course, it will be emphasized in
> the documentation that we do not currently support SSG; if configured, it will
> fall back to the default.
>
>
> Best regards,
>
> Weijie
>
>
> Yangze Guo wrote on Thu, May 19, 2022 at 16:25:
>
> > Thanks for driving this. Xintong and Weijie.
> >
> > I believe this feature will make Flink a better batch/OLAP engine. +1
> > for the overall design.
> >
> > Some questions:
> > 1. How do we decide the size of the buffer pool in MemoryDataManager
> > and the read buffers in FileDataManager?
> > 2. Is there an upper limit for the sum of them? If there is, how does
> > MemoryDataManager and FileDataManager sync the memory usage?
> > 3. How do you disable the slot sharing? If user configures both the
> > slot sharing group and hybrid shuffle, what will happen to that job?
> >
> >
> > Best,
> > Yangze Guo
> >
> > On Thu, May 19, 2022 at 2:41 PM Xintong Song 
> > wrote:
> > >
> > > Thanks for preparing this FLIP, Weijie.
> > >
> > > I think this is a good improvement on batch resource elasticity.
> Looking
> > > forward to the community feedback.
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Thu, May 19, 2022 at 2:31 PM weijie guo 
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > >
> > > > I’d like to start a discussion about FLIP-235[1], which introduces a
> > > > new shuffle mode that can overcome some of the problems of Pipelined
> > > > Shuffle and Blocking Shuffle in batch scenarios.
> > > >
> > > >
> > > > Currently in Flink, task scheduling is more or less constrained by
> the
> > shuffle implementations.
> > > > This will bring the following disadvantages:
> > > >
> > > >1. Pipelined Shuffle:
> > > > For pipelined shuffle, the upstream and downstream tasks are
> > required to be deployed at the same time, to avoid upstream tasks being
> > blocked forever. This is fine when there are enough resources for both
> > upstream and downstream tasks to run simultaneously, but will cause the
> > following problems otherwise:
> > > >1.
> > > >   Pipelined shuffle connected tasks (i.e., a pipelined region)
> > cannot be executed until obtaining resources for all of them, resulting
> in
> > longer job finishing time and poorer resource efficiency due to holding
> > part of the resources idle while waiting for the rest.
> > > >   2.
> > > >   More severely, if multiple jobs each hold part of the cluster
> > resources and are waiting for more, a deadlock 

[jira] [Created] (FLINK-27704) Java 17 compatibility

2022-05-19 Thread Jordan Kaye (Jira)
Jordan Kaye created FLINK-27704:
---

 Summary: Java 17 compatibility
 Key: FLINK-27704
 URL: https://issues.apache.org/jira/browse/FLINK-27704
 Project: Flink
  Issue Type: Improvement
Reporter: Jordan Kaye


We're looking for an update on this issue: 
https://issues.apache.org/jira/browse/FLINK-15736

 

Java 11 is a very old LTS version and is missing vital features from Java 14 
and Java 17. A production deployment should support Java 17.

In general, I would expect compatibility for new language versions within 6 
months of their release. Being pinned to old versions has significant cascading 
impact on technical infrastructure.





Re: [DISCUSS] FLIP-91: Support SQL Client Gateway

2022-05-19 Thread Jark Wu
Thanks, Shengkai, for the new round of updates.
I don't have comments on the updates.
+1 for starting the vote.

Best,
Jark

On Thu, 19 May 2022 at 20:18, Shengkai Fang  wrote:

> Hi, Timo.
>
> Thanks for your feedback!
>
>  > SQLGatewayService.getFunction / UserDefinedFunctionInfo
>
> Yes. I missed some parts in the FLIP. I have fixed the errors now.
>
>  > configure_session
>
> Thanks for your input. Considering the difference, I am still inclined to use
> the `configure_session`.
>
>  > `./sql-gateway.sh`
>
> Yes, you are right. We should add a new startup script in the distribution.
> I have updated the FLIP and added a description of how to use the script. Users can
> - start the server with `./sql-gateway.sh start -Dkey=value`
> - stop the last started server with `./sql-gateway.sh stop`
> - stop all running servers with `./sql-gateway.sh stop-all`
>
> >  an "-e" converts a client to a server
> The "-e" option is just used to connect to the Gateway with the specified
> endpoint. It doesn't convert the client to the server.
>
> Thanks, everyone, for all the input and discussion. If there are no other
> problems, I think we can restart the voting tomorrow.
>
> Best,
> Shengkai
>
>
> Timo Walther wrote on Thu, May 19, 2022 at 15:27:
>
> > Hi Shengkai,
> >
> > thanks again for the update. I don't have much to add:
> >
> >  > I still think we should use a new state machine
> >
> > Thanks for the explanation about async/sync behavior. I thought we also
> > want to use the Gateway for job status updates. But if FINISHED only
> > refers to the job submission, the new state machine makes more sense.
> > Nevertheless, checking the job status will be a frequent request esp.
> > also for the new lifecycle statements. But I agree to the current design
> > that this is different to state of the operation.
> >
> >  > SQLGatewayService.getFunction / UserDefinedFunctionInfo
> >
> > You forgot to update the class. There is still a UserDefinedFunctionInfo
> > in the FLIP.
> >
> >  > configure_session
> >
> > I don't have a strong opinion on the naming of this particular REST
> > path. But my concern is that we should introduce a term of this family
> > of init/configure statements. Because in SQL Client we call it `init
> > statements` and in the gateway we call it `configuration statements`,
> > but in the end is all statements that are `not DML and DQL`.
> >
> >  > './sql-client.sh -e'
> >
> > Would it make sense to introduce a separate `./sql-gateway.sh`? I find
> > it a bit confusing that an "-e" converts a client to a server. Under the
> > hood we can still share the same classes but make it a bit more explicit
> > in the distribution (also for marketing purposes of this feature).
> >
> > Please continue with the voting afterwards.
> >
> > Regards,
> > Timo
> >
> > On 17.05.22 09:14, Shengkai Fang wrote:
> > > Hi, all.
> > >
> > > After discussing with Becket Qin offline, I modified the FLIP a little.
> > The
> > > change is as follow:
> > >
> > > 1. Add /api_versions in the REST API.
> > >
> > > The api is used by the client to know the current REST Endpoint
> supports
> > > which version. The client is able to choose the version for later
> > > communication.
> > >
> > > 2. SqlClient uses -e option to input the endpoint address and port.
> > >
> > > Because the -h option has already been used by the SqlClient. We use
> the
> > > -e, --endpoint for SqlClient to input the address:port of the endpoint.
> > >
> > > 3. Use './sql-client.sh -e' to start the gateway mode rather than
> > > './sql-client.sh gateway -e'.
> > >
> > > If the user specifies the -e option, it definitely means to use the
> > gateway
> > > mode. Therefore, it is redundant to use another keyword to indicate the
> > > mode.
> > >
> > > Best,
> > > Shengkai
> > >
> > > Shengkai Fang wrote on Tue, May 17, 2022 at 14:13:
> > >
> > >> Hi, Jark, Timo. Nice to have an agreement!
> > >>
> > >> Thanks for Jark's inputs about the multiple version Flink. I have
> > already
> > >> updated the FLIP in the rejected alternatives about details.
> > >>
> > >> 1. We should definitely just use LogicalTypeJsonSerializer and not a
> > >> second JSON representation.
> > >>
> > >> Our concern is mainly that it's hard for users to use because of the
> > >> flexible structure. The LogicalTypeJsonSerializer will serialize the
> > >> VARCHAR to "VARCHAR()" or "{\"TYPE\": \"VARCHAR\", \"LENGTH\":
> > 0}",
> > >> which requires the end users to process the different situations. But
> in
> > >> some cases, users just print the json to the terminal/web UI.  WDYT?
> > >>
> > >>> Serialize the RowData
> > >>
> > >> Sure. I will keep your advice in mind. I think the current
> serialization
> > >> of the RowData will not use the column name as the Object key in the
> > json.
> > >> I am not sure whether I missed something. It would be nice if you can
> > give
> > >> me an example if I do something wrong.
> > >>
> > >>> Have you also thought about using Flink's state types from Flink
> > >> tasks/jobs?
> > >>
> > >> Yes. But I still 

Re: [DISCUSS] FLIP-233: Introduce HTTP Connector

2022-05-19 Thread Ber, Jeremy
Hi Austin,

Thanks for the recommendations! After internal discussion I have decided to 
park this FLIP for now until I have more capacity to commit to it.

Jeremy

On 5/17/22, 10:26 AM, "Austin Cawley-Edwards"  wrote:




Hey Jeremy,

Thanks for kicking off this discussion. As a Flink user, I too struggled
with the lack of HTTP support and rolled my own with AsyncIO. Reading
through the FLIP, I just have a few general questions and comments.

* It is not clear to me if multiple HTTP methods are supported or not? It's
listed in "Limitations" that only POSTs are allowed, but the constructor
accepts a "method" argument.
* More of a nit, the FLIP contains a lot of code, making it feel a bit more
like a PR already. It would be easier to understand the proposed interfaces
alone, and keep the implementation POC as a separate link IMO.

Since there are so many different types of HTTP APIs, and many different
ways of using them, I think the proposal would benefit from taking a more
general approach to both request building and response handling. For
instance, some APIs may return hints for retry that are not contained in
the status code alone (e.g., a "retry-after" header or such + a 429 status
code). Can we already think about how to more generally expose these two
concepts? For the retries, it might be too idealistic, but standardizing on
a retry interface like the one proposed in FLIP-232[1] would make these
async/http APIs feel much more aligned.

wdyt?

Austin

[1]:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
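
To make the retry-hint point above concrete, here is a minimal sketch of a retry decision that looks at both the status code and a "Retry-After" header. It is not taken from FLIP-233 or FLIP-232: the class and method names are hypothetical, treating 429 and 503 as the retryable codes is an assumption, and only the delta-seconds form of the header is handled (the HTTP-date form falls back to a caller-supplied default).

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical helper, not part of any Flink connector API.
final class RetryHintSketch {

    // Returns the delay to wait before retrying, or empty if the response
    // should not be retried at all.
    static Optional<Duration> retryDelay(int statusCode, String retryAfterHeader, Duration fallback) {
        // Assumption: only 429 (Too Many Requests) and 503 (Service Unavailable) are retried.
        boolean retryable = statusCode == 429 || statusCode == 503;
        if (!retryable) {
            return Optional.empty();
        }
        if (retryAfterHeader != null) {
            try {
                long seconds = Long.parseLong(retryAfterHeader.trim());
                if (seconds >= 0) {
                    return Optional.of(Duration.ofSeconds(seconds));
                }
            } catch (NumberFormatException ignored) {
                // The header may also carry an HTTP-date; this sketch does not parse it.
            }
        }
        return Optional.of(fallback);
    }

    public static void main(String[] args) {
        Duration fallback = Duration.ofSeconds(1);
        System.out.println(retryDelay(429, "30", fallback)); // hint from the header wins
        System.out.println(retryDelay(503, null, fallback)); // no hint: use the fallback
        System.out.println(retryDelay(400, "30", fallback)); // not retryable
    }
}
```

The design point is that the status code alone is not enough: a response handler needs access to the headers to honor such hints.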

On Tue, May 17, 2022 at 10:21 AM Ber, Jeremy 
wrote:

> Hi there, We would like to start a discussion thread on FLIP-233:
> Introduce HTTP Connector
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-233%3A+Introduce+HTTP+Connector>
> where we propose to create a connector for delivering arbitrary data
> packets from Apache Flink to an HTTP Endpoint.  This connector will give
> users flexibility to deliver data to any destination which supports REST
> endpoints. This includes REST APIs, Amazon Lambda, users internal or
> external consumers, among other options.
>
> Looking forward to your feedback.
>
> Thank you,
> Jeremy Ber
>
>



Re:[VOTE] Creating an Apache Flink slack workspace

2022-05-19 Thread Roc Marshal
+1. (non-binding.)


Best Regards

Roc.


At 2022-05-17 16:23:59, "Xintong Song"  wrote:
>Hi everyone,
>
>As previously discussed in [1], I would like to open a vote on creating an
>Apache Flink slack workspace channel.
>
>The proposed actions include:
>- Creating a dedicated slack workspace with the name Apache Flink that is
>controlled and maintained by the Apache Flink PMC
>- Updating the Flink website about rules for using various communication
>channels
>- Setting up an Archive for the Apache Flink slack
>- Revisiting this initiative by the end of 2022
>
>The vote will last for at least 72 hours, and will be accepted by a
>consensus of active PMC members.
>
>Best,
>
>Xintong


[jira] [Created] (FLINK-27703) FileChannelManagerImplTest.testDirectoriesCleanupOnKillWithoutCallerHook failed with The marker file was not found within 10000 msecs

2022-05-19 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-27703:


 Summary: FileChannelManagerImplTest.testDirectoriesCleanupOnKillWithoutCallerHook failed with The marker file was not found within 10000 msecs
 Key: FLINK-27703
 URL: https://issues.apache.org/jira/browse/FLINK-27703
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.16.0
Reporter: Huang Xingbo



{code:java}
2022-05-19T09:08:49.8088232Z May 19 09:08:49 [ERROR] Failures: 
2022-05-19T09:08:49.8090850Z May 19 09:08:49 [ERROR]   FileChannelManagerImplTest.testDirectoriesCleanupOnKillWithoutCallerHook:97->testDirectoriesCleanupOnKill:127 The marker file was not found within 10000 msecs
{code}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=35834=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9744



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27702) Flink table code splitter does not throw anything if result is not compiled

2022-05-19 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-27702:
---

 Summary: Flink table code splitter does not throw anything if 
result is not compiled
 Key: FLINK-27702
 URL: https://issues.apache.org/jira/browse/FLINK-27702
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Runtime
Reporter: Sergey Nuyanzin


In fact there are 2 issues:
1. The code throws nothing if the split code does not compile.
2. If the code length is less than the limit, it does not check compilation at all.

Also there are comments about that at 
https://github.com/apache/flink/pull/19638#discussion_r865605558






[jira] [Created] (FLINK-27701) HashMapStateBackendWindowITCase. testAggregateWindowStateReader failed with Not all required tasks are currently running

2022-05-19 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-27701:


 Summary: HashMapStateBackendWindowITCase.testAggregateWindowStateReader 
failed with Not all required tasks are currently running
 Key: FLINK-27701
 URL: https://issues.apache.org/jira/browse/FLINK-27701
 Project: Flink
  Issue Type: Bug
  Components: API / State Processor
Affects Versions: 1.16.0
Reporter: Huang Xingbo



{code:java}
2022-05-19T11:04:27.4331524Z May 19 11:04:27 [ERROR] Tests run: 9, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 29.034 s <<< FAILURE! - in org.apache.flink.state.api.HashMapStateBackendWindowITCase
2022-05-19T11:04:27.4333055Z May 19 11:04:27 [ERROR] org.apache.flink.state.api.HashMapStateBackendWindowITCase.testAggregateWindowStateReader  Time elapsed: 0.105 s  <<< ERROR!
2022-05-19T11:04:27.4333765Z May 19 11:04:27 java.lang.RuntimeException: Failed to take savepoint
2022-05-19T11:04:27.4334405Z May 19 11:04:27     at org.apache.flink.state.api.utils.SavepointTestBase.takeSavepoint(SavepointTestBase.java:68)
2022-05-19T11:04:27.4335375Z May 19 11:04:27     at org.apache.flink.state.api.SavepointWindowReaderITCase.testAggregateWindowStateReader(SavepointWindowReaderITCase.java:149)
2022-05-19T11:04:27.4338106Z May 19 11:04:27     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2022-05-19T11:04:27.4339140Z May 19 11:04:27     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2022-05-19T11:04:27.4339854Z May 19 11:04:27     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2022-05-19T11:04:27.4340560Z May 19 11:04:27     at java.lang.reflect.Method.invoke(Method.java:498)
2022-05-19T11:04:27.4341746Z May 19 11:04:27     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
2022-05-19T11:04:27.4342797Z May 19 11:04:27     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2022-05-19T11:04:27.4343717Z May 19 11:04:27     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
2022-05-19T11:04:27.4344909Z May 19 11:04:27     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2022-05-19T11:04:27.4345993Z May 19 11:04:27     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
2022-05-19T11:04:27.4346981Z May 19 11:04:27     at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
2022-05-19T11:04:27.4347590Z May 19 11:04:27     at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
2022-05-19T11:04:27.4348200Z May 19 11:04:27     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
2022-05-19T11:04:27.4348856Z May 19 11:04:27     at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
2022-05-19T11:04:27.4349484Z May 19 11:04:27     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
2022-05-19T11:04:27.4350118Z May 19 11:04:27     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
2022-05-19T11:04:27.4350899Z May 19 11:04:27     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
2022-05-19T11:04:27.4352057Z May 19 11:04:27     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
2022-05-19T11:04:27.4353154Z May 19 11:04:27     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
2022-05-19T11:04:27.4354153Z May 19 11:04:27     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
2022-05-19T11:04:27.4354936Z May 19 11:04:27     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
2022-05-19T11:04:27.4355560Z May 19 11:04:27     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
2022-05-19T11:04:27.4356167Z May 19 11:04:27     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
2022-05-19T11:04:27.4356775Z May 19 11:04:27     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
2022-05-19T11:04:27.4357358Z May 19 11:04:27     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
2022-05-19T11:04:27.4357932Z May 19 11:04:27     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
2022-05-19T11:04:27.4358500Z May 19 11:04:27     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
2022-05-19T11:04:27.4359055Z May 19 11:04:27     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
2022-05-19T11:04:27.4359584Z May 19 11:04:27     at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
2022-05-19T11:04:27.4360174Z May 19 11:04:27     at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
2022-05-19T11:04:27.4361027Z May 19 11:04:27     at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
2022-05-19T11:04:27.4361782Z 

Re: Flink application on yarn cluster - main method not found

2022-05-19 Thread Weihua Hu
Hi,

Which version of Flink are you using?
It looks like there is a conflict between the Flink version of the cluster and 
the Flink version bundled in the user jar.

Best,
Weihua

> On May 19, 2022, at 4:49 PM, Zain Haider Nemati wrote:
> 
> Hi,
> I'm running a Flink application on a YARN cluster and it is giving me this error;
> it works fine on a standalone cluster. Any idea what could be causing this?
>
> Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever.newBuilder([Ljava/lang/String;)Lorg/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever$Builder;
> at org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.getPackagedProgramRetriever(YarnApplicationClusterEntryPoint.java:137)
> at org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.getPackagedProgram(YarnApplicationClusterEntryPoint.java:121)
> at org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.main(YarnApplicationClusterEntryPoint.java:95)
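
A NoSuchMethodError like the one above usually means a class was loaded from a different jar than the caller was compiled against. One way to check for the suspected version conflict is to print where a class was actually loaded from. The following is a minimal sketch using only the JDK; the class name `ClassOrigin` and the helper `locationOf` are made up for illustration and are not part of Flink.

```java
import java.security.CodeSource;

// Hypothetical helper (not part of Flink): reports which jar or directory
// a class was loaded from, to help spot two copies of the same library.
class ClassOrigin {

    /** Returns the code source location of the class, or a placeholder if none is recorded. */
    static String locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // JDK classes loaded by the bootstrap loader typically have no code source.
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass a fully qualified class name as the first argument; defaults to a JDK class.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(name + " <- " + locationOf(Class.forName(name)));
    }
}
```

Running it on the cluster classpath with the class named in the error (here `org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever`) would show whether that class comes from the cluster's Flink distribution or from the user jar.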



Re: [DISCUSS] FLIP-91: Support SQL Client Gateway

2022-05-19 Thread Shengkai Fang
Hi, Timo.

Thanks for your feedback!

 > SQLGatewayService.getFunction / UserDefinedFunctionInfo

Yes, I missed some parts in the FLIP. I have fixed the errors now.

 > configure_session

Thanks for your input. Considering the difference, I still lean toward using
`configure_session`.

 > `./sql-gateway.sh`

Yes, you are right. We should add a new startup script in the distribution.
I have updated the FLIP and added a description of how to use the script. Users can
- start the server with `./sql-gateway.sh start -Dkey=value`
- stop the last started server with `./sql-gateway.sh stop`
- stop all running servers with `./sql-gateway.sh stop-all`

>  an "-e" converts a client to a server
The "-e" option is just used to connect to the Gateway with the specified
endpoint. It doesn't convert the client to the server.

Thanks, everyone, for all the input and discussion. If there are no other problems, I
think we can restart the voting tomorrow.

Best,
Shengkai


On Thu, May 19, 2022 at 15:27, Timo Walther wrote:

> Hi Shengkai,
>
> thanks again for the update. I don't have much to add:
>
>  > I still think we should use a new state machine
>
> Thanks for the explanation about async/sync behavior. I thought we also
> want to use the Gateway for job status updates. But if FINISHED only
> refers to the job submission, the new state machine makes more sense.
> Nevertheless, checking the job status will be a frequent request esp.
> also for the new lifecycle statements. But I agree to the current design
> that this is different to state of the operation.
>
>  > SQLGatewayService.getFunction / UserDefinedFunctionInfo
>
> You forgot to update the class. There is still a UserDefinedFunctionInfo
> in the FLIP.
>
>  > configure_session
>
> I don't have a strong opinion on the naming of this particular REST
> path. But my concern is that we should introduce a term of this family
> of init/configure statements. Because in SQL Client we call it `init
> statements` and in the gateway we call it `configuration statements`,
> but in the end it is all statements that are `not DML and DQL`.
>
>  > './sql-client.sh -e'
>
> Would it make sense to introduce a separate `./sql-gateway.sh`? I find
> it a bit confusing that an "-e" converts a client to a server. Under the
> hood we can still share the same classes but make it a bit more explicit
> in the distribution (also for marketing purposes of this feature).
>
> Please continue with the voting afterwards.
>
> Regards,
> Timo
>
> On 17.05.22 09:14, Shengkai Fang wrote:
> > Hi, all.
> >
> > After discussing with Becket Qin offline, I modified the FLIP a little. The
> > changes are as follows:
> >
> > 1. Add /api_versions in the REST API.
> >
> > The API lets the client learn which versions the current REST endpoint
> > supports. The client is then able to choose the version for later
> > communication.
> >
> > 2. SqlClient uses -e option to input the endpoint address and port.
> >
> > Because the -h option has already been used by the SqlClient, we use
> > -e, --endpoint for SqlClient to input the address:port of the endpoint.
> >
> > 3. Use './sql-client.sh -e' to start the gateway mode rather than
> > './sql-client.sh gateway -e'.
> >
> > If the user specifies the -e option, it definitely means to use the gateway
> > mode. Therefore, it is redundant to use another keyword to indicate the
> > mode.
> >
> > Best,
> > Shengkai
> >
> > On Tue, May 17, 2022 at 14:13, Shengkai Fang wrote:
> >
> >> Hi, Jark, Timo. Nice to have an agreement!
> >>
> >> Thanks for Jark's input about supporting multiple Flink versions. I have already
> >> added the details to the rejected alternatives in the FLIP.
> >>
> >> 1. We should definitely just use LogicalTypeJsonSerializer and not a
> >> second JSON representation.
> >>
> >> Our concern is mainly that it's hard for users to use because of the
> >> flexible structure. The LogicalTypeJsonSerializer will serialize the
> >> VARCHAR to "VARCHAR()" or "{\"TYPE\": \"VARCHAR\", \"LENGTH\": 0}",
> >> which requires the end users to process the different situations. But in
> >> some cases, users just print the json to the terminal/web UI.  WDYT?
> >>
> >>> Serialize the RowData
> >>
> >> Sure. I will keep your advice in mind. I think the current serialization
> >> of the RowData will not use the column name as the Object key in the json.
> >> I am not sure whether I missed something. It would be nice if you can give
> >> me an example if I do something wrong.
> >>
> >>> Have you also thought about using Flink's state types from Flink
> >>> tasks/jobs?
> >>
> >> Yes. But I still think we should use a new state machine. First of all,
> >> Operation in the FLIP is much different from the Job. Operations include
> >> DDL, DML and so on. So it's not suitable to use the small concept to
> >> replace the big concept. Actually some status in the JobStatus, e.g.
> >> RESTARTING/SUSPENDED/RECONCILING don't work in the DDL Operation.
> >>
> >> On the other hand, the Gateway allows users to submit jobs(DML) in
> >> 

Re: [VOTE] Creating an Apache Flink slack workspace

2022-05-19 Thread Konstantin Knauf
+1 for the user community, not the contributor/dev community

On Thu, May 19, 2022 at 12:44, Yuan Mei wrote:

> +1 (binding)
>
> This facilitates people collaborating on the same project from different
> organizations. I really like this idea.
>
> On Thu, May 19, 2022 at 12:43 PM Peter Huang 
> wrote:
>
> > +1 (non-binding)
> >
> >
> > Best Regards
> > Peter Huang
> >
> > On Wed, May 18, 2022 at 9:33 PM Leonard Xu  wrote:
> >
> > > Thanks Xintong for driving this.
> > >
> > >  +1
> > >
> > > Best,
> > > Leonard
> > >
> > > > On May 19, 2022, at 11:11 AM, Zhou, Brian wrote:
> > > >
> > > > +1 (non-binding)  Slack is a better place for code sharing and quick
> > > discussion.
> > > >
> > > > Regards,
> > > > Brian Zhou
> > > >
> > > > -Original Message-
> > > > From: Yun Tang 
> > > > Sent: Thursday, May 19, 2022 10:32 AM
> > > > To: dev
> > > > Subject: Re: [VOTE] Creating an Apache Flink slack workspace
> > > >
> > > >
> > > > [EXTERNAL EMAIL]
> > > >
> > > > Thanks Xintong for driving this. +1 from my side.
> > > >
> > > >
> > > > Best
> > > > Yun Tang
> > > > 
> > > > From: Zhu Zhu 
> > > > Sent: Wednesday, May 18, 2022 17:08
> > > > To: dev 
> > > > Subject: Re: [VOTE] Creating an Apache Flink slack workspace
> > > >
> > > > +1 (binding)
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > On Wed, May 18, 2022 at 16:52, Timo Walther wrote:
> > > >>
> > > >> +1 (binding)
> > > >>
> > > >> Thanks,
> > > >> Timo
> > > >>
> > > >>
> > > >> On 17.05.22 20:44, Gyula Fóra wrote:
> > > >>> +1 (binding)
> > > >>>
> > > >>> On Tue, 17 May 2022 at 19:52, Yufei Zhang 
> > wrote:
> > > >>>
> > >  +1 (nonbinding)
> > > 
> > >  On Tue, May 17, 2022 at 5:29 PM Márton Balassi <
> > > balassi.mar...@gmail.com>
> > >  wrote:
> > > 
> > > > +1 (binding)
> > > >
> > > > On Tue, May 17, 2022 at 11:00 AM Jingsong Li <
> > jingsongl...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > >> Thank Xintong for driving this work.
> > > >>
> > > >> +1
> > > >>
> > > >> Best,
> > > >> Jingsong
> > > >>
> > > >> On Tue, May 17, 2022 at 4:49 PM Martijn Visser <
> > >  martijnvis...@apache.org
> > > >>
> > > >> wrote:
> > > >>
> > > >>> +1 (binding)
> > > >>>
> > > >>> On Tue, 17 May 2022 at 10:38, Yu Li  wrote:
> > > >>>
> > >  +1 (binding)
> > > 
> > >  Thanks Xintong for driving this!
> > > 
> > >  Best Regards,
> > >  Yu
> > > 
> > > 
> > >  On Tue, 17 May 2022 at 16:32, Robert Metzger <
> > metrob...@gmail.com
> > > >
> > > >>> wrote:
> > > 
> > > > Thanks for starting the VOTE!
> > > >
> > > > +1 (binding)
> > > >
> > > >
> > > >
> > > > On Tue, May 17, 2022 at 10:29 AM Jark Wu 
> > >  wrote:
> > > >
> > > >> Thank Xintong for driving this work.
> > > >>
> > > >> +1 from my side (binding)
> > > >>
> > > >> Best,
> > > >> Jark
> > > >>
> > > >> On Tue, 17 May 2022 at 16:24, Xintong Song <
> > > > tonysong...@gmail.com>
> > > > wrote:
> > > >>
> > > >>> Hi everyone,
> > > >>>
> > > >>> As previously discussed in [1], I would like to open a vote
> > >  on
> > >  creating
> > > >> an
> > > >>> Apache Flink slack workspace channel.
> > > >>>
> > > >>> The proposed actions include:
> > > >>> - Creating a dedicated slack workspace with the name Apache
> > > > Flink
> > >  that
> > > > is
> > > >>> controlled and maintained by the Apache Flink PMC
> > > >>> - Updating the Flink website about rules for using various
> > > > communication
> > > >>> channels
> > > >>> - Setting up an Archive for the Apache Flink slack
> > > >>> - Revisiting this initiative by the end of 2022
> > > >>>
> > > >>> The vote will last for at least 72 hours, and will be
> > >  accepted
> > > >> by a
> > > >>> consensus of active PMC members.
> > > >>>
> > > >>> Best,
> > > >>>
> > > >>> Xintong
> > > >>>
> > > >>
> > > >
> > > 
> > > >>>
> > > >>
> > > >
> > > 
> > > >>>
> > > >>
> > >
> > >
> >
>


-- 
https://twitter.com/snntrable
https://github.com/knaufk


[jira] [Created] (FLINK-27699) Align Pulsar source atPublishTime method

2022-05-19 Thread LuNng Wang (Jira)
LuNng Wang created FLINK-27699:
--

 Summary: Align Pulsar source atPublishTime method
 Key: FLINK-27699
 URL: https://issues.apache.org/jira/browse/FLINK-27699
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream, API / Python
Affects Versions: 1.15.0
Reporter: LuNng Wang


StopCursor#atEventTime is deprecated; align to StopCursor#atPublishTime.





Re: Flink application on yarn cluster - main method not found

2022-05-19 Thread Zain Haider Nemati
Hi Folks,
Would appreciate it if someone could help me out with this!

Cheers

On Thu, May 19, 2022 at 1:49 PM Zain Haider Nemati 
wrote:

> Hi,
> I'm running a Flink application on a YARN cluster and it is giving me this error;
> it works fine on a standalone cluster. Any idea what could be causing
> this?
>
> Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever.newBuilder([Ljava/lang/String;)Lorg/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever$Builder;
> at org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.getPackagedProgramRetriever(YarnApplicationClusterEntryPoint.java:137)
> at org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.getPackagedProgram(YarnApplicationClusterEntryPoint.java:121)
> at org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.main(YarnApplicationClusterEntryPoint.java:95)
>


Re: [VOTE] Creating an Apache Flink slack workspace

2022-05-19 Thread Yuan Mei
+1 (binding)

This facilitates people collaborating on the same project from different
organizations. I really like this idea.

On Thu, May 19, 2022 at 12:43 PM Peter Huang 
wrote:

> +1 (non-binding)
>
>
> Best Regards
> Peter Huang
>
> On Wed, May 18, 2022 at 9:33 PM Leonard Xu  wrote:
>
> > Thanks Xintong for driving this.
> >
> >  +1
> >
> > Best,
> > Leonard
> >
> > > On May 19, 2022, at 11:11 AM, Zhou, Brian wrote:
> > >
> > > +1 (non-binding)  Slack is a better place for code sharing and quick
> > discussion.
> > >
> > > Regards,
> > > Brian Zhou
> > >
> > > -Original Message-
> > > From: Yun Tang 
> > > Sent: Thursday, May 19, 2022 10:32 AM
> > > To: dev
> > > Subject: Re: [VOTE] Creating an Apache Flink slack workspace
> > >
> > >
> > > [EXTERNAL EMAIL]
> > >
> > > Thanks Xintong for driving this. +1 from my side.
> > >
> > >
> > > Best
> > > Yun Tang
> > > 
> > > From: Zhu Zhu 
> > > Sent: Wednesday, May 18, 2022 17:08
> > > To: dev 
> > > Subject: Re: [VOTE] Creating an Apache Flink slack workspace
> > >
> > > +1 (binding)
> > >
> > > Thanks,
> > > Zhu
> > >
> > > On Wed, May 18, 2022 at 16:52, Timo Walther wrote:
> > >>
> > >> +1 (binding)
> > >>
> > >> Thanks,
> > >> Timo
> > >>
> > >>
> > >> On 17.05.22 20:44, Gyula Fóra wrote:
> > >>> +1 (binding)
> > >>>
> > >>> On Tue, 17 May 2022 at 19:52, Yufei Zhang 
> wrote:
> > >>>
> >  +1 (nonbinding)
> > 
> >  On Tue, May 17, 2022 at 5:29 PM Márton Balassi <
> > balassi.mar...@gmail.com>
> >  wrote:
> > 
> > > +1 (binding)
> > >
> > > On Tue, May 17, 2022 at 11:00 AM Jingsong Li <
> jingsongl...@gmail.com
> > >
> > > wrote:
> > >
> > >> Thank Xintong for driving this work.
> > >>
> > >> +1
> > >>
> > >> Best,
> > >> Jingsong
> > >>
> > >> On Tue, May 17, 2022 at 4:49 PM Martijn Visser <
> >  martijnvis...@apache.org
> > >>
> > >> wrote:
> > >>
> > >>> +1 (binding)
> > >>>
> > >>> On Tue, 17 May 2022 at 10:38, Yu Li  wrote:
> > >>>
> >  +1 (binding)
> > 
> >  Thanks Xintong for driving this!
> > 
> >  Best Regards,
> >  Yu
> > 
> > 
> >  On Tue, 17 May 2022 at 16:32, Robert Metzger <
> metrob...@gmail.com
> > >
> > >>> wrote:
> > 
> > > Thanks for starting the VOTE!
> > >
> > > +1 (binding)
> > >
> > >
> > >
> > > On Tue, May 17, 2022 at 10:29 AM Jark Wu 
> >  wrote:
> > >
> > >> Thank Xintong for driving this work.
> > >>
> > >> +1 from my side (binding)
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >> On Tue, 17 May 2022 at 16:24, Xintong Song <
> > > tonysong...@gmail.com>
> > > wrote:
> > >>
> > >>> Hi everyone,
> > >>>
> > >>> As previously discussed in [1], I would like to open a vote
> >  on
> >  creating
> > >> an
> > >>> Apache Flink slack workspace channel.
> > >>>
> > >>> The proposed actions include:
> > >>> - Creating a dedicated slack workspace with the name Apache
> > > Flink
> >  that
> > > is
> > >>> controlled and maintained by the Apache Flink PMC
> > >>> - Updating the Flink website about rules for using various
> > > communication
> > >>> channels
> > >>> - Setting up an Archive for the Apache Flink slack
> > >>> - Revisiting this initiative by the end of 2022
> > >>>
> > >>> The vote will last for at least 72 hours, and will be
> >  accepted
> > >> by a
> > >>> consensus of active PMC members.
> > >>>
> > >>> Best,
> > >>>
> > >>> Xintong
> > >>>
> > >>
> > >
> > 
> > >>>
> > >>
> > >
> > 
> > >>>
> > >>
> >
> >
>


Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-19 Thread weijie guo
Yangze, thank you for the feedback!
Here are my thoughts on your questions:

>>> How do we decide the size of the buffer pool in MemoryDataManager and
the read buffers in FileDataManager?
The BufferPool in MemoryDataManager is the LocalBufferPool used by
ResultPartition, and the size is the same as the current implementation of
sort-merge shuffle. In other words, the minimum value of BufferPool is a
configurable fixed value, and the maximum value is Math.max(min, 4 *
numSubpartitions). The default value can be determined by running the
TPC-DS tests.
Read buffers in FileDataManager are requested from the
BatchShuffleReadBufferPool shared by the TaskManager; its size is controlled by
*taskmanager.memory.framework.off-heap.batch-shuffle.size*, and the default
value is 32M, which is consistent with the current sort-merge shuffle logic.
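
The upper bound of the MemoryDataManager's pool described above, Math.max(min, 4 * numSubpartitions), can be illustrated with a small sketch. The class and method names are made up for illustration, and the minimum of 512 buffers is an arbitrary example value, not a proposed default.

```java
// Hypothetical illustration of the sizing rule quoted in the reply;
// not the actual Flink implementation.
class BufferPoolSizing {

    /** Maximum pool size: at least the configured minimum, otherwise 4 buffers per subpartition. */
    static int maxBuffers(int minBuffers, int numSubpartitions) {
        return Math.max(minBuffers, 4 * numSubpartitions);
    }

    public static void main(String[] args) {
        System.out.println(maxBuffers(512, 100)); // prints 512: the minimum dominates
        System.out.println(maxBuffers(512, 200)); // prints 800: 4 * 200 exceeds the minimum
    }
}
```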

>>> Is there an upper limit for the sum of them? If there is, how do
MemoryDataManager and FileDataManager sync the memory usage?
The buffers of the MemoryDataManager are limited by the size of the
LocalBufferPool, and the upper limit is the size of the Network Memory. The
buffers of the FileDataManager are directly requested from
UnpooledOffHeapMemory, and are also limited by the size of the framework
off-heap memory. I think there should be no need for additional
synchronization mechanisms.

>>> How do you disable the slot sharing? If a user configures both the slot
sharing group and hybrid shuffle, what will happen to that job?
I think we can print a warning log when Hybrid Shuffle is enabled and SSG
is configured during the JobGraph compilation stage, and fall back to the
region slot sharing group by default. Of course, it will be emphasized in
the documentation that we do not currently support SSG; if configured, it will
fall back to the default.


Best regards,

Weijie


On Thu, May 19, 2022 at 16:25, Yangze Guo wrote:

> Thanks for driving this. Xintong and Weijie.
>
> I believe this feature will make Flink a better batch/OLAP engine. +1
> for the overall design.
>
> Some questions:
> 1. How do we decide the size of the buffer pool in MemoryDataManager
> and the read buffers in FileDataManager?
> 2. Is there an upper limit for the sum of them? If there is, how do
> MemoryDataManager and FileDataManager sync the memory usage?
> 3. How do you disable the slot sharing? If a user configures both the
> slot sharing group and hybrid shuffle, what will happen to that job?
>
>
> Best,
> Yangze Guo
>
> On Thu, May 19, 2022 at 2:41 PM Xintong Song 
> wrote:
> >
> > Thanks for preparing this FLIP, Weijie.
> >
> > I think this is a good improvement on batch resource elasticity. Looking
> > forward to the community feedback.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Thu, May 19, 2022 at 2:31 PM weijie guo 
> > wrote:
> >
> > > Hi all,
> > >
> > >
> > > I'd like to start a discussion about FLIP-235[1], which introduces a new
> > > shuffle mode that can overcome some of the problems of Pipelined Shuffle
> > > and Blocking Shuffle in batch scenarios.
> > >
> > > Currently in Flink, task scheduling is more or less constrained by the
> > > shuffle implementations. This brings the following disadvantages:
> > >
> > >    1. Pipelined Shuffle:
> > >       For pipelined shuffle, the upstream and downstream tasks are required
> > >       to be deployed at the same time, to avoid upstream tasks being blocked
> > >       forever. This is fine when there are enough resources for both
> > >       upstream and downstream tasks to run simultaneously, but will cause
> > >       the following problems otherwise:
> > >       1. Pipelined shuffle connected tasks (i.e., a pipelined region) cannot
> > >          be executed until obtaining resources for all of them, resulting in
> > >          longer job finishing time and poorer resource efficiency due to
> > >          holding part of the resources idle while waiting for the rest.
> > >       2. More severely, if multiple jobs each hold part of the cluster
> > >          resources and are waiting for more, a deadlock would occur. The
> > >          chance is not trivial, especially for scenarios such as OLAP where
> > >          concurrent job submissions are frequent.
> > >    2. Blocking Shuffle:
> > >       For blocking shuffle, execution of downstream tasks must wait for all
> > >       upstream tasks to finish, even though there might be more resources
> > >       available. The sequential execution of upstream and downstream tasks
> > >       significantly increases the job finishing time, and the disk IO
> > >       workload for spilling and loading full intermediate data also affects
> > >       the performance.
> > >
> > > We believe the root cause of the above problems is that shuffle
> > > implementations put unnecessary constraints on task scheduling.
> > >
> > > To solve this problem, Xintong Song and I propose to introduce hybrid
> > > shuffle to minimize the scheduling constraints. With Hybrid Shuffle, Flink
> > > should:
> > >
> > >    1. Make best use of available resources.
> > >       Ideally, we want Flink to always make progress if possible. That
> > >       is to say, it should always execute a pending task if

[DISCUSS] FLIP-234: Support Retryable Lookup Join To Solve Delayed Updates Issue In External Systems

2022-05-19 Thread Lincoln Lee
Dear Flink developers,

I would like to open a discussion on FLIP-234 [1], which supports retryable
lookup join to solve the delayed updates issue. As groundwork for this
solution, we proposed FLIP-232[2], which adds generic retry support for
Async I/O.
We prefer to offer this retry capability via query hints, similar to the new
join hints proposed in FLINK-27625[3] & FLIP-204[4].

This feature is backwards compatible and transparent to connectors. Existing
connectors that implement AsyncTableFunction can easily enable
async retry via the new join hint.
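
To illustrate the general idea of retrying a lookup until a delayed update becomes visible, here is a minimal sketch in plain Java. It is synchronous and blocking, so it is not the FLIP-232 async retry API or the proposed join hint; the class `RetryingLookup`, the method `lookupWithRetry`, and its parameters are hypothetical names chosen for this example.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical, simplified illustration of a fixed-delay retry around a lookup.
// The real proposal works on Flink's Async I/O and is configured via query hints.
class RetryingLookup {

    /** Calls the lookup up to maxAttempts times, waiting delayMillis between empty results. */
    static <T> Optional<T> lookupWithRetry(
            Supplier<Optional<T>> lookup, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Optional<T> result = lookup.get();
            if (result.isPresent()) {
                return result; // the external system has caught up
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and give up
                    return Optional.empty();
                }
            }
        }
        return Optional.empty(); // still missing after all attempts
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated external system: the row only becomes visible on the third lookup.
        Optional<String> row = lookupWithRetry(
                () -> ++calls[0] < 3 ? Optional.<String>empty() : Optional.of("row"),
                5, 10L);
        System.out.println(row + " after " + calls[0] + " attempts"); // Optional[row] after 3 attempts
    }
}
```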

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
[2]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
[3] https://lists.apache.org/thread/jm9kg33wk9z2bvo2b0g5bp3n5kfj6qv8
[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-204:+Introduce+Hash+Lookup+Join

Best,
Lincoln Lee


Final reminder: ApacheCon North America call for presentations closing soon

2022-05-19 Thread Rich Bowen
[Note: You're receiving this because you are subscribed to one or more
Apache Software Foundation project mailing lists.]

This is your final reminder that the Call for Presentations for
ApacheCon North America 2022 will close at 00:01 GMT on Monday, May
23rd, 2022. Please don't wait! Get your talk proposals in now!

Details here: https://apachecon.com/acna2022/cfp.html

--Rich, for the ApacheCon Planners




Re: [DISCUSS] FLIP-224: Blacklist Mechanism

2022-05-19 Thread Lijie Wang
Hi Konstantin,

We found that the Flink REST URL does not support the ":merge" format: a path
segment starting with a colon is recognized as a URL parameter.

We will keep the previous way, i.e.

POST: http://{jm_rest_address:port}/blocklist/taskmanagers
with the "id" and the "merge" flag put into the request body.
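
As a rough sketch of what such a request could look like from a Java 11+ client: only the path and the "id" and "merge" field names come from this thread; the JSON layout, the address `localhost:8081`, the id `tm-1`, and the helper names below are assumptions made for illustration, and the FLIP's "Public Interface" section remains the authority on the actual body format.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical client-side sketch; builds the request but does not send it.
class BlocklistRequest {

    /** Naive JSON body; assumes the id needs no escaping. The exact schema is an assumption. */
    static String body(String id, boolean merge) {
        return "{\"id\":\"" + id + "\",\"merge\":" + merge + "}";
    }

    /** Builds a POST to /blocklist/taskmanagers with the id and merge flag in the body. */
    static HttpRequest build(String restAddress, String id, boolean merge) {
        return HttpRequest.newBuilder(URI.create("http://" + restAddress + "/blocklist/taskmanagers"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(id, merge)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("localhost:8081", "tm-1", true);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body("tm-1", true));
    }
}
```

Keeping the merge flag in the body means the path stays a plain resource collection, which avoids the colon-prefixed segment that the REST router treats as a parameter.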

Best,
Lijie

On Wed, May 18, 2022 at 09:35, Lijie Wang wrote:

> Hi Weihua,
> thanks for the feedback.
>
> 1. Yes, only *manual* blocking is supported in this FLIP, but it's the first step
> towards auto-detection.
> 2. We will print the blocked nodes in logs. Maybe also put them into the
> exception for insufficient resources.
> 3. No. This FLIP won't change the WebUI. The blocklist information can be
> obtained through REST API and metrics.
>
> Best,
> Lijie
>
> On Tue, May 17, 2022 at 21:41, Weihua Hu wrote:
>
>> Hi,
>> Thanks for creating this FLIP.
>> We have implemented an automatic blocklist detection mechanism
>> internally, which is indeed very effective for handling node failures.
>> Due to the large number of nodes, although SREs already support automatically
>> taking failed nodes offline, the detection is not 100% accurate and there is some
>> delay.
>> So the blocklist mechanism can make Flink jobs recover from failure much
>> faster.
>>
>> Here are some of my thoughts:
>> 1. In this FLIP, users need to locate machine failures manually, which has
>> a certain cost of use.
>> 2. What happens if too many nodes are blocked, resulting in insufficient
>> resources? Will there be a special Exception for the user?
>> 3. Will we display the blocklist information in the WebUI? The blocklist
>> is at the cluster level, and if multiple users share a cluster, some users may
>> be a little confused when resources are not enough, or when more resources are
>> requested.
>>
>> Also, looking forward to the next FLIP on auto-detection.
>>
>> Best,
>> Weihua
>>
>> > On May 16, 2022, at 11:22 PM, Lijie Wang wrote:
>> >
>> > Hi Konstantin,
>> >
>> > Maybe change it to the following:
>> >
>> > 1. POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}
>> > Merge is not allowed. If the {id} already exists, return error. Otherwise,
>> > create a new item.
>> >
>> > 2. POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}:merge
>> > Merge is allowed. If the {id} already exists, merge. Otherwise, create a
>> > new item.
>> >
>> > WDYT?
>> >
>> > Best,
>> > Lijie
>> >
>> > On Mon, May 16, 2022 at 20:07, Konstantin Knauf wrote:
>> >
>> >> Hi Lijie,
>> >>
>> >> hm, maybe the following is more appropriate in that case
>> >>
>> >> POST: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}:merge
>> >>
>> >> Best,
>> >>
>> >> Konstantin
>> >>
>> >> On Mon, May 16, 2022 at 07:05, Lijie Wang <
>> >> wangdachui9...@gmail.com> wrote:
>> >>
>> >>> Hi Konstantin,
>> >>> thanks for your feedback.
>> >>>
>> >>> From what I understand, PUT should be idempotent. However, we have a
>> >>> *timeout* field in the request. This means that initiating the same request
>> >>> at two different times will lead to different resource status (timestamps
>> >>> of the items to be removed will be different).
>> >>>
>> >>> Should we use PUT in this case? WDYT?
>> >>>
>> >>> Best,
>> >>> Lijie
>> >>>
>> >>> On Fri, May 13, 2022 at 17:20, Konstantin Knauf wrote:
>> >>>
>>  Hi Lijie,
>> 
>>  wouldn't the REST API-idiomatic way for an update/replace be a PUT on
>> >> the
>>  resource?
>> 
>>  PUT: http://{jm_rest_address:port}/blocklist/taskmanagers/{id}
>> 
>>  Best,
>> 
>>  Konstantin
>> 
>> 
>> 
>>  On Fri, May 13, 2022 at 11:01, Lijie Wang <
>>  wangdachui9...@gmail.com> wrote:
>> 
>> >>>>> Hi everyone,
>> >>>>>
>> >>>>> I've had an offline discussion with Becket Qin and Zhu Zhu, and made the
>> >>>>> following changes to the REST API:
>> >>>>> 1. To avoid ambiguity, only one of *timeout* and *endTimestamp* can be
>> >>>>> specified. If both are specified, an error is returned.
>> >>>>> 2. If the specified item is already there, the *ADD* operation has two
>> >>>>> behaviors: *return error* (default value) or *merge/update*, and we add a
>> >>>>> flag to the request body to control it. You can find more details in the
>> >>>>> "Public Interface" section.
>> >>>>>
>> >>>>> If there is no more feedback, we will start the vote thread next week.
>> >>>>>
>> >>>>> Best,
>> >>>>> Lijie
>> >>>>>
>> >>>>> On Tue, May 10, 2022 at 17:14, Lijie Wang wrote:
>> >>>>>
>> >>>>>> Hi Becket Qin,
>> >>>>>>
>> >>>>>> Thanks for your suggestions. I have moved the description of
>> >>>>>> configurations, metrics and REST API into the "Public Interface" section,
>> >>>>>> and made a few updates according to your suggestion. In this FLIP, there
>> >>>>>> are no public Java interfaces or pluggables that users need to implement
>> >>>>>> by themselves.
>> >>>>>>
>> >>>>>> Answers to your questions:
>> >>>>>> 1. Yes, there are 2 block actions: MARK_BLOCKED and.
>> >>>>>>

Re: [DISCUSS] FLIP-221 Abstraction for lookup source cache and metric

2022-05-19 Thread Александр Смирнов
Also I have a few additions:
1) maybe rename 'lookup.cache.maximum-size' to
'lookup.cache.max-rows'? I think it will be more clear that we talk
not about bytes, but about the number of rows. Plus it fits more,
considering my optimization with filters.
2) How will users enable rescanning? Are we going to separate caching
and rescanning from the options point of view? Like initially we had
one option 'lookup.cache' with values LRU / ALL. I think now we can
make a boolean option 'lookup.rescan'. RescanInterval can be
'lookup.rescan.interval', etc.

Best regards,
Alexander

On Thu, May 19, 2022 at 14:50, Александр Смирнов wrote:
>
> Hi Qingsheng and Jark,
>
> 1. Builders vs 'of'
> I understand that builders are used when we have multiple parameters.
> I suggested them because we could add parameters later. To prevent
> Builder for ScanRuntimeProvider from looking redundant I can suggest
> one more config now - "rescanStartTime".
> It's a time in UTC (LocalTime class) when the first reload of the cache
> starts. This parameter can be thought of as the 'initialDelay' (the diff
> between the current time and rescanStartTime) in the method
> ScheduledExecutorService#scheduleWithFixedDelay [1]. It can be very
> useful when the dimension table is updated by some other scheduled job
> at a certain time, or when the user simply wants the second scan (first
> cache reload) to be delayed. This option can be used even without
> 'rescanInterval' - in this case 'rescanInterval' will be one day.
> If you are fine with this option, I would be very glad if you would
> give me access to edit the FLIP page, so I could add it myself.
>
> 2. Common table options
> I also think that FactoryUtil would be overloaded by all cache
> options. But maybe unify all suggested options, not only for default
> cache? I.e. class 'LookupOptions', that unifies default cache options,
> rescan options, 'async', 'maxRetries'. WDYT?
>
> 3. Retries
> I'm fine with suggestion close to RetryUtils#tryTimes(times, call)
>
> [1] 
> https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
>
> Best regards,
> Alexander
>
> On Wed, May 18, 2022 at 16:04, Qingsheng Ren wrote:
> >
> > Hi Jark and Alexander,
> >
> > Thanks for your comments! I’m also OK to introduce common table options. I 
> > prefer to introduce a new DefaultLookupCacheOptions class for holding these 
> > option definitions because putting all options into FactoryUtil would make 
> > it a bit ”crowded” and not well categorized.
> >
> > FLIP has been updated according to suggestions above:
> > 1. Use static “of” method for constructing RescanRuntimeProvider 
> > considering both arguments are required.
> > 2. Introduce new table options matching DefaultLookupCacheFactory
> >
> > Best,
> > Qingsheng
> >
> > On Wed, May 18, 2022 at 2:57 PM Jark Wu  wrote:
> >>
> >> Hi Alex,
> >>
> >> 1) retry logic
> >> I think we can extract some common retry logic into utilities, e.g. 
> >> RetryUtils#tryTimes(times, call).
> >> This seems independent of this FLIP and can be reused by DataStream users.
> >> Maybe we can open an issue to discuss this and where to put it.
> >>
> >> 2) cache ConfigOptions
> >> I'm fine with defining cache config options in the framework.
> >> A candidate place to put is FactoryUtil which also includes 
> >> "sink.parallelism", "format" options.
> >>
> >> Best,
> >> Jark
> >>
> >>
> >> On Wed, 18 May 2022 at 13:52, Александр Смирнов  
> >> wrote:
> >>>
> >>> Hi Qingsheng,
> >>>
> >>> Thank you for considering my comments.
> >>>
> >>> >  there might be custom logic before making retry, such as re-establish 
> >>> > the connection
> >>>
> >>> Yes, I understand that. I meant that such logic can be placed in a
> >>> separate function, that can be implemented by connectors. Just moving
> >>> the retry logic would make connector's LookupFunction more concise +
> >>> avoid duplicate code. However, it's a minor change. The decision is up
> >>> to you.
> >>>
> >>> > We decide not to provide common DDL options and let developers to 
> >>> > define their own options as we do now per connector.
> >>>
> >>> What is the reason for that? One of the main goals of this FLIP was to
> >>> unify the configs, wasn't it? I understand that current cache design
> >>> doesn't depend on ConfigOptions, like was before. But still we can put
> >>> these options into the framework, so connectors can reuse them and
> >>> avoid code duplication, and, what is more significant, avoid possible
> >>> different options naming. This moment can be pointed out in
> >>> documentation for connector developers.
> >>>
> >>> Best regards,
> >>> Alexander
> >>>
> >>> On Tue, May 17, 2022 at 17:11, Qingsheng Ren wrote:
> >>> >
> >>> > Hi Alexander,
> >>> >
> >>> > Thanks for the review and glad to see we are on the same page! I think 
> >>> > you forgot to cc the dev mailing list so I’m also quoting your reply 
> >>> > under this email.
> >>> >
> >>> > >  We can 

[jira] [Created] (FLINK-27698) [JUnit5 Migration] Module: flink-table-api-java-bridge

2022-05-19 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-27698:
---

 Summary: [JUnit5 Migration] Module: flink-table-api-java-bridge
 Key: FLINK-27698
 URL: https://issues.apache.org/jira/browse/FLINK-27698
 Project: Flink
  Issue Type: Sub-task
Reporter: Sergey Nuyanzin






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Flink application on yarn cluster - main method not found

2022-05-19 Thread Zain Haider Nemati
Hi,
I'm running a Flink application on a YARN cluster and it gives me this error;
it works fine on a standalone cluster. Any idea what could be causing this?

Exception in thread "main" java.lang.NoSuchMethodError:
org.apache.flink.client.deployment.application.ClassPathPackagedProgramRetriever.newBuilder([Ljava/lang/String;)Lorg/apache/flink/client/deployment/application/ClassPathPackagedProgramRetriever$Builder;
at org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.getPackagedProgramRetriever(YarnApplicationClusterEntryPoint.java:137)
at org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.getPackagedProgram(YarnApplicationClusterEntryPoint.java:121)
at org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint.main(YarnApplicationClusterEntryPoint.java:95)


[jira] [Created] (FLINK-27697) Restoring from a checkpoint will start duplicated application when mixing use streaming sinks and sql sinks in code.

2022-05-19 Thread Ying Lin (Jira)
Ying Lin created FLINK-27697:


 Summary: Restoring from a checkpoint will start duplicated 
application when mixing use streaming sinks and sql sinks in code.
 Key: FLINK-27697
 URL: https://issues.apache.org/jira/browse/FLINK-27697
 Project: Flink
  Issue Type: Bug
Reporter: Ying Lin


I mix `flink sql` and `DataStream` in the same code, with two sinks. When I
restore from a checkpoint, two applications are started.





Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-19 Thread Yangze Guo
Thanks for driving this, Xintong and Weijie.

I believe this feature will make Flink a better batch/OLAP engine. +1
for the overall design.

Some questions:
1. How do we decide the size of the buffer pool in MemoryDataManager
and the read buffers in FileDataManager?
2. Is there an upper limit for the sum of them? If there is, how do
MemoryDataManager and FileDataManager sync their memory usage?
3. How do you disable slot sharing? If a user configures both a
slot sharing group and hybrid shuffle, what will happen to that job?


Best,
Yangze Guo

On Thu, May 19, 2022 at 2:41 PM Xintong Song  wrote:
>
> Thanks for preparing this FLIP, Weijie.
>
> I think this is a good improvement on batch resource elasticity. Looking
> forward to the community feedback.
>
> Best,
>
> Xintong
>
>
>
> On Thu, May 19, 2022 at 2:31 PM weijie guo 
> wrote:
>
> > Hi all,
> >
> >
> > I’d like to start a discussion about FLIP-235[1], which introduces a new
> > shuffle mode that can overcome some of the problems of Pipelined Shuffle and
> > Blocking Shuffle in batch scenarios.
> >
> >
> > Currently in Flink, task scheduling is more or less constrained by the 
> > shuffle implementations.
> > This will bring the following disadvantages:
> >
> >    1. Pipelined Shuffle:
> >       For pipelined shuffle, the upstream and downstream tasks are required
> >       to be deployed at the same time, to avoid upstream tasks being blocked
> >       forever. This is fine when there are enough resources for both upstream
> >       and downstream tasks to run simultaneously, but will cause the following
> >       problems otherwise:
> >       1. Pipelined shuffle connected tasks (i.e., a pipelined region) cannot
> >          be executed until obtaining resources for all of them, resulting in
> >          longer job finishing time and poorer resource efficiency due to
> >          holding part of the resources idle while waiting for the rest.
> >       2. More severely, if multiple jobs each hold part of the cluster
> >          resources and are waiting for more, a deadlock would occur. The
> >          chance is not trivial, especially for scenarios such as OLAP where
> >          concurrent job submissions are frequent.
> >    2. Blocking Shuffle:
> >       For blocking shuffle, execution of downstream tasks must wait for all
> >       upstream tasks to finish, even though there might be more resources
> >       available. The sequential execution of upstream and downstream tasks
> >       significantly increases the job finishing time, and the disk IO workload
> >       for spilling and loading full intermediate data also affects the
> >       performance.
> >
> >
> > We believe the root cause of the above problems is that shuffle 
> > implementations put unnecessary constraints on task scheduling.
> >
> >
> > To solve this problem, Xintong Song and I propose to introduce hybrid 
> > shuffle to minimize the scheduling constraints. With Hybrid Shuffle, Flink 
> > should:
> >
> >    1. Make best use of available resources.
> >       Ideally, we want Flink to always make progress if possible. That is to
> >       say, it should always execute a pending task if there are resources
> >       available for that task.
> >    2. Minimize disk IO load.
> >       In-flight data should be consumed directly from memory as much as
> >       possible. Only data that is not consumed in time should be spilled to
> >       disk.
> >
> > You can find more details in FLIP-235. Looking forward to your feedback.
> >
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> >
> > Best regards,
> >
> > Weijie
> >


Re: [DISCUSS] FLIP-221 Abstraction for lookup source cache and metric

2022-05-19 Thread Александр Смирнов
Hi Qingsheng and Jark,

1. Builders vs 'of'
I understand that builders are used when we have multiple parameters.
I suggested them because we could add parameters later. To prevent
Builder for ScanRuntimeProvider from looking redundant I can suggest
one more config now - "rescanStartTime".
It's a time in UTC (LocalTime class) when the first reload of the cache
starts. This parameter can be thought of as the 'initialDelay' (the diff
between the current time and rescanStartTime) in the method
ScheduledExecutorService#scheduleWithFixedDelay [1]. It can be very
useful when the dimension table is updated by some other scheduled job
at a certain time, or when the user simply wants the second scan (first
cache reload) to be delayed. This option can be used even without
'rescanInterval' - in this case 'rescanInterval' will be one day.
If you are fine with this option, I would be very glad if you would
give me access to edit the FLIP page, so I could add it myself.

2. Common table options
I also think that FactoryUtil would be overloaded by all cache
options. But maybe unify all suggested options, not only for default
cache? I.e. class 'LookupOptions', that unifies default cache options,
rescan options, 'async', 'maxRetries'. WDYT?

3. Retries
I'm fine with suggestion close to RetryUtils#tryTimes(times, call)
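
To make the retry idea concrete, here is a minimal sketch of what a helper in the spirit of RetryUtils#tryTimes(times, call) might look like. It is only an illustration: the class name, the extra 'isRetriable' and 'beforeRetry' parameters and the use of unchecked exceptions are my assumptions, not an existing Flink API. The hook is where connector-specific logic such as re-establishing a connection could go.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper, not part of Flink; all names are illustrative.
final class RetrySketch {

    /**
     * Runs {@code call} up to {@code times} times and returns the first
     * successful result. Failures that are not retriable are rethrown at once.
     */
    static <T> T tryTimes(
            int times,
            Supplier<T> call,
            Predicate<RuntimeException> isRetriable,
            Runnable beforeRetry) {
        if (times < 1) {
            throw new IllegalArgumentException("times must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= times; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                if (!isRetriable.test(e)) {
                    throw e; // retrying would not help
                }
                last = e;
                if (attempt < times) {
                    beforeRetry.run(); // e.g. re-establish the connection
                }
            }
        }
        throw last; // all attempts failed with retriable errors
    }
}
```

A connector could then wrap its lookup call in tryTimes(maxRetries, ...) and keep the reconnect logic in the hook, which is roughly the split discussed in this thread.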

[1] 
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
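
As a rough illustration of the 'rescanStartTime' idea from point 1, the initial delay could be derived as sketched below. The class and method names are hypothetical; only ScheduledExecutorService#scheduleWithFixedDelay and the java.time classes are real JDK APIs. The sketch assumes that a start time earlier than the current UTC time means "tomorrow".

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the FLIP-221 implementation.
final class RescanDelaySketch {

    /** Time from nowUtc until the next occurrence of rescanStartTime. */
    static Duration initialDelay(LocalTime nowUtc, LocalTime rescanStartTime) {
        Duration delay = Duration.between(nowUtc, rescanStartTime);
        // A negative value means the start time has already passed today.
        return delay.isNegative() ? delay.plusDays(1) : delay;
    }

    /** Schedules the cache reload; the first run happens at rescanStartTime (UTC). */
    static ScheduledFuture<?> schedule(
            ScheduledExecutorService executor,
            Runnable reload,
            LocalTime rescanStartTime,
            Duration rescanInterval) {
        Duration delay = initialDelay(LocalTime.now(ZoneOffset.UTC), rescanStartTime);
        return executor.scheduleWithFixedDelay(
                reload, delay.toMillis(), rescanInterval.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

Note that scheduleWithFixedDelay measures the interval from the end of one reload to the start of the next, so with a one-day interval the reload time would drift by the reload duration. If wall-clock alignment matters, scheduleAtFixedRate may be the better fit.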

Best regards,
Alexander

On Wed, May 18, 2022 at 16:04, Qingsheng Ren wrote:
>
> Hi Jark and Alexander,
>
> Thanks for your comments! I’m also OK to introduce common table options. I 
> prefer to introduce a new DefaultLookupCacheOptions class for holding these 
> option definitions because putting all options into FactoryUtil would make it 
> a bit ”crowded” and not well categorized.
>
> FLIP has been updated according to suggestions above:
> 1. Use static “of” method for constructing RescanRuntimeProvider considering 
> both arguments are required.
> 2. Introduce new table options matching DefaultLookupCacheFactory
>
> Best,
> Qingsheng
>
> On Wed, May 18, 2022 at 2:57 PM Jark Wu  wrote:
>>
>> Hi Alex,
>>
>> 1) retry logic
>> I think we can extract some common retry logic into utilities, e.g. 
>> RetryUtils#tryTimes(times, call).
>> This seems independent of this FLIP and can be reused by DataStream users.
>> Maybe we can open an issue to discuss this and where to put it.
>>
>> 2) cache ConfigOptions
>> I'm fine with defining cache config options in the framework.
>> A candidate place to put is FactoryUtil which also includes 
>> "sink.parallelism", "format" options.
>>
>> Best,
>> Jark
>>
>>
>> On Wed, 18 May 2022 at 13:52, Александр Смирнов  wrote:
>>>
>>> Hi Qingsheng,
>>>
>>> Thank you for considering my comments.
>>>
>>> >  there might be custom logic before making retry, such as re-establish 
>>> > the connection
>>>
>>> Yes, I understand that. I meant that such logic can be placed in a
>>> separate function, that can be implemented by connectors. Just moving
>>> the retry logic would make connector's LookupFunction more concise +
>>> avoid duplicate code. However, it's a minor change. The decision is up
>>> to you.
>>>
>>> > We decide not to provide common DDL options and let developers to define 
>>> > their own options as we do now per connector.
>>>
>>> What is the reason for that? One of the main goals of this FLIP was to
>>> unify the configs, wasn't it? I understand that current cache design
>>> doesn't depend on ConfigOptions, like was before. But still we can put
>>> these options into the framework, so connectors can reuse them and
>>> avoid code duplication, and, what is more significant, avoid possible
>>> different options naming. This moment can be pointed out in
>>> documentation for connector developers.
>>>
>>> Best regards,
>>> Alexander
>>>
>>> On Tue, May 17, 2022 at 17:11, Qingsheng Ren wrote:
>>> >
>>> > Hi Alexander,
>>> >
>>> > Thanks for the review and glad to see we are on the same page! I think 
>>> > you forgot to cc the dev mailing list so I’m also quoting your reply 
>>> > under this email.
>>> >
>>> > >  We can add 'maxRetryTimes' option into this class
>>> >
>>> > In my opinion the retry logic should be implemented in lookup() instead 
>>> > of in LookupFunction#eval(). Retrying is only meaningful under some 
>>> > specific retriable failures, and there might be custom logic before 
>>> > making retry, such as re-establish the connection 
>>> > (JdbcRowDataLookupFunction is an example), so it's more handy to leave it 
>>> > to the connector.
>>> >
>>> > > I don't see DDL options, that were in previous version of FLIP. Do you 
>>> > > have any special plans for them?
>>> >
>>> > We decide not to provide common DDL options and let developers to define 
>>> > their own options as we do now per connector.
>>> >
>>> > The rest of comments sound great and I’ll update the FLIP. Hope we can 
>>> > finalize our proposal soon!
>>> >
>>> > Best,

Re: [VOTE] FLIP-229: Introduces Join Hint for Flink SQL Batch Job

2022-05-19 Thread Timo Walther

+1 (binding)

Thanks for driving this!

Timo

On 19.05.22 08:44, Leonard Xu wrote:

  Thanks Xuyang for driving this work.

+1(binding)

Best,
Leonard
  


On May 19, 2022, at 10:46 AM, Yun Tang wrote:

Thanks for driving, +1 (binding)

Best
Yun Tang

From: Jark Wu 
Sent: Wednesday, May 18, 2022 23:09
To: dev 
Subject: Re: [VOTE] FLIP-229: Introduces Join Hint for Flink SQL Batch Job

+1(binding)

Best,
Jark

On Wed, 18 May 2022 at 14:18, Jingsong Li  wrote:


+1 Thanks for driving.

Best,
Jingsong

On Wed, May 18, 2022 at 1:33 PM godfrey he  wrote:


Thanks Xuyang for driving this, +1(binding)

Best,
Godfrey

On Tue, May 17, 2022 at 10:21, Xuyang wrote:

Hi, everyone.
Thanks for your feedback for FLIP-229: Introduces Join Hint for Flink
SQL Batch Job[1] on the discussion thread[2].

I'd like to start a vote for it. The vote will be open for at least 72
hours unless there is an objection or not enough votes.

--

Best!
Xuyang

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job
[2] https://lists.apache.org/thread/y668bxyjz66ggtjypfz9t571m0tyvv9h



[DISCUSS] FLINK-27625: Add a new query hint for async lookup join

2022-05-19 Thread Lincoln Lee
Hi everyone,

I would like to open a discussion on adding a new query hint for async
lookup join. Since the changes are relatively minor, no new FLIP was
created; if needed, I will create one.

FLINK-27623 adds a global parameter 'table.exec.async-lookup.output-mode'
for table users so that all three control parameters related to async I/O
can be configured at the same job level.
As planned in the issue, we'd like to go a step further and offer more
precise control over the async join operation than the job-level config
by introducing a new join hint: 'ASYNC_LOOKUP'.

For the hint option, for intuitive and user-friendly reasons, we want to
support both simple and kv forms, with all options except table name being
optional (use job level configuration if not set)

# 1. simple form: (ordered hint option list)
```
ASYNC_LOOKUP('tableName'[, 'output-mode', 'buffer-capacity', 'timeout'])
optional:
output-mode
buffer-capacity
timeout
```

Note: since Calcite currently does not support mixed-type hint options,
the table name here needs to be a string instead of an identifier. (For
`SqlHint`: the option formats cannot be mixed; they should either be all
simple identifiers, all literals, or all key-value pairs.) We can improve
this once Calcite supports it.

# 2. kv form: (support unordered hint option list)
```
ASYNC_LOOKUP('table'='tableName'[, 'output-mode'='ordered|allow-unordered',
'capacity'='int', 'timeout'='duration'])

optional kvs:
'output-mode'='ordered|allow-unordered'
'capacity'='int'
'timeout'='duration'
```

e.g., if the job level configuration is:
```
table.exec.async-lookup.output-mode: ORDERED
table.exec.async-lookup.buffer-capacity: 100
table.exec.async-lookup.timeout: 180s
```

then the following hints:
```
1. ASYNC_LOOKUP('dim1', 'allow-unordered', '200', '300s')
2. ASYNC_LOOKUP('dim1', 'allow-unordered', '200')
3. ASYNC_LOOKUP('table'='dim1', 'output-mode'='allow-unordered')
4. ASYNC_LOOKUP('table'='dim1', 'timeout'='300s')
5. ASYNC_LOOKUP('table'='dim1', 'capacity'='300')
```

are equivalent to:
```
1. ASYNC_LOOKUP('dim1', 'allow-unordered', '200', '300s')
2. ASYNC_LOOKUP('dim1', 'allow-unordered', '200', '180s')
3. ASYNC_LOOKUP('table'='dim1', 'output-mode'='allow-unordered',
'capacity'='100', 'timeout'='180s')
4. ASYNC_LOOKUP('table'='dim1', 'output-mode'='ordered', 'capacity'='100',
'timeout'='300s')
5. ASYNC_LOOKUP('table'='dim1', 'output-mode'='ordered', 'capacity'='300',
'timeout'='180s')
```
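
The fallback rule shown by the examples above can be sketched as follows for the kv form. This is only an illustration under assumptions: the class and method names are hypothetical, and the job-level defaults are assumed to be already translated to the hint keys ('output-mode', 'capacity', 'timeout') and value spelling.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of kv-form hint resolution; not planner code.
final class AsyncLookupHintSketch {

    private static final String[] OPTIONAL_KEYS = {"output-mode", "capacity", "timeout"};

    /** Returns the hint options with every unset option taken from the job-level defaults. */
    static Map<String, String> resolve(Map<String, String> hint, Map<String, String> jobDefaults) {
        if (!hint.containsKey("table")) {
            throw new IllegalArgumentException("'table' is the only required option");
        }
        Map<String, String> resolved = new LinkedHashMap<>();
        resolved.put("table", hint.get("table"));
        for (String key : OPTIONAL_KEYS) {
            // An explicitly given hint option wins over the job-level value.
            resolved.put(key, hint.getOrDefault(key, jobDefaults.get(key)));
        }
        return resolved;
    }
}
```

With defaults of ordered / 100 / 180s, resolving ('table'='dim1', 'timeout'='300s') yields the same options as example 4 above.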

In addition, if the lookup source implements both sync and async table
functions, the planner prefers the async function when the
'ASYNC_LOOKUP' hint is specified.

Looking forward to your feedback!

Best,
Lincoln Lee


Re: [DISCUSS] FLIP-91: Support SQL Client Gateway

2022-05-19 Thread Timo Walther

Hi Shengkai,

thanks again for the update. I don't have much to add:

> I still think we should use a new state machine

Thanks for the explanation about async/sync behavior. I thought we also 
want to use the Gateway for job status updates. But if FINISHED only 
refers to the job submission, the new state machine makes more sense. 
Nevertheless, checking the job status will be a frequent request, 
especially for the new lifecycle statements. But I agree with the current 
design that this is different from the state of the operation.


> SQLGatewayService.getFunction / UserDefinedFunctionInfo

You forgot to update the class. There is still a UserDefinedFunctionInfo 
in the FLIP.


> configure_session

I don't have a strong opinion on the naming of this particular REST 
path. But my concern is that we should introduce one term for this family 
of init/configure statements. In the SQL Client we call them `init 
statements` and in the gateway we call them `configuration statements`, 
but in the end they are all statements that are neither DML nor DQL.


> './sql-client.sh -e'

Would it make sense to introduce a separate `./sql-gateway.sh`? I find 
it a bit confusing that an "-e" converts a client to a server. Under the 
hood we can still share the same classes but make it a bit more explicit 
in the distribution (also for marketing purposes of this feature).


Please continue with the voting afterwards.

Regards,
Timo

On 17.05.22 09:14, Shengkai Fang wrote:

Hi, all.

After discussing with Becket Qin offline, I modified the FLIP a little. The
changes are as follows:

1. Add /api_versions in the REST API.

The API lets the client find out which versions the current REST endpoint
supports. The client can then choose the version for later communication.

2. SqlClient uses the -e option to input the endpoint address and port.

Because the -h option is already used by the SqlClient, we use
-e, --endpoint for SqlClient to input the address:port of the endpoint.

3. Use './sql-client.sh -e' to start the gateway mode rather than
'./sql-client.sh gateway -e'.

If the user specifies the -e option, it definitely means to use the gateway
mode. Therefore, it is redundant to use another keyword to indicate the
mode.

Best,
Shengkai

On Tue, May 17, 2022 at 14:13, Shengkai Fang wrote:


Hi, Jark, Timo. Nice to have an agreement!

Thanks for Jark's inputs about the multiple version Flink. I have already
updated the FLIP in the rejected alternatives about details.

1. We should definitely just use LogicalTypeJsonSerializer and not a
second JSON representation.

Our concern is mainly that it's hard for users to use because of the
flexible structure. The LogicalTypeJsonSerializer will serialize the
VARCHAR to "VARCHAR()" or "{\"TYPE\": \"VARCHAR\", \"LENGTH\": 0}",
which requires the end users to process the different situations. But in
some cases, users just print the json to the terminal/web UI.  WDYT?


Serialize the RowData


Sure. I will keep your advice in mind. I think the current serialization
of the RowData will not use the column name as the Object key in the json.
I am not sure whether I missed something. It would be nice if you can give
me an example if I do something wrong.


Have you also thought about using Flink's state types from Flink
tasks/jobs?

Yes. But I still think we should use a new state machine. First of all,
an Operation in the FLIP is quite different from a Job. Operations include
DDL, DML and so on, so it's not suitable to use the narrower concept to
replace the broader one. Actually some statuses in JobStatus, e.g.
RESTARTING/SUSPENDED/RECONCILING, don't apply to a DDL Operation.

On the other hand, the Gateway allows users to submit jobs (DML) in
sync/async mode. The RUNNING status of an Operation has a different
meaning in each mode:
- In the async mode, once the gateway submits the job, the Operation moves
to the FINISHED state.
- In the sync mode, the RUNNING status of the Operation covers both
submitting the job and running the job. Even if a failover occurs, we still
consider the Operation to be in the RUNNING state. Only if the job is
unrecoverable do we change the Operation status to ERROR.

Therefore, I think these two concepts are not consistent and we should not
reuse the JobStatus. I added a section in the rejected alternatives.
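
The difference between the two modes could be sketched roughly like this. It is a simplified illustration, not the FLIP-91 state machine: only the three statuses named above are modeled, the JobEvent names are hypothetical, and mapping a completed job to FINISHED in sync mode is an assumption.

```java
// Hypothetical sketch; the real Operation state machine is defined in FLIP-91.
final class OperationStatusSketch {

    // Only the statuses mentioned in this thread; the FLIP may define more.
    enum OperationStatus { RUNNING, FINISHED, ERROR }

    // Illustrative job-side events, all at or after submission.
    enum JobEvent { SUBMITTED, RUNNING, FAILOVER, COMPLETED, UNRECOVERABLE_FAILURE }

    static OperationStatus statusOf(boolean asyncMode, JobEvent event) {
        if (asyncMode) {
            // Async: the Operation is done as soon as the job has been submitted.
            return OperationStatus.FINISHED;
        }
        switch (event) {
            case UNRECOVERABLE_FAILURE:
                return OperationStatus.ERROR;
            case COMPLETED:
                // Assumption: a completed job finishes a sync Operation.
                return OperationStatus.FINISHED;
            default:
                // Submitting, running and failover all count as RUNNING.
                return OperationStatus.RUNNING;
        }
    }
}
```

The point of the sketch is that a job failover is invisible at the Operation level, which is why JobStatus values such as RESTARTING have no counterpart here.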


Options to configure the REST endpoint


Yes. I have modified the FLIP about this.


Naming convention


Yes. I have modified the FLIP with your suggestions.


Another smaller shortcomings in the FLIP



SQLGatewayService.getFunction / UserDefinedFunctionInfo


After reviewing the Javadoc of java.sql.DatabaseMetaData#getFunctions, I
found that it returns the system and user functions available in the Catalog.
I think you are right. Therefore, we'd better rename it to
listFunctions(SessionHandle sessionHandle, OperationHandle operationHandle,
String catalog, String database, ShowFunctionsOperation.FunctionScope) and
have it return FunctionInfo.



[jira] [Created] (FLINK-27696) Add bin-pack strategy to split the whole bucket data files into several small splits for append-only table.

2022-05-19 Thread Zheng Hu (Jira)
Zheng Hu created FLINK-27696:


 Summary: Add bin-pack strategy to split the whole bucket data 
files into several small splits for append-only table.
 Key: FLINK-27696
 URL: https://issues.apache.org/jira/browse/FLINK-27696
 Project: Flink
  Issue Type: Sub-task
Reporter: Zheng Hu


For an append-only table, we don't have to assign each task a whole bucket's 
data files. Instead, we can use some algorithm (such as bin-packing) to split 
the bucket's data files into multiple fragments to improve the job 
parallelism.
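
As a rough illustration of the idea, a first-fit-decreasing bin-packing over file sizes might look like the sketch below. The class name, the use of plain sizes instead of file metadata, and the 'targetSize' parameter are assumptions for illustration, not the actual implementation. Sorting by size also ignores the original file order, which a real append-only reader may need to preserve.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: pack a bucket's file sizes into splits of about targetSize bytes.
final class BinPackSketch {

    /** First-fit decreasing: each file goes into the first split with enough room. */
    static List<List<Long>> pack(List<Long> fileSizes, long targetSize) {
        List<Long> sorted = new ArrayList<>(fileSizes);
        sorted.sort(Comparator.reverseOrder()); // largest files first

        List<List<Long>> splits = new ArrayList<>();
        List<Long> remaining = new ArrayList<>(); // free capacity per split

        for (long size : sorted) {
            int target = -1;
            for (int i = 0; i < splits.size(); i++) {
                if (remaining.get(i) >= size) {
                    target = i;
                    break;
                }
            }
            if (target < 0) {
                // No split has room; open a new one. An oversized file gets its own split.
                splits.add(new ArrayList<>());
                remaining.add(targetSize);
                target = splits.size() - 1;
            }
            splits.get(target).add(size);
            remaining.set(target, remaining.get(target) - size);
        }
        return splits;
    }
}
```

Each resulting split can then be assigned to a separate task, so one large bucket no longer limits the parallelism to a single reader.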





Re: [DISCUSS] FLIP-221 Abstraction for lookup source cache and metric

2022-05-19 Thread Leonard Xu
Thanks Qingsheng and Alexander for the update. The current API and options design 
of this FLIP looks good enough from my side.
If there are no more concerns on this thread, I think we can start a VOTE thread later.

Best,
Leonard


> On May 18, 2022, at 5:04 PM, Qingsheng Ren wrote:
> 
> Hi Jark and Alexander,
> 
> Thanks for your comments! I’m also OK to introduce common table options. I
> prefer to introduce a new DefaultLookupCacheOptions class for holding these
> option definitions because putting all options into FactoryUtil would make
> it a bit ”crowded” and not well categorized.
> 
> FLIP has been updated according to suggestions above:
> 1. Use static “of” method for constructing RescanRuntimeProvider
> considering both arguments are required.
> 2. Introduce new table options matching DefaultLookupCacheFactory
> 
> Best,
> Qingsheng
> 
> On Wed, May 18, 2022 at 2:57 PM Jark Wu  wrote:
> 
>> Hi Alex,
>> 
>> 1) retry logic
>> I think we can extract some common retry logic into utilities, e.g.
>> RetryUtils#tryTimes(times, call).
>> This seems independent of this FLIP and can be reused by DataStream users.
>> Maybe we can open an issue to discuss this and where to put it.
>> 
>> 2) cache ConfigOptions
>> I'm fine with defining cache config options in the framework.
>> A candidate place to put is FactoryUtil which also includes
>> "sink.parallelism", "format" options.
>> 
>> Best,
>> Jark
>> 
>> 
>> On Wed, 18 May 2022 at 13:52, Александр Смирнов 
>> wrote:
>> 
>>> Hi Qingsheng,
>>> 
>>> Thank you for considering my comments.
>>> 
>>>> there might be custom logic before making retry, such as re-establish
>>>> the connection
>>> 
>>> Yes, I understand that. I meant that such logic can be placed in a
>>> separate function, that can be implemented by connectors. Just moving
>>> the retry logic would make connector's LookupFunction more concise +
>>> avoid duplicate code. However, it's a minor change. The decision is up
>>> to you.
>>> 
>>>> We decide not to provide common DDL options and let developers to
>>>> define their own options as we do now per connector.
>>> 
>>> What is the reason for that? One of the main goals of this FLIP was to
>>> unify the configs, wasn't it? I understand that current cache design
>>> doesn't depend on ConfigOptions, like was before. But still we can put
>>> these options into the framework, so connectors can reuse them and
>>> avoid code duplication, and, what is more significant, avoid possible
>>> different options naming. This moment can be pointed out in
>>> documentation for connector developers.
>>> 
>>> Best regards,
>>> Alexander
>>> 
>>> On Tue, May 17, 2022 at 17:11, Qingsheng Ren wrote:
>>>>
>>>> Hi Alexander,
>>>>
>>>> Thanks for the review and glad to see we are on the same page! I think
>>>> you forgot to cc the dev mailing list so I’m also quoting your reply under
>>>> this email.
>>>>
>>>>> We can add 'maxRetryTimes' option into this class
>>>>
>>>> In my opinion the retry logic should be implemented in lookup() instead
>>>> of in LookupFunction#eval(). Retrying is only meaningful under some
>>>> specific retriable failures, and there might be custom logic before making
>>>> retry, such as re-establish the connection (JdbcRowDataLookupFunction is an
>>>> example), so it's more handy to leave it to the connector.
>>>>
>>>>> I don't see DDL options, that were in previous version of FLIP. Do
>>>>> you have any special plans for them?
>>>>
>>>> We decide not to provide common DDL options and let developers to
>>>> define their own options as we do now per connector.
>>>>
>>>> The rest of comments sound great and I’ll update the FLIP. Hope we can
>>>> finalize our proposal soon!
>>>>
>>>> Best,
>>>>
>>>> Qingsheng
>>>>
>>>>
>>>>> On May 17, 2022, at 13:46, Александр Смирнов wrote:
>>>>>
>>>>> Hi Qingsheng and devs!
>>>>>
>>>>> I like the overall design of updated FLIP, however I have several
>>>>> suggestions and questions.
>>>>>
>>>>> 1) Introducing LookupFunction as a subclass of TableFunction is a good
>>>>> idea. We can add 'maxRetryTimes' option into this class. 'eval' method
>>>>> of new LookupFunction is great for this purpose. The same is for
>>>>> 'async' case.
>>>>>
>>>>> 2) There might be other configs in future, such as 'cacheMissingKey'
>>>>> in LookupFunctionProvider or 'rescanInterval' in ScanRuntimeProvider.
>>>>> Maybe use Builder pattern in LookupFunctionProvider and
>>>>> RescanRuntimeProvider for more flexibility (use one 'build' method
>>>>> instead of many 'of' methods in future)?
>>>>>
>>>>> 3) What are the plans for existing TableFunctionProvider and
>>>>> AsyncTableFunctionProvider? I think they should be deprecated.
>>>>>
>>>>> 4) Am I right that the current design does not assume usage of
>>>>> user-provided LookupCache in re-scanning? In this case, it is not very
>>>>> clear why do we need methods such as 'invalidate' or 'putAll' in
>>>>> LookupCache.
>>>>>
>>>>> 5) I don't see DDL options, that were in previous version of FLIP. Do
>>>>> you have any 

Re: [VOTE] FLIP-229: Introduces Join Hint for Flink SQL Batch Job

2022-05-19 Thread Leonard Xu
 Thanks Xuyang for driving this work.

+1(binding)

Best,
Leonard
 

> On May 19, 2022, at 10:46 AM, Yun Tang wrote:
> 
> Thanks for driving, +1 (binding)
> 
> Best
> Yun Tang
> 
> From: Jark Wu 
> Sent: Wednesday, May 18, 2022 23:09
> To: dev 
> Subject: Re: [VOTE] FLIP-229: Introduces Join Hint for Flink SQL Batch Job
> 
> +1(binding)
> 
> Best,
> Jark
> 
> On Wed, 18 May 2022 at 14:18, Jingsong Li  wrote:
> 
>> +1 Thanks for driving.
>> 
>> Best,
>> Jingsong
>> 
>> On Wed, May 18, 2022 at 1:33 PM godfrey he  wrote:
>> 
>>> Thanks Xuyang for driving this, +1(binding)
>>> 
>>> Best,
>>> Godfrey
>>> 
>>> On Tue, May 17, 2022 at 10:21, Xuyang wrote:
 
 Hi, everyone.
 Thanks for your feedback on FLIP-229: Introduces Join Hint for Flink
 SQL Batch Job[1] in the discussion thread[2].
 I'd like to start a vote for it. The vote will be open for at least 72
 hours unless there is an objection or not enough votes.
 
 --
 
 Best!
 Xuyang
 
 [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job
 [2] https://lists.apache.org/thread/y668bxyjz66ggtjypfz9t571m0tyvv9h
>>> 
>> 



Re: [DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-19 Thread Xintong Song
Thanks for preparing this FLIP, Weijie.

I think this is a good improvement on batch resource elasticity. Looking
forward to the community feedback.

Best,

Xintong



On Thu, May 19, 2022 at 2:31 PM weijie guo wrote:

> Hi all,
>
>
> I’d like to start a discussion about FLIP-235[1], which introduces a new
> shuffle mode that can overcome some of the problems of Pipelined Shuffle and
> Blocking Shuffle in batch scenarios.
>
>
> Currently in Flink, task scheduling is more or less constrained by the
> shuffle implementations. This brings the following disadvantages:
>
>    1. Pipelined Shuffle:
>       For pipelined shuffle, the upstream and downstream tasks are required
>       to be deployed at the same time, to avoid upstream tasks being blocked
>       forever. This is fine when there are enough resources for both upstream
>       and downstream tasks to run simultaneously, but causes the following
>       problems otherwise:
>          1. Tasks connected by pipelined shuffle (i.e., a pipelined region)
>             cannot be executed until resources have been obtained for all of
>             them, resulting in longer job finishing time and poorer resource
>             efficiency, because part of the resources is held idle while
>             waiting for the rest.
>          2. More severely, if multiple jobs each hold part of the cluster
>             resources and are waiting for more, a deadlock can occur. The
>             chance is not trivial, especially for scenarios such as OLAP
>             where concurrent job submissions are frequent.
>    2. Blocking Shuffle:
>       For blocking shuffle, execution of downstream tasks must wait for all
>       upstream tasks to finish, even though more resources might be
>       available. The sequential execution of upstream and downstream tasks
>       significantly increases the job finishing time, and the disk IO
>       workload for spilling and loading the full intermediate data also
>       affects performance.
>
>
> We believe the root cause of the above problems is that shuffle 
> implementations put unnecessary constraints on task scheduling.
>
>
> To solve this problem, Xintong Song and I propose to introduce hybrid shuffle 
> to minimize the scheduling constraints. With Hybrid Shuffle, Flink should:
>
>    1. Make best use of available resources.
>       Ideally, we want Flink to always make progress if possible. That is to
>       say, it should always execute a pending task if there are resources
>       available for that task.
>    2. Minimize disk IO load.
>       In-flight data should be consumed directly from memory as much as
>       possible. Only data that is not consumed in time should be spilled to
>       disk.
>
> You can find more details in FLIP-235. Looking forward to your feedback.
>
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
>
>
>
> Best regards,
>
> Weijie
>


[DISCUSS] FLIP-235: Hybrid Shuffle Mode

2022-05-19 Thread weijie guo
Hi all,

I’d like to start a discussion about FLIP-235[1], which introduces a
new shuffle mode that can overcome some of the problems of Pipelined
Shuffle and Blocking Shuffle in batch scenarios.

Currently in Flink, task scheduling is more or less constrained by the
shuffle implementations. This brings the following disadvantages:

   1. Pipelined Shuffle:
      For pipelined shuffle, the upstream and downstream tasks are
      required to be deployed at the same time, to avoid upstream tasks
      being blocked forever. This is fine when there are enough resources
      for both upstream and downstream tasks to run simultaneously, but
      causes the following problems otherwise:
         1. Tasks connected by pipelined shuffle (i.e., a pipelined region)
            cannot be executed until resources have been obtained for all
            of them, resulting in longer job finishing time and poorer
            resource efficiency, because part of the resources is held
            idle while waiting for the rest.
         2. More severely, if multiple jobs each hold part of the cluster
            resources and are waiting for more, a deadlock can occur. The
            chance is not trivial, especially for scenarios such as OLAP
            where concurrent job submissions are frequent.
   2. Blocking Shuffle:
      For blocking shuffle, execution of downstream tasks must wait for
      all upstream tasks to finish, even though more resources might be
      available. The sequential execution of upstream and downstream tasks
      significantly increases the job finishing time, and the disk IO
      workload for spilling and loading the full intermediate data also
      affects performance.

We believe the root cause of the above problems is that shuffle
implementations put unnecessary constraints on task scheduling.

To solve this problem, Xintong Song and I propose to introduce hybrid
shuffle to minimize the scheduling constraints. With Hybrid Shuffle,
Flink should:

   1. Make best use of available resources.
      Ideally, we want Flink to always make progress if possible. That
      is to say, it should always execute a pending task if there are
      resources available for that task.
   2. Minimize disk IO load.
      In-flight data should be consumed directly from memory as much as
      possible. Only data that is not consumed in time should be spilled
      to disk.

You can find more details in FLIP-235. Looking forward to your feedback.


[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode



Best regards,

Weijie


[jira] [Created] (FLINK-27694) Move lint-python log location

2022-05-19 Thread LuNng Wang (Jira)
LuNng Wang created FLINK-27694:
--

 Summary: Move lint-python log location
 Key: FLINK-27694
 URL: https://issues.apache.org/jira/browse/FLINK-27694
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.15.0
Reporter: LuNng Wang


Some log messages are printed in the wrong place; we need to move them inside the 'if' block.

```
print_function "STEP" "installing wget..."
if [ $STEP -lt 1 ]; then
    install_wget ${SUPPORT_OS[$os_index]}
    STEP=1
    checkpoint_stage $STAGE $STEP
fi
print_function "STEP" "install wget... [SUCCESS]"
```
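
A minimal sketch of what the proposed change might look like, assuming the intent is that both messages are printed only when the step actually runs. The three helper functions below are stand-in stubs written for this illustration, not the real helpers from the lint script, and `target_os`, `STAGE` and the `run_wget_step` wrapper are assumed names used so that the fragment runs on its own (the original passes `${SUPPORT_OS[$os_index]}`).

```shell
# Stand-in stubs (hypothetical) so the fragment runs on its own.
print_function() { echo "[$1] $2"; }
install_wget() { :; }        # pretend to install wget for the OS in "$1"
checkpoint_stage() { :; }    # pretend to record stage "$1", step "$2"

# Assumed example state, for illustration only.
target_os="Linux"
STAGE="install"
STEP=0

run_wget_step() {
    # Both messages sit inside the 'if', so nothing is printed when a
    # resumed run has already completed this step.
    if [ "$STEP" -lt 1 ]; then
        print_function "STEP" "installing wget..."
        install_wget "$target_os"
        STEP=1
        checkpoint_stage "$STAGE" "$STEP"
        print_function "STEP" "install wget... [SUCCESS]"
    fi
}

run_wget_step   # first call: STEP is 0, so both messages are printed
run_wget_step   # second call: STEP is already 1, so nothing is printed
```

With the messages outside the 'if', as in the snippet quoted in the report, the second call would still print both lines even though no installation took place.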



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27695) KafkaTransactionLogITCase failed on azure due to Could not find a valid Docker environment

2022-05-19 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-27695:


 Summary: KafkaTransactionLogITCase failed on azure due to Could not find a valid Docker environment
 Key: FLINK-27695
 URL: https://issues.apache.org/jira/browse/FLINK-27695
 Project: Flink
  Issue Type: Bug
  Components: Build System / Azure Pipelines, Connectors / Kafka
Affects Versions: 1.14.4
Reporter: Huang Xingbo



{code:java}
2022-05-19T02:04:23.9190098Z May 19 02:04:23 [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 7.404 s <<< FAILURE! - in org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase
2022-05-19T02:04:23.9191182Z May 19 02:04:23 [ERROR] org.apache.flink.connector.kafka.sink.KafkaTransactionLogITCase  Time elapsed: 7.404 s  <<< ERROR!
2022-05-19T02:04:23.9192250Z May 19 02:04:23 java.lang.IllegalStateException: Could not find a valid Docker environment. Please see logs and check configuration
2022-05-19T02:04:23.9193144Z May 19 02:04:23    at org.testcontainers.dockerclient.DockerClientProviderStrategy.lambda$getFirstValidStrategy$4(DockerClientProviderStrategy.java:156)
2022-05-19T02:04:23.9194653Z May 19 02:04:23    at java.util.Optional.orElseThrow(Optional.java:290)
2022-05-19T02:04:23.9196179Z May 19 02:04:23    at org.testcontainers.dockerclient.DockerClientProviderStrategy.getFirstValidStrategy(DockerClientProviderStrategy.java:148)
2022-05-19T02:04:23.9197995Z May 19 02:04:23    at org.testcontainers.DockerClientFactory.getOrInitializeStrategy(DockerClientFactory.java:146)
2022-05-19T02:04:23.9199486Z May 19 02:04:23    at org.testcontainers.DockerClientFactory.client(DockerClientFactory.java:188)
2022-05-19T02:04:23.9200666Z May 19 02:04:23    at org.testcontainers.DockerClientFactory$1.getDockerClient(DockerClientFactory.java:101)
2022-05-19T02:04:23.9202109Z May 19 02:04:23    at com.github.dockerjava.api.DockerClientDelegate.authConfig(DockerClientDelegate.java:107)
2022-05-19T02:04:23.9203065Z May 19 02:04:23    at org.testcontainers.containers.GenericContainer.start(GenericContainer.java:316)
2022-05-19T02:04:23.9204641Z May 19 02:04:23    at org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1066)
2022-05-19T02:04:23.9205765Z May 19 02:04:23    at org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29)
2022-05-19T02:04:23.9206568Z May 19 02:04:23    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
2022-05-19T02:04:23.9207497Z May 19 02:04:23    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
2022-05-19T02:04:23.9208246Z May 19 02:04:23    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
2022-05-19T02:04:23.9208887Z May 19 02:04:23    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
2022-05-19T02:04:23.9209691Z May 19 02:04:23    at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
2022-05-19T02:04:23.9210490Z May 19 02:04:23    at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
2022-05-19T02:04:23.9211246Z May 19 02:04:23    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
2022-05-19T02:04:23.9211989Z May 19 02:04:23    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
2022-05-19T02:04:23.9212682Z May 19 02:04:23    at java.util.Iterator.forEachRemaining(Iterator.java:116)
2022-05-19T02:04:23.9213391Z May 19 02:04:23    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
2022-05-19T02:04:23.9214305Z May 19 02:04:23    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
2022-05-19T02:04:23.9215044Z May 19 02:04:23    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
2022-05-19T02:04:23.9215809Z May 19 02:04:23    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
2022-05-19T02:04:23.9216576Z May 19 02:04:23    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
2022-05-19T02:04:23.9217523Z May 19 02:04:23    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
2022-05-19T02:04:23.9218275Z May 19 02:04:23    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
2022-05-19T02:04:23.9219099Z May 19 02:04:23    at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:82)
2022-05-19T02:04:23.9220028Z May 19 02:04:23    at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:73)
2022-05-19T02:04:23.9220795Z May 19 02:04:23    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
2022-05-19T02:04:23.9221598Z May 19 02:04:23    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
2022-05-19T02:04:23.9222614Z May 19 02:04:23    at

[jira] [Created] (FLINK-27693) Support local recovery for non-materialized part(write, restore, discard)

2022-05-19 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-27693:
--

 Summary: Support local recovery for non-materialized part(write, restore, discard)
 Key: FLINK-27693
 URL: https://issues.apache.org/jira/browse/FLINK-27693
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.16.0
Reporter: Yanfei Lei
 Fix For: 1.16.0


Support local recovery for non-materialized part(write, restore, discard)





[jira] [Created] (FLINK-27692) Support local recovery for materialized part(write, restore, discard)

2022-05-19 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-27692:
--

 Summary: Support local recovery for materialized part(write, restore, discard)
 Key: FLINK-27692
 URL: https://issues.apache.org/jira/browse/FLINK-27692
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.16.0
Reporter: Yanfei Lei
 Fix For: 1.16.0


Support local recovery for materialized part(write, restore, discard)





Re: Flink Kubernetes operator not having a scale subresource

2022-05-19 Thread Gyula Fóra
Hi Team!

This is probably something for after the release but I created a simple
prototype for the scaling subresource based on taskmanager replica count.

You can take a look here:
https://github.com/apache/flink-kubernetes-operator/pull/227

After some consideration I decided against using parallelism and used TM
replicas instead (still with native integration); I describe this in the PR.

I will leave the PR open so people can experiment/comment and we should
definitely get back to this after the 1.0.0 release because it seems to be
a very lightweight yet useful feature.

Cheers,
Gyula


On Sat, May 7, 2022 at 11:25 AM Gyula Fóra  wrote:

> Hi Jay!
>
> I will take a closer look into this and see if we can use the parallelism
> in the scale subresource.
>
> If you could experiment with this and see if it works with the current CRD,
> that would be helpful. Not sure if we need to change the status or
> anything, as parallelism is only part of the spec at the moment.
>
> If you have a working modified CRD I would appreciate if you could share
> it with us!
>
> Don’t worry about the release schedule; if we think that this is important
> and we need some changes for it, we can push the release out a few days if
> necessary.
>
> What is important at this point is to understand exactly what we need to make
> the parallelism scaling work natively, to avoid breaking changes to the
> spec/status after the release :)
>
> Cheers
> Gyula
>
> On Sat, 7 May 2022 at 11:14, Jay Ghiya  wrote:
>
>> Hi Team,
>>
>> Yes, we can change the parallelism of the Flink job. Going through the
>> roadmap, I understand that the standalone mode has been made a second
>> priority, for the right reasons. If possible, can I be of any help to
>> accelerate this? We have a tight release schedule, so we would like to close
>> this in the next 10 days with your help.
>>
>> Looking forward to hearing from you!
>>
>> -Jay
>>
>> On 7 May 2022, 8:15 AM +0530, Yang Wang wrote:
>>
>> Currently, the flink-kubernetes-operator is using Flink native K8s
>> integration[1], which means Flink ResourceManager will dynamically allocate
>> TaskManager on demand.
>> So the users do not need to specify the replicas of TaskManager.
>>
>> Just like Gyula said, one possible solution to make "kubectl scale" work
>> is to change the parallelism of Flink job.
>>
>> If the standalone mode[2] is introduced in the operator, then it is also
>> possible to directly change the replicas of TaskManager pods.
>>
>>
>> [1].
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/
>> [2].
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-225%3A+Implement+standalone+mode+support+in+the+kubernetes+operator
>>
>> Best,
>> Yang
>>
>> On Sat, May 7, 2022 at 04:26, Gyula Fóra wrote:
>>
>>> Hi Jay!
>>>
>>> Interesting question/proposal to add the scale-subresource.
>>>
>>> I am not an expert on this area but we will look into this a little and
>>> give you some feedback and see if we can incorporate something into the
>>> upcoming release if it makes sense.
>>>
>>> On a high level there is not a single replicas value for a
>>> FlinkDeployment that would be easy to map, but maybe we could use the
>>> parallelism value for this purpose for Applications/Session jobs.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Fri, May 6, 2022 at 8:04 PM Jay Ghiya  wrote:
>>>
 Hi Team,

 I have been experimenting with the Flink Kubernetes operator. One of the
 biggest gaps we see is that it does not yet support the scale subresource,
 which is needed for reactive scaling. Without that, it becomes commercially
 very difficult for products like ours, whose load varies widely from hour
 to hour.

 Can I get some direction on contributing the scale subresource
 (https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#scale-subresource)
 for our Kubernetes operator CRD?

 I have been having a hard time reading
 https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
 to figure out the replicas, status, and label selector JSON paths of the
 task manager. It may be due to my lack of knowledge, so a sense of
 direction would help me.

 -Jay
 GEHC

>>>


[jira] [Created] (FLINK-27691) RankHarnessTest. testUpdateRankWithRowNumberSortKeyDropsToNotLast test failed with result mismatch

2022-05-19 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-27691:


 Summary: RankHarnessTest. testUpdateRankWithRowNumberSortKeyDropsToNotLast test failed with result mismatch
 Key: FLINK-27691
 URL: https://issues.apache.org/jira/browse/FLINK-27691
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.16.0
Reporter: Huang Xingbo



{code:java}
2022-05-19T04:34:04.2677138Z May 19 04:34:04 [ERROR] 
RankHarnessTest.testUpdateRankWithRowNumberSortKeyDropsToNotLast
2022-05-19T04:34:04.2689553Z May 19 04:34:04 [ERROR]   Run 1: [result mismatch] 
2022-05-19T04:34:04.2690614Z May 19 04:34:04 expected: [+I(a,1,100,1),
2022-05-19T04:34:04.2691128Z May 19 04:34:04 +I(b,1,90,2),
2022-05-19T04:34:04.2691552Z May 19 04:34:04 +I(c,1,90,3),
2022-05-19T04:34:04.2692235Z May 19 04:34:04 +I(d,1,80,4),
2022-05-19T04:34:04.2692634Z May 19 04:34:04 +I(e,1,80,5),
2022-05-19T04:34:04.2693060Z May 19 04:34:04 +I(f,1,70,6),
2022-05-19T04:34:04.2693468Z May 19 04:34:04 +U(b,1,80,5),
2022-05-19T04:34:04.2693874Z May 19 04:34:04 +U(c,1,90,2),
2022-05-19T04:34:04.2694282Z May 19 04:34:04 +U(d,1,80,3),
2022-05-19T04:34:04.2694670Z May 19 04:34:04 +U(e,1,80,4),
2022-05-19T04:34:04.2696097Z May 19 04:34:04 -U(b,1,90,2),
2022-05-19T04:34:04.2696718Z May 19 04:34:04 -U(c,1,90,3),
2022-05-19T04:34:04.2697298Z May 19 04:34:04 -U(d,1,80,4),
2022-05-19T04:34:04.2698102Z May 19 04:34:04 -U(e,1,80,5)]
2022-05-19T04:34:04.2698758Z May 19 04:34:04  but was: [+I(a,1,100,1),
2022-05-19T04:34:04.2699189Z May 19 04:34:04 +I(b,1,90,1),
2022-05-19T04:34:04.2699607Z May 19 04:34:04 +I(c,1,90,2),
2022-05-19T04:34:04.2700017Z May 19 04:34:04 +I(d,1,80,3),
2022-05-19T04:34:04.2712164Z May 19 04:34:04 +I(e,1,80,4),
2022-05-19T04:34:04.2712777Z May 19 04:34:04 +I(f,1,70,5),
2022-05-19T04:34:04.2713191Z May 19 04:34:04 +U(b,1,80,4),
2022-05-19T04:34:04.2713621Z May 19 04:34:04 +U(c,1,90,1),
2022-05-19T04:34:04.2714029Z May 19 04:34:04 +U(d,1,80,2),
2022-05-19T04:34:04.2714435Z May 19 04:34:04 +U(e,1,80,3),
2022-05-19T04:34:04.2715272Z May 19 04:34:04 -U(b,1,90,1),
2022-05-19T04:34:04.2715847Z May 19 04:34:04 -U(c,1,90,2),
2022-05-19T04:34:04.2716420Z May 19 04:34:04 -U(d,1,80,3),
2022-05-19T04:34:04.2716990Z May 19 04:34:04 -U(e,1,80,4)]
{code}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=35815&view=logs&j=a9db68b9-a7e0-54b6-0f98-010e0aff39e2&t=cdd32e0b-6047-565b-c58f-14054472f1be&l=10445



