Re: [VOTE] FLIP-380: Support Full Partition Processing On Non-keyed DataStream

2023-12-19 Thread Yingjie Cao
+1 (binding)

Best,
Yingjie

Xintong Song wrote on Mon, Dec 18, 2023 at 15:27:

> +1 (binding)
>
>
> Best,
>
> Xintong
>
>
>
> On Fri, Dec 15, 2023 at 5:15 PM weijie guo 
> wrote:
>
> > +1 (binding)
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > Wencong Liu wrote on Fri, Dec 15, 2023 at 09:15:
> >
> > > Hi dev,
> > >
> > > I'd like to start a vote on FLIP-380.
> > >
> > > Discussion thread:
> > > https://lists.apache.org/thread/nn7myj7vsvytbkdrnbvj5h0homsjrn1h
> > > FLIP:
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-380%3A+Support+Full+Partition+Processing+On+Non-keyed+DataStream
> > >
> > > Best regards,
> > > Wencong Liu
> >
>


Re: [ANNOUNCE] Release 1.17.0, release candidate #1

2023-03-10 Thread Yingjie Cao
Hi Martijn,

Thanks a lot.
The fix has been merged.

Best regards,
Yingjie

Martijn Visser wrote on Fri, Mar 10, 2023 at 03:32:

> Hi Yingjie,
>
> Thanks for the test and identifying the issue, this is super helpful!
>
> To all others, please continue your testing on this RC so that if there are
> more blockers to be found, we can fix them with the next RC and have
> (hopefully) a successful vote on it.
>
> Best regards,
>
> Martijn
>
> On Thu, Mar 9, 2023 at 4:54 PM Yingjie Cao 
> wrote:
>
> > Hi community and release managers:
> >
> > When testing the release candidate #1 for batch scenario, I found a
> > potential deadlock issue of blocking shuffle. I have created a ticket [1]
> > for it and marked it as blocker. I will fix it no later than tomorrow.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-31386
> >
> > Best regards,
> > Yingjie
> >
> > Qingsheng Ren wrote on Thu, Mar 9, 2023 at 13:51:
> >
> > > Hi everyone,
> > >
> > > The RC1 for Apache Flink 1.17.0 has been created. This RC currently is
> > for
> > > preview only to facilitate the integrated testing since the release
> > > announcement is still under review. The voting process will be
> triggered
> > > once the announcement is ready. It has all the artifacts that we would
> > > typically have for a release, except for the release note and the
> website
> > > pull request for the release announcement.
> > >
> > > The following contents are available for your review:
> > >
> > > - The preview source release and binary convenience releases [1], which
> > > are signed with the key with fingerprint A1BD477F79D036D2C30C [2].
> > > - all artifacts that would normally be deployed to the Maven
> > > Central Repository [3].
> > > - source code tag "release-1.17.0-rc1" [4]
> > >
> > > Your help testing the release will be greatly appreciated! And we'll
> > > create the voting thread as soon as all the efforts are finished.
> > >
> > > [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.17.0-rc1
> > > [2] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > [3]
> > https://repository.apache.org/content/repositories/orgapacheflink-1591
> > > [4] https://github.com/apache/flink/releases/tag/release-1.17.0-rc1
> > >
> > > Best regards,
> > > Qingsheng, Leonard, Matthias and Martijn
> > >
> >
>


Re: [ANNOUNCE] Release 1.17.0, release candidate #1

2023-03-09 Thread Yingjie Cao
Hi community and release managers:

When testing the release candidate #1 for batch scenario, I found a
potential deadlock issue of blocking shuffle. I have created a ticket [1]
for it and marked it as blocker. I will fix it no later than tomorrow.

[1] https://issues.apache.org/jira/browse/FLINK-31386

Best regards,
Yingjie

Qingsheng Ren wrote on Thu, Mar 9, 2023 at 13:51:

> Hi everyone,
>
> The RC1 for Apache Flink 1.17.0 has been created. This RC currently is for
> preview only to facilitate the integrated testing since the release
> announcement is still under review. The voting process will be triggered
> once the announcement is ready. It has all the artifacts that we would
> typically have for a release, except for the release note and the website
> pull request for the release announcement.
>
> The following contents are available for your review:
>
> - The preview source release and binary convenience releases [1], which
> are signed with the key with fingerprint A1BD477F79D036D2C30C [2].
> - all artifacts that would normally be deployed to the Maven
> Central Repository [3].
> - source code tag "release-1.17.0-rc1" [4]
>
> Your help testing the release will be greatly appreciated! And we'll
> create the voting thread as soon as all the efforts are finished.
>
> [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.17.0-rc1
> [2] https://dist.apache.org/repos/dist/release/flink/KEYS
> [3] https://repository.apache.org/content/repositories/orgapacheflink-1591
> [4] https://github.com/apache/flink/releases/tag/release-1.17.0-rc1
>
> Best regards,
> Qingsheng, Leonard, Matthias and Martijn
>


[jira] [Created] (FLINK-31386) Fix the potential deadlock issue of blocking shuffle

2023-03-09 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-31386:
---

 Summary: Fix the potential deadlock issue of blocking shuffle
 Key: FLINK-31386
 URL: https://issues.apache.org/jira/browse/FLINK-31386
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.17.0


Currently, the SortMergeResultPartition may allocate more network buffers than 
the guaranteed size of the LocalBufferPool. As a result, some result partitions 
may need to wait for other result partitions to release the over-allocated 
network buffers before they can continue. However, the result partitions that 
have allocated more than their guaranteed buffers rely on the processing of 
input data to trigger data spilling and buffer recycling. The input data, in 
turn, relies on the batch reading buffers used by the 
SortMergeResultPartitionReadScheduler, which may already be taken by those 
blocked result partitions waiting for buffers. A deadlock then occurs. We can 
easily fix this deadlock by reserving the guaranteed buffers during 
initialization.
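
For illustration, here is a minimal sketch of the reservation idea. The class 
below is hypothetical and does not mirror Flink's actual LocalBufferPool API; 
it only shows why taking the guaranteed buffers eagerly at setup prevents one 
partition from starving another:

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical pool sketch: reserve the guaranteed buffers at initialization. */
public class ReservingBufferPool {

    private final Queue<byte[]> freeBuffers = new ArrayDeque<>();
    private final int guaranteedPerPartition;

    public ReservingBufferPool(int totalBuffers, int bufferSize, int guaranteedPerPartition) {
        for (int i = 0; i < totalBuffers; i++) {
            freeBuffers.add(new byte[bufferSize]);
        }
        this.guaranteedPerPartition = guaranteedPerPartition;
    }

    /** Called once on partition setup: take the guaranteed buffers eagerly so that
     *  later over-allocation by other partitions can never block this partition. */
    public synchronized Queue<byte[]> reserveGuaranteedBuffers() {
        if (freeBuffers.size() < guaranteedPerPartition) {
            throw new IllegalStateException("Pool is over-committed; refusing partition setup.");
        }
        Queue<byte[]> reserved = new ArrayDeque<>();
        for (int i = 0; i < guaranteedPerPartition; i++) {
            reserved.add(freeBuffers.poll());
        }
        return reserved;
    }
}
{code}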



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29351) Enable input buffer floating for blocking shuffle

2022-09-20 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-29351:
---

 Summary: Enable input buffer floating for blocking shuffle
 Key: FLINK-29351
 URL: https://issues.apache.org/jira/browse/FLINK-29351
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.17.0


At the input gate, Flink needs exclusive buffers for each input channel. For 
jobs with large parallelism, this easily causes the "Insufficient number of 
network buffers" error. This ticket aims to make all input network buffers 
floating for blocking shuffle to reduce the possibility of that error. This 
change can also improve the default blocking shuffle performance because 
buffer floating increases buffer utilization.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29299) Fix the network memory size calculation issue in fine-grained resource mode

2022-09-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-29299:
---

 Summary: Fix the network memory size calculation issue in 
fine-grained resource mode
 Key: FLINK-29299
 URL: https://issues.apache.org/jira/browse/FLINK-29299
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.16.0
Reporter: Yingjie Cao
 Fix For: 1.16.0


After FLINK-28663, one intermediate dataset can be consumed by multiple 
consumers, which includes the case where one vertex consumes the same 
intermediate dataset multiple times. However, currently in fine-grained 
resource mode, when computing the required network buffer size, the 
intermediate dataset is used as the key to record the network buffer size per 
input gate, which means fewer network buffers than needed may be allocated if 
two input gates of the same vertex consume the same intermediate dataset.
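
A hypothetical sketch of the miscalculation (names are illustrative, not 
Flink's actual scheduler code): keying the bookkeeping by dataset alone drops 
the second gate's requirement, while summing per gate counts both.

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NetworkMemoryCalc {

    /** One input gate of a consumer vertex; an illustrative model only. */
    record Gate(String consumerVertex, String datasetId, int requiredBuffers) {}

    /** Buggy variant: keyed by dataset only, so a vertex with two gates on the
     *  same dataset is under-counted (the second put overwrites the first). */
    static int buggyTotal(List<Gate> gates) {
        Map<String, Integer> byDataset = new HashMap<>();
        for (Gate g : gates) {
            byDataset.put(g.datasetId(), g.requiredBuffers());
        }
        return byDataset.values().stream().mapToInt(Integer::intValue).sum();
    }

    /** Fixed variant: every gate contributes, even for an already-seen dataset. */
    static int fixedTotal(List<Gate> gates) {
        return gates.stream().mapToInt(Gate::requiredBuffers).sum();
    }

    public static void main(String[] args) {
        List<Gate> gates = List.of(
                new Gate("V", "ds-1", 64),
                new Gate("V", "ds-1", 64)); // the same vertex consumes ds-1 twice
        System.out.println(buggyTotal(gates)); // 64  -> too few buffers allocated
        System.out.println(fixedTotal(gates)); // 128 -> what is actually needed
    }
}
{code}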



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28663) Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

2022-07-25 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28663:
---

 Summary: Allow multiple downstream consumer job vertices sharing 
the same intermediate dataset at scheduler side
 Key: FLINK-28663
 URL: https://issues.apache.org/jira/browse/FLINK-28663
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Yingjie Cao


Currently, one intermediate dataset can only be consumed by one downstream 
consumer vertex. If there are multiple consumer vertices consuming the same 
output of the same upstream vertex, multiple intermediate datasets will be 
produced. We can optimize this behavior to produce only one intermediate 
dataset that can be shared by multiple consumer vertices. As the first step, 
we should allow multiple downstream consumer job vertices to share the same 
intermediate dataset on the scheduler side. (Note that this optimization only 
works for blocking shuffle because a pipelined shuffle result partition cannot 
be consumed multiple times.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28561) Merge subpartition shuffle data read request for better sequential IO

2022-07-15 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28561:
---

 Summary: Merge subpartition shuffle data read request for better 
sequential IO
 Key: FLINK-28561
 URL: https://issues.apache.org/jira/browse/FLINK-28561
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.17.0


Currently, the shuffle data of each subpartition for blocking shuffle is read 
separately. To achieve better performance and reduce IOPS, we can merge 
consecutive data requests of the same file together and serve them in one IO 
request. More specifically,

1) if multiple data requests are reading the same data, for example, reading 
broadcast data, the reader will read the data only once and send the same piece 
of data to multiple downstream consumers.

2) if multiple data requests are reading consecutive data in one file, we will 
merge those data requests together as one large request and read a larger 
amount of data sequentially, which is good for file IO performance.
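
A minimal sketch of the merging step, assuming read requests are plain 
(offset, length) ranges within one spill file (all names are illustrative, not 
the actual scheduler API):

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ReadRequestMerger {

    record ReadRequest(long offset, long length) {}

    /** Merge overlapping or back-to-back ranges so each merged range can be
     *  served by a single large sequential read. Identical ranges (e.g. several
     *  consumers reading the same broadcast data) collapse into one request. */
    static List<ReadRequest> merge(List<ReadRequest> requests) {
        List<ReadRequest> sorted = new ArrayList<>(requests);
        sorted.sort(Comparator.comparingLong(ReadRequest::offset));
        List<ReadRequest> merged = new ArrayList<>();
        for (ReadRequest r : sorted) {
            if (!merged.isEmpty()) {
                ReadRequest last = merged.get(merged.size() - 1);
                if (last.offset() + last.length() >= r.offset()) {
                    long end = Math.max(last.offset() + last.length(), r.offset() + r.length());
                    merged.set(merged.size() - 1, new ReadRequest(last.offset(), end - last.offset()));
                    continue;
                }
            }
            merged.add(r);
        }
        return merged;
    }
}
{code}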



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28556) Extract header fields of Buffer into a BufferHeader class for blocking shuffle file IO

2022-07-14 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28556:
---

 Summary: Extract header fields of Buffer into a BufferHeader class 
for blocking shuffle file IO
 Key: FLINK-28556
 URL: https://issues.apache.org/jira/browse/FLINK-28556
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


This is a small code refactoring that can also be reused by follow-up PRs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28551) Store the number of bytes instead of the number of buffers in index entry for sort-shuffle

2022-07-14 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28551:
---

 Summary: Store the number of bytes instead of the number of 
buffers in index entry for sort-shuffle
 Key: FLINK-28551
 URL: https://issues.apache.org/jira/browse/FLINK-28551
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


Currently, in each index entry of the sort-shuffle index file, one field is the 
number of buffers in the current data region. The problem is that it is hard to 
know the data boundary before reading the file. To solve this, we can store the 
number of bytes instead of the number of buffers in the index entry. Based on 
this change, we can do some optimizations, for example, reading a larger amount 
of data than one buffer for better sequential IO, as mentioned in FLINK-28373.
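
A layout sketch of the changed entry, under the assumption that an entry holds 
a file offset plus a region size (field names and widths are illustrative, not 
the actual index file format):

{code:java}
import java.nio.ByteBuffer;

public class IndexEntry {

    /** file offset + region size in bytes. */
    static final int ENTRY_SIZE = Long.BYTES + Long.BYTES;

    static void write(ByteBuffer indexBuffer, long fileOffset, long regionSizeInBytes) {
        indexBuffer.putLong(fileOffset);
        // Previously a buffer count was stored here, which hides the byte
        // boundary of the region until the buffers themselves are read.
        indexBuffer.putLong(regionSizeInBytes);
    }

    static long[] read(ByteBuffer indexBuffer) {
        return new long[] {indexBuffer.getLong(), indexBuffer.getLong()};
    }
}
{code}

With the byte size known up front, a reader can issue one sequential read 
covering the whole region, which is the optimization referred to in FLINK-28373.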



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28550) Remove the unused field in SortMergeSubpartitionReader

2022-07-14 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28550:
---

 Summary: Remove the unused field in SortMergeSubpartitionReader
 Key: FLINK-28550
 URL: https://issues.apache.org/jira/browse/FLINK-28550
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28519) Fix the bug that SortMergeResultPartitionReadScheduler may not read data sequentially

2022-07-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28519:
---

 Summary: Fix the bug that SortMergeResultPartitionReadScheduler 
may not read data sequentially
 Key: FLINK-28519
 URL: https://issues.apache.org/jira/browse/FLINK-28519
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


Currently, the SortMergeResultPartitionReadScheduler always gets all active 
subpartition readers and reads at most one data region for each of them. It is 
common that some subpartitions are requested before others, so their region 
indexes run ahead of the rest. If all region data of a subpartition can be read 
in one round, some subpartition readers will always stay ahead of others, which 
causes random IO. This patch fixes the issue by polling one subpartition reader 
at a time.
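
A minimal sketch of the polling idea (the interface is hypothetical, not the 
actual reader API): each reader gets one region per turn, so no reader races 
ahead and the file is visited mostly in order.

{code:java}
import java.util.Queue;

public class RegionRoundRobin {

    /** Returns false when the reader has no more regions to read. */
    interface SubpartitionReader {
        boolean readOneRegion();
    }

    static void run(Queue<SubpartitionReader> readers) {
        while (!readers.isEmpty()) {
            SubpartitionReader reader = readers.poll();
            if (reader.readOneRegion()) {
                readers.add(reader); // back of the queue: one region per turn
            }
        }
    }
}
{code}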



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28514) Remove data flush in SortMergeResultPartition

2022-07-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28514:
---

 Summary: Remove data flush in SortMergeResultPartition
 Key: FLINK-28514
 URL: https://issues.apache.org/jira/browse/FLINK-28514
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


This patch aims to remove the data flush in SortMergeResultPartition because it 
is unnecessary.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28512) Select HashBasedDataBuffer and SortBasedDataBuffer dynamically based on the number of network buffers can be allocated for

2022-07-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28512:
---

 Summary: Select HashBasedDataBuffer and SortBasedDataBuffer 
dynamically based on the number of network buffers can be allocated for
 Key: FLINK-28512
 URL: https://issues.apache.org/jira/browse/FLINK-28512
 Project: Flink
  Issue Type: Sub-task
Reporter: Yingjie Cao






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28382) Introduce new compression algorithms of higher compression ratio

2022-07-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28382:
---

 Summary: Introduce new compression algorithms of higher 
compression ratio
 Key: FLINK-28382
 URL: https://issues.apache.org/jira/browse/FLINK-28382
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


Currently, we use lz4 for shuffle data compression, which is a good balance 
between IO optimization and CPU consumption. But in some scenarios, IO becomes 
the bottleneck and storage space is limited (especially in Kubernetes 
environments). For these cases, we need compression algorithms with a higher 
compression ratio to further reduce IO.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28380) Produce one intermediate dataset for multiple consumers consuming the same data

2022-07-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28380:
---

 Summary: Produce one intermediate dataset for multiple consumers 
consuming the same data
 Key: FLINK-28380
 URL: https://issues.apache.org/jira/browse/FLINK-28380
 Project: Flink
  Issue Type: Sub-task
  Components: Client / Job Submission, Runtime / Coordination, Runtime 
/ Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


Currently, if one output of an upstream job vertex is consumed by multiple 
downstream job vertices, the upstream vertex will produce multiple datasets. 
For blocking shuffle, this means serializing and persisting the same data 
multiple times. This ticket aims to optimize this behavior and make the 
upstream job vertex produce one dataset that will be read by multiple 
downstream vertices.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28378) Use larger data reading buffer size for sort-shuffle

2022-07-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28378:
---

 Summary: Use larger data reading buffer size for sort-shuffle
 Key: FLINK-28378
 URL: https://issues.apache.org/jira/browse/FLINK-28378
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


Currently, for sort shuffle, we always use the network buffer size as the data 
reading buffer size, which is 32K by default. We can increase this buffer size 
for better performance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28377) Enable to request less number of data reading buffers for sort-shuffle if there is no enough data

2022-07-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28377:
---

 Summary: Enable to request less number of data reading buffers for 
sort-shuffle if there is no enough data
 Key: FLINK-28377
 URL: https://issues.apache.org/jira/browse/FLINK-28377
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


Currently, for sort blocking shuffle, the corresponding data reader always 
allocates a fixed number of buffers for shuffle data reading, even when there 
is not much data to read, which wastes buffers. We can optimize this to 
allocate fewer reading buffers when there is not enough data.
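
A sketch of the sizing rule, assuming the remaining backlog in bytes is known 
(the formula is illustrative, not the ticket's exact implementation):

{code:java}
public class ReadBufferSizing {

    /** Request only as many buffers as the backlog can fill, bounded by the maximum. */
    static int buffersToRequest(long remainingBytes, int bufferSize, int maxBuffers) {
        long needed = (remainingBytes + bufferSize - 1) / bufferSize; // ceil division
        return (int) Math.min(maxBuffers, Math.max(1, needed));
    }

    public static void main(String[] args) {
        // With only 100 KB left and 32 KB buffers, request 4 buffers, not the fixed maximum.
        System.out.println(buffersToRequest(100 * 1024, 32 * 1024, 64)); // 4
    }
}
{code}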



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28376) Reduce the number of IO threads for sort-shuffle

2022-07-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28376:
---

 Summary: Reduce the number of IO threads for sort-shuffle
 Key: FLINK-28376
 URL: https://issues.apache.org/jira/browse/FLINK-28376
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


Currently, the number of IO threads for shuffle data reading is derived from 
the size of the reading memory and the number of CPU cores. We should also take 
the number of slots and the number of disks into account.
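
The ticket only names the inputs to consider; the concrete formula below is an 
illustrative assumption of how such a heuristic could combine them:

{code:java}
public class IoThreadSizing {

    static int ioThreads(long readMemoryBytes, int bufferSize, int cpuCores, int slots, int disks) {
        // Enough memory to keep a few buffers in flight per thread.
        int byMemory = (int) (readMemoryBytes / bufferSize / 4);
        // Assumption: more threads than slots or disks adds little useful concurrency.
        int upperBound = Math.max(slots, disks);
        return Math.max(1, Math.min(byMemory, Math.min(cpuCores, upperBound)));
    }
}
{code}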



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28374) Some further improvements of sort-shuffle

2022-07-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28374:
---

 Summary: Some further improvements of sort-shuffle
 Key: FLINK-28374
 URL: https://issues.apache.org/jira/browse/FLINK-28374
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


This is an umbrella issue for sort-shuffle improvements.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28373) Read larger size of data sequentially for sort-shuffle

2022-07-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28373:
---

 Summary: Read larger size of data sequentially for sort-shuffle
 Key: FLINK-28373
 URL: https://issues.apache.org/jira/browse/FLINK-28373
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


Currently, for sort blocking shuffle, the corresponding data readers read 
shuffle data at buffer granularity. Before compression, each buffer is 32K by 
default; after compression the size becomes smaller (possibly less than 10K). 
For file IO, this is pretty small. To achieve better performance and reduce 
IOPS, we can merge consecutive data requests of the same file together and 
serve them in one IO request. More specifically,

1) if multiple data requests are reading the same data, for example, reading 
broadcast data, the reader will read the data only once and send the same piece 
of data to multiple downstream consumers.

2) if multiple data requests are reading consecutive data in one file, we will 
merge those data requests together as one large request and read a larger 
amount of data sequentially, which is good for file IO performance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-235: Hybrid Shuffle Mode

2022-05-31 Thread Yingjie Cao
+1

Best,
Yingjie

Jing Ge wrote on Mon, May 30, 2022 at 05:02:

> +1 (non-binding)
>
> Best Regards,
> Jing
>
> On Sun, May 29, 2022 at 5:16 AM Aitozi  wrote:
>
> > +1 (non-binding)
> >
> > Best,
> > Aitozi
> >
> > Yangze Guo wrote on Fri, May 27, 2022 at 11:17:
> >
> > > +1
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Thu, May 26, 2022 at 4:42 PM Xintong Song 
> > > wrote:
> > > >
> > > > +1
> > > >
> > > > Best,
> > > >
> > > > Xintong
> > > >
> > > >
> > > >
> > > > On Thu, May 26, 2022 at 3:47 PM weijie guo <
> guoweijieres...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > Thanks for the feedback on FLIP-235: Hybrid Shuffle Mode [1] in the
> > > > > discussion thread [2].
> > > > >
> > > > > I'd like to start a vote for it. The vote will last for at least 72
> > > hours
> > > > > unless there is an objection or insufficient votes.
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> > > > > [2]
> https://lists.apache.org/thread/hfwpcs54sm5gp3mhv7s3lr79jywo3kv4
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Weijie
> > > > >
> > >
> >
>


[jira] [Created] (FLINK-26347) Should use Flink system Classloader (AppClassloader) when deserializing RPC message

2022-02-24 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-26347:
---

 Summary: Should use Flink system Classloader (AppClassloader) when 
deserializing RPC message
 Key: FLINK-26347
 URL: https://issues.apache.org/jira/browse/FLINK-26347
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.15.0
Reporter: Yingjie Cao
 Fix For: 1.15.0


FLINK-25742 removed the redundant serialization of RPC invocations on the Flink 
side. However, it accidentally changed the class loading behavior. Before 
FLINK-25742, the Flink system classloader was used to load RPC message classes, 
but after FLINK-25742, the RpcSystem classloader (whose parent classloader is 
not the Flink system classloader) is used, which can cause a 
ClassNotFoundException. I encountered this exception when trying to run 
flink-remote-shuffle on the latest Flink 1.15-SNAPSHOT: the remote shuffle 
class (the shuffle descriptor class) cannot be found even though the 
corresponding jar file is in Flink's lib/ directory.
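
A standard-Java sketch of the fix idea: resolve classes of a serialized payload 
against an explicitly supplied classloader (e.g. Flink's system classloader) 
instead of whichever loader happens to be on the call stack. This is the 
general pattern, not the actual Flink patch:

{code:java}
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

public class ClassLoaderAwareObjectInputStream extends ObjectInputStream {

    private final ClassLoader classLoader;

    public ClassLoaderAwareObjectInputStream(byte[] payload, ClassLoader classLoader)
            throws IOException {
        super(new ByteArrayInputStream(payload));
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        try {
            // Classes shipped in lib/ (like a shuffle descriptor) are visible to the
            // system classloader but not necessarily to the RpcSystem classloader.
            return Class.forName(desc.getName(), false, classLoader);
        } catch (ClassNotFoundException e) {
            return super.resolveClass(desc); // fall back, e.g. for primitive types
        }
    }
}
{code}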



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25960) Distribute the data read buffers more fairly among result partitions for sort-shuffle

2022-02-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25960:
---

 Summary: Distribute the data read buffers more fairly among result 
partitions for sort-shuffle
 Key: FLINK-25960
 URL: https://issues.apache.org/jira/browse/FLINK-25960
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


Currently, the data read buffers for sort-shuffle are allocated in a random 
way, and some result partitions may occupy too many buffers, which leads to the 
starvation of other result partitions. This ticket aims to improve this by not 
reading data for those result partitions that already occupy more than the 
average number of read buffers per result partition.
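
A sketch of the fairness rule described above (hypothetical names; the real 
scheduler tracks more state):

{code:java}
public class FairBufferDistribution {

    /** Skip reading for a partition that already holds its fair share of buffers. */
    static boolean mayReadMore(int buffersInUse, int totalReadBuffers, int numActivePartitions) {
        int average = Math.max(1, totalReadBuffers / numActivePartitions);
        return buffersInUse < average;
    }
}
{code}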



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25959) Add a micro-benchmark for the sort-based blocking shuffle

2022-02-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25959:
---

 Summary: Add a micro-benchmark for the sort-based blocking shuffle
 Key: FLINK-25959
 URL: https://issues.apache.org/jira/browse/FLINK-25959
 Project: Flink
  Issue Type: Sub-task
Reporter: Yingjie Cao






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25860) Move read buffer allocation and output file creation to setup method for sort-shuffle result partition to avoid blocking main thread

2022-01-27 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25860:
---

 Summary: Move read buffer allocation and output file creation to 
setup method for sort-shuffle result partition to avoid blocking main thread
 Key: FLINK-25860
 URL: https://issues.apache.org/jira/browse/FLINK-25860
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


Currently, the read buffer allocation and output file creation of sort-shuffle 
are performed by the main thread. These operations are a little heavy and can 
block the main thread for a while, which may influence other RPC calls, 
including follow-up task deployment. This ticket aims to solve the issue by 
moving read buffer allocation and output file creation to the setup method.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25796) Avoid record copy for result partition of sort-shuffle if there are enough buffers for better performance

2022-01-24 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25796:
---

 Summary: Avoid record copy for result partition of sort-shuffle if 
there are enough buffers for better performance
 Key: FLINK-25796
 URL: https://issues.apache.org/jira/browse/FLINK-25796
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


Currently, for the result partition of sort-shuffle, there is extra record copy 
overhead introduced by clustering records by subpartition index. For small 
records, this overhead can cause up to a 20% performance regression. This 
ticket aims to solve the problem.

In fact, the hash-based implementation is a natural way to achieve the goal of 
sorting records by partition index. However, it has some serious weaknesses. 
For example, when there are not enough buffers or there is data skew, it can 
waste buffers and hurt compression efficiency, which can cause performance 
regression.

This ticket tries to solve the issue by dynamically switching between the two 
implementations, that is, if there are enough buffers, the hash-based 
implementation will be used, and if there are not enough buffers, the 
sort-based implementation will be used.
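
A sketch of the switching rule; the one-buffer-per-subpartition threshold is an 
illustrative assumption, not the patch's exact condition:

{code:java}
public class DataBufferSelector {

    enum Kind { HASH_BASED, SORT_BASED }

    /** Hash-based clustering avoids the record copy, but only pays off when
     *  every subpartition can get at least one buffer; otherwise fall back to
     *  sort-based, which tolerates scarce buffers and data skew. */
    static Kind select(int availableBuffers, int numSubpartitions) {
        return availableBuffers >= numSubpartitions ? Kind.HASH_BASED : Kind.SORT_BASED;
    }
}
{code}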



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25786) Adjust the generation of subpartition data storage order for sort-shuffle from random shuffle to random shift

2022-01-24 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25786:
---

 Summary: Adjust the generation of subpartition data storage order 
for sort-shuffle from random shuffle to random shift
 Key: FLINK-25786
 URL: https://issues.apache.org/jira/browse/FLINK-25786
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


Currently, for sort-shuffle, the subpartition data storage order is generated 
by a random shuffle. However, if there are not enough resources to run the 
downstream consumer tasks in parallel, performance can suffer because of the 
random disk IO caused by the random subpartition data storage order. This 
ticket aims to improve this scenario.
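
The difference between the two orders in a short sketch: a random shift keeps 
the storage order consecutive (good for sequential disk IO when consumers run 
one after another) while still randomizing the starting point across producers:

{code:java}
import java.util.concurrent.ThreadLocalRandom;

public class SubpartitionStorageOrder {

    static int[] randomShiftOrder(int numSubpartitions) {
        int shift = ThreadLocalRandom.current().nextInt(numSubpartitions);
        int[] order = new int[numSubpartitions];
        for (int i = 0; i < numSubpartitions; i++) {
            // Consecutive order, rotated by a random offset; contrast with
            // a full random shuffle, which destroys on-disk locality.
            order[i] = (i + shift) % numSubpartitions;
        }
        return order;
    }
}
{code}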



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25781) Adjust the maximum number of buffers can be used per result partition for shuffle data read

2022-01-24 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25781:
---

 Summary: Adjust the maximum number of buffers can be used per 
result partition for shuffle data read
 Key: FLINK-25781
 URL: https://issues.apache.org/jira/browse/FLINK-25781
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


In the current sort-shuffle implementation, the maximum amount of buffers that 
can be used per result partition for shuffle data reading is 32M. However, for 
large parallelism jobs, 32M is not enough, and for small parallelism jobs, 32M 
may waste buffers. This ticket aims to make the maximum amount of buffers per 
result partition scale with the parallelism; the selected value is an empirical 
one based on the TPC-DS test results.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25780) Reduce the maximum number of data output buffers for sort-shuffle

2022-01-24 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25780:
---

 Summary: Reduce the maximum number of data output buffers for 
sort-shuffle
 Key: FLINK-25780
 URL: https://issues.apache.org/jira/browse/FLINK-25780
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


The data output buffers of sort-shuffle exist for better disk IO performance, 
and currently the total data output buffer size is 16M, which is pretty big. 
However, blocking while requesting too many buffers may hurt performance. This 
ticket aims to reduce the maximum number of data output buffers to reduce the 
buffer request time. The selected value is an empirical one based on the TPC-DS 
test results.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25774) Restrict the maximum number of buffers can be used per result partition for blocking shuffle

2022-01-24 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25774:
---

 Summary: Restrict the maximum number of buffers can be used per 
result partition for blocking shuffle
 Key: FLINK-25774
 URL: https://issues.apache.org/jira/browse/FLINK-25774
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


Currently, for blocking shuffle, the maximum number of buffers that can be used 
per result partition is Integer.MAX_VALUE. For hash-shuffle, the maximum number 
of buffers actually used is (numSubpartition + 1), because the hash-shuffle 
implementation always flushes the previous buffer after a new buffer is added, 
so setting the maximum to Integer.MAX_VALUE is meaningless. For sort-shuffle, 
if too many buffers are taken by one result partition, other result partitions 
and input gates may spend too much time waiting for buffers, which can hurt 
performance. This ticket aims to restrict the maximum number of buffers that 
can be used per result partition; the selected value is an empirical one based 
on the TPC-DS test results.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25654) Remove the redundant lock in SortMergeResultPartition

2022-01-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25654:
---

 Summary: Remove the redundant lock in SortMergeResultPartition
 Key: FLINK-25654
 URL: https://issues.apache.org/jira/browse/FLINK-25654
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.3
Reporter: Yingjie Cao
 Fix For: 1.15.0, 1.14.4


After FLINK-2372, the task canceler never calls the close method of 
ResultPartition, which removes some race conditions and simplifies the code. 
This ticket aims to remove some redundant locks in SortMergeResultPartition.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25653) Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock

2022-01-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25653:
---

 Summary: Move buffer recycle in SortMergeSubpartitionReader out of 
lock to avoid deadlock
 Key: FLINK-25653
 URL: https://issues.apache.org/jira/browse/FLINK-25653
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.13.5, 1.14.3
Reporter: Yingjie Cao
 Fix For: 1.15.0, 1.13.6, 1.14.4


In the current sort-shuffle implementation, the different lock acquisition 
order in SortMergeSubpartitionReader and SortMergeResultPartitionReadScheduler 
can cause a deadlock. To solve the problem, we can move the buffer recycling in 
SortMergeSubpartitionReader out of the lock.
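
The general pattern of the fix in a short, hypothetical sketch (not the actual 
reader code): collect the buffers under the reader's lock, but recycle them 
only after the lock is released, so the reader lock and the pool lock are never 
held at the same time.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class Reader {

    interface BufferPool {
        void recycle(byte[] buffer);
    }

    private final Object lock = new Object();
    private final Queue<byte[]> finishedBuffers = new ArrayDeque<>();
    private final BufferPool pool;

    Reader(BufferPool pool) {
        this.pool = pool;
    }

    void releaseFinishedBuffers() {
        List<byte[]> toRecycle;
        synchronized (lock) {
            // Only state changes happen under the reader's lock.
            toRecycle = new ArrayList<>(finishedBuffers);
            finishedBuffers.clear();
        }
        // Recycling may take the pool's own lock; doing it here breaks the
        // reader-lock -> pool-lock vs pool-lock -> reader-lock cycle.
        toRecycle.forEach(pool::recycle);
    }
}
{code}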



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25640) Enhance the document for blocking shuffle

2022-01-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25640:
---

 Summary: Enhance the document for blocking shuffle
 Key: FLINK-25640
 URL: https://issues.apache.org/jira/browse/FLINK-25640
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Yingjie Cao
 Fix For: 1.15.0


As discussed in 
[https://lists.apache.org/thread/pt2b1f17x2l5rlvggwxs6m265lo4ly7p], this ticket 
aims to enhance the document for blocking shuffle and add more operation 
guidelines.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25639) Increase the default read buffer size of sort-shuffle to 64M

2022-01-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25639:
---

 Summary: Increase the default read buffer size of sort-shuffle to 
64M
 Key: FLINK-25639
 URL: https://issues.apache.org/jira/browse/FLINK-25639
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


As discussed in 
[https://lists.apache.org/thread/pt2b1f17x2l5rlvggwxs6m265lo4ly7p], this ticket 
aims to increase the default read buffer size of sort-shuffle to 64M.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25638) Increase the default write buffer size of sort-shuffle to 16M

2022-01-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25638:
---

 Summary: Increase the default write buffer size of sort-shuffle to 
16M
 Key: FLINK-25638
 URL: https://issues.apache.org/jira/browse/FLINK-25638
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


As discussed in 
[https://lists.apache.org/thread/pt2b1f17x2l5rlvggwxs6m265lo4ly7p], this ticket 
aims to increase the default write buffer size of sort-shuffle to 16M.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25637) Make sort-shuffle the default shuffle implementation for batch jobs

2022-01-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25637:
---

 Summary: Make sort-shuffle the default shuffle implementation for 
batch jobs
 Key: FLINK-25637
 URL: https://issues.apache.org/jira/browse/FLINK-25637
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


As discussed in 
[https://lists.apache.org/thread/pt2b1f17x2l5rlvggwxs6m265lo4ly7p], this ticket 
aims to make sort-shuffle the default shuffle implementation for batch jobs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25636) FLIP-199: Change some default config values of blocking shuffle for better usability

2022-01-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25636:
---

 Summary: FLIP-199: Change some default config values of blocking 
shuffle for better usability
 Key: FLINK-25636
 URL: https://issues.apache.org/jira/browse/FLINK-25636
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


This is the umbrella issue for FLIP-199; we will change several default config 
values for batch shuffle and update the documentation accordingly.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25607) Sorting by duration on Flink Web UI does not work correctly

2022-01-11 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25607:
---

 Summary: Sorting by duration on Flink Web UI does not work 
correctly
 Key: FLINK-25607
 URL: https://issues.apache.org/jira/browse/FLINK-25607
 Project: Flink
  Issue Type: Bug
Reporter: Yingjie Cao
 Attachments: image-2022-01-11-16-44-10-709.png

The Flink version used is 1.14.

!image-2022-01-11-16-44-10-709.png!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25606) Requesting exclusive buffers timeout when recovering from unaligned checkpoint under fine-grained resource mode

2022-01-11 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25606:
---

 Summary: Requesting exclusive buffers timeout when recovering from 
unaligned checkpoint under fine-grained resource mode
 Key: FLINK-25606
 URL: https://issues.apache.org/jira/browse/FLINK-25606
 Project: Flink
  Issue Type: Bug
Reporter: Yingjie Cao


When converting a RecoveredInputChannel to a RemoteInputChannel, there are not 
enough network buffers to initialize the input channel's exclusive buffers. 
Here is the exception stack:
{code:java}
java.io.IOException: Timeout triggered when requesting exclusive buffers: The 
total number of network buffers is currently set to 6144 of 32768 bytes each. 
You can increase this number by setting the configuration keys 
'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 
'taskmanager.memory.network.max',  or you may increase the timeout which is 
3ms by setting the key 
'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
  at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:205)
  at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:60)
  at 
org.apache.flink.runtime.io.network.partition.consumer.BufferManager.requestExclusiveBuffers(BufferManager.java:133)
  at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.setup(RemoteInputChannel.java:157)
  at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteRecoveredInputChannel.toInputChannelInternal(RemoteRecoveredInputChannel.java:77)
  at 
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel.toInputChannel(RecoveredInputChannel.java:106)
  at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.convertRecoveredInputChannels(SingleInputGate.java:307)
  at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:290)
  at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:94)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
  at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
  at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:359)
  at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:323)
  at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
  at java.lang.Thread.run(Thread.java:834) {code}
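
As the exception message suggests, a workaround is to raise the network memory 
or the request timeout using the quoted keys. A minimal sketch using Flink's 
Configuration API; the values below are illustrative only, not recommendations:

{code:java}
import org.apache.flink.configuration.Configuration;

public class NetworkMemoryTuning {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Give the TaskManager more network memory ...
        conf.setString("taskmanager.memory.network.min", "256mb");
        conf.setString("taskmanager.memory.network.max", "1gb");
        conf.setString("taskmanager.memory.network.fraction", "0.2");
        // ... or give the recovery more time to acquire exclusive buffers.
        conf.setString("taskmanager.network.memory.exclusive-buffers-request-timeout-ms", "60000");
        System.out.println(conf);
    }
}
{code}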



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[VOTE] FLIP-199: Change some default config values of blocking shuffle for better usability

2022-01-10 Thread Yingjie Cao
Hi all,

I'd like to start a vote on FLIP-199: Change some default config values of
blocking shuffle for better usability [1] which has been discussed in this
thread [2].

The vote will be open for at least 72 hours unless there is an objection or
not enough votes.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
[2] https://lists.apache.org/thread/pt2b1f17x2l5rlvggwxs6m265lo4ly7p

Best,
Yingjie


[DISCUSS] FLIP-209: Support to run multiple shuffle plugins in one session cluster

2022-01-07 Thread Yingjie Cao
Hi dev,

I'd like to start a discussion for FLIP-209 [1], which aims to support
running multiple shuffle plugins in one session cluster. Currently, one
Flink cluster can only use one shuffle service plugin, configured by
'shuffle-service-factory.class'. This is not flexible enough and cannot
support use cases like selecting different shuffle services for different
workloads (e.g. batch vs. streaming). This feature has been mentioned
several times [2], and FLIP-209 aims to implement it.

Please refer to FLIP-209 [1] for more details and any feedback is highly
appreciated.

Best,
Yingjie

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-209%3A+Support+to+run+multiple+shuffle+plugins+in+one+session+cluster?moved=true
[2] https://lists.apache.org/thread/k4owttq9q3cq4knoobrzc31bghf7vc0o


Re: [DISCUSS] Change some default config values of blocking shuffle

2022-01-06 Thread Yingjie Cao
Hi all,

Thanks very much for all of the feedback. It seems that we have reached a
consensus. I will start a vote soon.

Best,
Yingjie

Yun Gao wrote on Wed, Jan 5, 2022 at 16:08:

> Many thanks @Yingjie for completing the experiments!
>
> Also +1 for changing the default config values. From the experiments,
> changing the default config values would largely improve the out-of-the-box
> experience of Flink batch, so it seems worth the change from my side even
> if it would cause some compatibility issues in some scenarios. In addition,
> if we finally have to break compatibility, we might do it early to avoid
> affecting more users.
>
> Best,
> Yun
>
> --
> From:刘建刚 
> Send Time:2022 Jan. 4 (Tue.) 20:43
> To:user-zh 
> Cc:dev ; user 
> Subject:Re: [DISCUSS] Change some default config values of blocking shuffle
>
> Thanks for the experiment. +1 for the changes.
>
> Yingjie Cao wrote on Tue, Jan 4, 2022 at 17:35:
>
> > Hi all,
> >
> > After running some tests with the proposed default values (
> > taskmanager.network.sort-shuffle.min-parallelism: 1,
> > taskmanager.network.sort-shuffle.min-buffers: 512,
> > taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
> > taskmanager.network.blocking-shuffle.compression.enabled: true), I'd like
> > to share some test results.
> >
> > 1. TPC-DS performance and stability test (I ran the TPC-DS benchmark with
> > 512 default parallelism and several different settings multiple times):
> > 1) Stability:
> > Compared to the current default values, the proposed default values can
> > improve the TPC-DS query stability a lot. With the current defaults, there
> > are many queries suffering from blocking shuffle relevant failures. With
> > the proposed default values, only three queries fail because of the
> > "Insufficient number of network buffers" error. With 512 parallelism, the
> > current default configuration will incur the same issue. Part of the
> > reason is that the network buffer consumption of an InputGate is
> > proportional to the parallelism and can use 32M of network memory by
> > default, and many tasks have several InputGates, but we only have 128M of
> > network memory per TaskManager by default.
> > 2) Performance:
> > Compared to the current default values, the proposed default values can
> > improve the TPC-DS query performance a lot. Except for those queries with
> > a small amount of shuffle data, which take a really short time, the
> > proposed default values can bring a 2-10x performance gain. About the
> > default value of taskmanager.network.sort-shuffle.min-parallelism proposed
> > by Yun, I tested both 1 and 128, and 1 is better for performance, which is
> > as expected.
> >
> > 2. Flink pre-commit stability test:
> > I have run all Flink tests with the proposed default values more than
> > 20 times. The only instability is the "Insufficient number of network
> > buffers" error for several batch test cases. This error occurs because
> > some tests have really limited network buffers and the proposed default
> > config values may increase the network buffer consumption for some cases.
> > After increasing the total network memory size for these test cases, the
> > issue can be solved.
> >
> > Summary:
> > 1. The proposed default values can improve both the performance and
> > stability of Flink batch shuffle a lot.
> > 2. Some batch jobs may fail with the "Insufficient number of network
> > buffers" error, because this default value change will increase the
> > network buffer consumption a little for jobs with less than 512
> > parallelism (for jobs with more than 512 parallelism, network buffer
> > consumption will be reduced).
> > 3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 has
> > better performance than setting it to 128; both settings may incur the
> > "Insufficient number of network buffers" error.
> > 4. After changing the default values and fixing several test cases, all
> > Flink tests (except for the known unstable cases) can run stably.
> >
> > Personally, I am +1 to make the change. Though the change may cause some
> > batch jobs to fail with the "Insufficient number of network buffers"
> > error, the possibility is small enough (only 3 out of about 100 TPC-DS
> > queries fail, and these queries would also fail with the current default
> > configuration because it is the InputGate that takes the most network
> > buffers and causes the error). Compared to this small regression, the
> > performance and stability gains are big.

Re: [DISCUSS] Change some default config values of blocking shuffle

2022-01-04 Thread Yingjie Cao
Hi all,

After running some tests with the proposed default values (
taskmanager.network.sort-shuffle.min-parallelism: 1,
taskmanager.network.sort-shuffle.min-buffers: 512,
taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
taskmanager.network.blocking-shuffle.compression.enabled: true), I'd like to
share some test results.

1. TPC-DS performance and stability test (I ran the TPC-DS benchmark with 512
default parallelism and several different settings multiple times):
1) Stability:
Compared to the current default values, the proposed default values can
improve the TPC-DS query stability a lot. With the current defaults, there
are many queries suffering from blocking shuffle relevant failures. With
the proposed default values, only three queries fail because of the
"Insufficient number of network buffers" error. With 512 parallelism, the
current default configuration will incur the same issue. Part of the reason
is that the network buffer consumption of an InputGate is proportional to
the parallelism and can use 32M of network memory by default, and many tasks
have several InputGates, but we only have 128M of network memory per
TaskManager by default.
2) Performance:
Compared to the current default values, the proposed default values can
improve the TPC-DS query performance a lot. Except for those queries with a
small amount of shuffle data, which take a really short time, the proposed
default values can bring a 2-10x performance gain. About the default value
of taskmanager.network.sort-shuffle.min-parallelism proposed by Yun, I
tested both 1 and 128, and 1 is better for performance, which is as
expected.

2. Flink pre-commit stability test:
I have run all Flink tests with the proposed default values more than 20
times. The only instability is the "Insufficient number of network buffers"
error for several batch test cases. This error occurs because some tests
have really limited network buffers and the proposed default config values
may increase the network buffer consumption for some cases. After increasing
the total network memory size for these test cases, the issue can be solved.

Summary:
1. The proposed default values can improve both the performance and
stability of Flink batch shuffle a lot.
2. Some batch jobs may fail with the "Insufficient number of network
buffers" error, because this default value change will increase the network
buffer consumption a little for jobs with less than 512 parallelism (for
jobs with more than 512 parallelism, network buffer consumption will be
reduced).
3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 has better
performance than setting it to 128; both settings may incur the
"Insufficient number of network buffers" error.
4. After changing the default values and fixing several test cases, all
Flink tests (except for the known unstable cases) can run stably.

Personally, I am +1 to make the change. Though the change may cause some
batch jobs to fail with the "Insufficient number of network buffers" error,
the possibility is small enough (only 3 out of about 100 TPC-DS queries
fail, and these queries would also fail with the current default
configuration because it is the InputGate that takes the most network
buffers and causes the error). Compared to this small regression, the
performance and stability gains are big. Any feedback, especially from Flink
batch users, is highly appreciated.

BTW, aside from the above tests, I also tried to tune some more config
options to try to make the TPC-DS test faster. I copied these tuned config
options from our daily TPC-DS settings. The results show that the optimized
configuration can improve the TPC-DS performance by about 30%. Though these
settings may not be the best, they really help compared to the default
values. I attached some settings in this mail; I guess some Flink batch
users may be interested in this. Based on my limited knowledge, I guess that
increasing the total TaskManager size and network memory size is important
for performance, because more memory (managed and network) can make
operators and shuffle faster.
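
For readers who want to try the proposed values explicitly before they become
defaults, a minimal sketch using Flink's Configuration API with exactly the
keys listed at the top of this mail:

{code:java}
import org.apache.flink.configuration.Configuration;

public class ProposedShuffleDefaults {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "1");
        conf.setString("taskmanager.network.sort-shuffle.min-buffers", "512");
        conf.setString("taskmanager.memory.framework.off-heap.batch-shuffle.size", "64m");
        conf.setString("taskmanager.network.blocking-shuffle.compression.enabled", "true");
        System.out.println(conf);
    }
}
{code}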

Best,
Yingjie



Yingjie Cao wrote on Wed, Dec 15, 2021 at 12:19:

> Hi Till,
>
> Thanks for the suggestion. I think it makes a lot of sense to also extend
> the documentation for the sort shuffle to include a tuning guide.
>
> Best,
> Yingjie
>
> Till Rohrmann wrote on Tue, Dec 14, 2021 at 18:57:
>
>> As part of this FLIP, does it make sense to also extend the documentation
>> for the sort shuffle [1] to include a tuning guide? I am thinking of a more
>> in depth description of what things you might observe and how to influence
>> them with the configuration options.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
>>
>> Cheers,
>> Till
>>
>> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li 
>> wrote:
>>
>>> Hi Yingjie,

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-14 Thread Yingjie Cao
Hi Till,

Thanks for the suggestion. I think it makes a lot of sense to also extend
the documentation for the sort shuffle to include a tuning guide.

Best,
Yingjie

Till Rohrmann wrote on Tue, Dec 14, 2021 at 18:57:

> As part of this FLIP, does it make sense to also extend the documentation
> for the sort shuffle [1] to include a tuning guide? I am thinking of a more
> in depth description of what things you might observe and how to influence
> them with the configuration options.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
>
> Cheers,
> Till
>
> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li 
> wrote:
>
>> Hi Yingjie,
>>
>> Thanks for your explanation. I have no more questions. +1
>>
>> On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao 
>> wrote:
>> >
>> > Hi Jingsong,
>> >
>> > Thanks for your feedback.
>> >
>> > >>> My question is, what is the maximum parallelism a job can have with
>> > the default configuration? (Does this break out of the box)
>> >
>> > Yes, you are right, these two options are related to network memory and
>> > framework off-heap memory. Generally, these changes will not break the
>> > out-of-the-box experience, but in some extreme cases, for example, when
>> > there are too many ResultPartitions per task, users may need to increase
>> > the network memory to avoid the "insufficient network buffer" error. For
>> > framework off-heap memory, I believe users do not need to change the
>> > default value.
>> >
>> > In fact, I have a basic goal when changing these config values: when
>> > running TPC-DS with medium parallelism and the default values, all
>> > queries must pass without any error and, at the same time, the
>> > performance should be improved. I think if we achieve this goal, most
>> > common use cases can be covered.
>> >
>> > Currently, for the default configuration, the exclusive buffers required
>> > at the input gate side are still proportional to the parallelism (though
>> > since 1.14 we can decouple the network buffer consumption from the
>> > parallelism by setting a config value, with a slight performance
>> > influence on streaming jobs), which means that very large parallelism
>> > cannot be supported by the default configuration. Roughly, I would say
>> > the default values can support jobs of several hundreds of parallelism.
>> >
>> > >>> I do feel that this correspondence is a bit difficult to control at
>> > the moment, and it would be best if a rough table could be provided.
>> >
>> > I think this is a good suggestion; we can provide those suggestions in
>> > the documentation.
>> >
>> > Best,
>> > Yingjie
>> >
>> >> Jingsong Li wrote on Tue, Dec 14, 2021 at 14:39:
>> >>
>> >> Hi  Yingjie,
>> >>
>> >> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
>> >> of batch jobs.
>> >>
>> >> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
>> >> and "taskmanager.network.sort-shuffle.min-buffers" are related to
>> >> network memory and framework.off-heap.size.
>> >>
>> >> My question is, what is the maximum parallelism a job can have with
>> >> the default configuration? (Does this break out of the box)
>> >>
>> >> How much network memory and framework.off-heap.size are required for
>> >> how much parallelism in the default configuration?
>> >>
>> >> I do feel that this correspondence is a bit difficult to control at
>> >> the moment, and it would be best if a rough table could be provided.
>> >>
>> >> Best,
>> >> Jingsong
>> >>
>> >> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao 
>> wrote:
>> >> >
>> >> > Hi Jiangang,
>> >> >
>> >> > Thanks for your suggestion.
>> >> >
>> >> > >>> The config can affect the memory usage. Will the related memory
>> configs be changed?
>> >> >
>> >> > I think we will not change the default network memory settings. My
>> best expectation is that the default value can work for most cases (though
>> may not the best) and for other cases, user may need to tune the memory
>> settings.
>> >> >
>> >> > >>> Can you share the tpcds results for different configs? Although
>> we change the default values, it is helpful to change them for different
> users. In this case, the experience can help a lot.

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-13 Thread Yingjie Cao
Hi Jingsong,

Thanks for your feedback.

>>> My question is, what is the maximum parallelism a job can have with the
default configuration? (Does this break out of the box)

Yes, you are right, these two options are related to network memory and
framework off-heap memory. Generally, these changes will not break the
out-of-the-box experience, but in some extreme cases, for example, when
there are too many ResultPartitions per task, users may need to increase the
network memory to avoid the "insufficient network buffer" error. For
framework off-heap memory, I believe users do not need to change the default
value.

In fact, I have a basic goal when changing these config values: when running
TPC-DS with medium parallelism and the default values, all queries must pass
without any error and, at the same time, the performance should be improved.
I think if we achieve this goal, most common use cases can be covered.

Currently, for the default configuration, the exclusive buffers required at
the input gate side are still proportional to the parallelism (though since
1.14 we can decouple the network buffer consumption from the parallelism by
setting a config value, with a slight performance influence on streaming
jobs), which means that very large parallelism cannot be supported by the
default configuration. Roughly, I would say the default values can support
jobs of several hundreds of parallelism.

>>> I do feel that this correspondence is a bit difficult to control at the
moment, and it would be best if a rough table could be provided.

I think this is a good suggestion; we can provide those suggestions in the
documentation.

Best,
Yingjie

Jingsong Li wrote on Tue, Dec 14, 2021 at 14:39:

> Hi  Yingjie,
>
> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
> of batch jobs.
>
> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
> and "taskmanager.network.sort-shuffle.min-buffers" are related to
> network memory and framework.off-heap.size.
>
> My question is, what is the maximum parallelism a job can have with
> the default configuration? (Does this break out of the box)
>
> How much network memory and framework.off-heap.size are required for
> how much parallelism in the default configuration?
>
> I do feel that this correspondence is a bit difficult to control at
> the moment, and it would be best if a rough table could be provided.
>
> Best,
> Jingsong
>
> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao 
> wrote:
> >
> > Hi Jiangang,
> >
> > Thanks for your suggestion.
> >
> > >>> The config can affect the memory usage. Will the related memory
> configs be changed?
> >
> > I think we will not change the default network memory settings. My best
> expectation is that the default values can work for most cases (though maybe
> not the best); for other cases, users may need to tune the memory
> settings.
> >
> > >>> Can you share the tpcds results for different configs? Although we
> change the default values, it is helpful to change them for different
> users. In this case, the experience can help a lot.
> >
> > I did not keep all previous TPCDS results, but from those results, I can
> tell that on HDD, always using sort-shuffle is a good choice. For small
> jobs, using sort-shuffle may not bring much performance gain, possibly
> because all shuffle data can be cached in memory (page cache); this is the
> case if the cluster has enough resources. However, if the whole cluster is
> under heavy load or you are running large-scale jobs, the performance of
> those small jobs can also be affected. For large-scale jobs, the
> configurations suggested to be tuned are
> taskmanager.network.sort-shuffle.min-buffers and
> taskmanager.memory.framework.off-heap.batch-shuffle.size; you can increase
> these values for large-scale batch jobs.
> >
> > BTW, I am still running TPCDS tests these days and I can share these
> results soon.
> >
> > Best,
> > Yingjie
> >
> > 刘建刚  于2021年12月10日周五 18:30写道:
> >>
>> Glad to see the suggestion. In our tests, we found that the changed
> configs do not improve performance much for small jobs, just as in your
> test. I have some suggestions:
> >>
> >> The config can affect the memory usage. Will the related memory configs
> be changed?
> >> Can you share the tpcds results for different configs? Although we
> change the default values, it is helpful to change them for different
> users. In this case, the experience can help a lot.
> >>
> >> Best,
> >> Liu Jiangang
> >>
> >> Yun Gao  于2021年12月10日周五 17:20写道:
> >>>
> >>> Hi Yingjie,
> >>>
> >>> Very thanks for draftin

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-13 Thread Yingjie Cao
Hi Jiangang,

Thanks for your suggestion.

>>> The config can affect the memory usage. Will the related memory configs
be changed?

I think we will not change the default network memory settings. My best
expectation is that the default values can work for most cases (though maybe
not the best); for other cases, users may need to tune the memory
settings.

>>> Can you share the tpcds results for different configs? Although we
change the default values, it is helpful to change them for different
users. In this case, the experience can help a lot.

I did not keep all previous TPCDS results, but from those results, I can tell
that on HDD, always using sort-shuffle is a good choice. For small jobs, using
sort-shuffle may not bring much performance gain, possibly because all shuffle
data can be cached in memory (page cache); this is the case if the cluster has
enough resources. However, if the whole cluster is under heavy load or you are
running large-scale jobs, the performance of those small jobs can also be
affected. For large-scale jobs, the configurations suggested to be tuned are
taskmanager.network.sort-shuffle.min-buffers and
taskmanager.memory.framework.off-heap.batch-shuffle.size; you can increase
these values for large-scale batch jobs.

BTW, I am still running TPCDS tests these days and I can share these
results soon.

Best,
Yingjie

刘建刚  于2021年12月10日周五 18:30写道:

> Glad to see the suggestion. In our tests, we found that the changed configs
> do not improve performance much for small jobs, just as in your test. I
> have some suggestions:
>
>- The config can affect the memory usage. Will the related memory
>configs be changed?
>- Can you share the tpcds results for different configs? Although we
>change the default values, it is helpful to change them for different
>users. In this case, the experience can help a lot.
>
> Best,
> Liu Jiangang
>
> Yun Gao  于2021年12月10日周五 17:20写道:
>
>> Hi Yingjie,
>>
>> Very thanks for drafting the FLIP and initiating the discussion!
>>
>> May I double-confirm, regarding
>> taskmanager.network.sort-shuffle.min-parallelism: since other frameworks
>> like Spark use sort-based shuffle for all cases, does our current
>> circumstance still differ from theirs?
>>
>> Best,
>> Yun
>>
>>
>>
>>
>> --
>> From:Yingjie Cao 
>> Send Time:2021 Dec. 10 (Fri.) 16:17
>> To:dev ; user ; user-zh <
>> user...@flink.apache.org>
>> Subject:Re: [DISCUSS] Change some default config values of blocking
>> shuffle
>>
>> Hi dev & users:
>>
>> I have created a FLIP [1] for it, feedbacks are highly appreciated.
>>
>> Best,
>> Yingjie
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
>> Yingjie Cao  于2021年12月3日周五 17:02写道:
>>
>> Hi dev & users,
>>
>> We propose to change some default values of blocking shuffle to improve
>> the user out-of-box experience (not influence streaming). The default
>> values we want to change are as follows:
>>
>> 1. Data compression
>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
>> default value is 'false'.  Usually, data compression can reduce both disk
>> and network IO which is good for performance. At the same time, it can save
>> storage space. We propose to change the default value to true.
>>
>> 2. Default shuffle implementation
>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
>> value is 'Integer.MAX', which means that by default, Flink jobs will always
>> use hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
>> both stability and performance. So we propose to reduce the default value
>> to a smaller one, for example, 128. (We tested 128, 256, 512 and 1024 with
>> TPC-DS, and 128 was the best.)
>>
>> 3. Read buffer of sort-shuffle
>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
>> default value is '32M'. Previously, when choosing the default value, both
>> '32M' and '64M' were OK in tests and we chose the smaller one to be
>> cautious. However, it was recently reported on the mailing list that the
>> default value is not enough, which caused a buffer request timeout issue.
>> We already created a ticket to improve the behavior. At the same time, we
>> propose to increase this default value to '64M', which can also help.
>>
>> 4. Sort buffer size of sort-shuffle
>>

Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-13 Thread Yingjie Cao
Hi Yun,

Thanks for your feedback.

I think setting taskmanager.network.sort-shuffle.min-parallelism to 1 and
using sort-shuffle for all cases by default is a good suggestion. I did not
choose this value mainly for two reasons:

1. The first is that it increases network memory usage, which may cause an
"insufficient network buffer" exception, and users may have to increase the
total network buffers.
2. There are several (not many) TPC-DS queries that suffer some performance
regression on SSD.

For the first issue, I will test more settings on TPC-DS and see the
influence. For the second issue, I will try to find the cause and solve it
in 1.15.

I am open to your suggestion, but I still need some more tests and
analysis to guarantee that it works well.

Best,
Yingjie

Yun Gao  于2021年12月10日周五 17:19写道:

> Hi Yingjie,
>
> Very thanks for drafting the FLIP and initiating the discussion!
>
> May I double-confirm, regarding
> taskmanager.network.sort-shuffle.min-parallelism: since other frameworks
> like Spark use sort-based shuffle for all cases, does our current
> circumstance still differ from theirs?
>
> Best,
> Yun
>
>
>
> --
> From:Yingjie Cao 
> Send Time:2021 Dec. 10 (Fri.) 16:17
> To:dev ; user ; user-zh <
> user...@flink.apache.org>
> Subject:Re: [DISCUSS] Change some default config values of blocking shuffle
>
> Hi dev & users:
>
> I have created a FLIP [1] for it; feedback is highly appreciated.
>
> Best,
> Yingjie
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
>
> Yingjie Cao  于2021年12月3日周五 17:02写道:
> Hi dev & users,
>
> We propose to change some default values of blocking shuffle to improve
> the user out-of-box experience (not influence streaming). The default
> values we want to change are as follows:
>
> 1. Data compression
> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
> default value is 'false'.  Usually, data compression can reduce both disk
> and network IO which is good for performance. At the same time, it can save
> storage space. We propose to change the default value to true.
>
> 2. Default shuffle implementation
> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
> value is 'Integer.MAX', which means that by default, Flink jobs will always
> use hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
> both stability and performance. So we propose to reduce the default value
> to a smaller one, for example, 128. (We tested 128, 256, 512 and 1024 with
> TPC-DS, and 128 was the best.)
>
> 3. Read buffer of sort-shuffle
> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
> default value is '32M'. Previously, when choosing the default value, both
> '32M' and '64M' were OK in tests and we chose the smaller one to be
> cautious. However, it was recently reported on the mailing list that the
> default value is not enough, which caused a buffer request timeout issue. We
> already created a ticket to improve the behavior. At the same time, we
> propose to increase this default value to '64M', which can also help.
>
> 4. Sort buffer size of sort-shuffle
> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
> value is '64', which means 64 network buffers (32k per buffer by default).
> This default value is quite modest and performance can suffer. We propose to
> increase this value to a larger one, for example, 512 (the default TM and
> network buffer configuration can serve more than 10 result partitions
> concurrently).
>
> We already tested these default values together with the TPC-DS benchmark in
> a cluster and both performance and stability improved a lot. These changes
> can help to improve the out-of-the-box experience of blocking shuffle. What
> do you think about these changes? Is there any concern? If there are no
> objections, I will make these changes soon.
>
> Best,
> Yingjie
>
>
>


Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-10 Thread Yingjie Cao
Hi dev & users:

I have created a FLIP [1] for it; feedback is highly appreciated.

Best,
Yingjie

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability

Yingjie Cao  于2021年12月3日周五 17:02写道:

> Hi dev & users,
>
> We propose to change some default values of blocking shuffle to improve
> the user out-of-box experience (not influence streaming). The default
> values we want to change are as follows:
>
> 1. Data compression
> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
> default value is 'false'.  Usually, data compression can reduce both disk
> and network IO which is good for performance. At the same time, it can save
> storage space. We propose to change the default value to true.
>
> 2. Default shuffle implementation
> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
> value is 'Integer.MAX', which means that by default, Flink jobs will always
> use hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
> both stability and performance. So we propose to reduce the default value
> to a smaller one, for example, 128. (We tested 128, 256, 512 and 1024 with
> TPC-DS, and 128 was the best.)
>
> 3. Read buffer of sort-shuffle
> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
> default value is '32M'. Previously, when choosing the default value, both
> '32M' and '64M' were OK in tests and we chose the smaller one to be
> cautious. However, it was recently reported on the mailing list that the
> default value is not enough, which caused a buffer request timeout issue. We
> already created a ticket to improve the behavior. At the same time, we
> propose to increase this default value to '64M', which can also help.
>
> 4. Sort buffer size of sort-shuffle
> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
> value is '64', which means 64 network buffers (32k per buffer by default).
> This default value is quite modest and performance can suffer. We propose to
> increase this value to a larger one, for example, 512 (the default TM and
> network buffer configuration can serve more than 10 result partitions
> concurrently).
>
> We already tested these default values together with the TPC-DS benchmark in
> a cluster and both performance and stability improved a lot. These changes
> can help to improve the out-of-the-box experience of blocking shuffle. What
> do you think about these changes? Is there any concern? If there are no
> objections, I will make these changes soon.
>
> Best,
> Yingjie
>


Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-06 Thread Yingjie Cao
Hi Till,

Thanks for your feedback.

>>> How will our tests be affected by these changes? Will Flink require
more resources and, thus, will it risk destabilizing our testing
infrastructure?

There are some tests that need to be adjusted, for example,
BlockingShuffleITCase. For other tests, theoretically, the influence should
be small. I will further run all tests multiple times (like 10 or 20) to
ensure that there are no test stability issues before making the change.

>>> I would propose to create a FLIP for these changes since you propose to
change the default behaviour. It can be a very short one, though.

Yes, you are right. I will prepare a simple FLIP soon.

Best,
Yingjie


Till Rohrmann  于2021年12月3日周五 18:39写道:

> Thanks for starting this discussion Yingjie,
>
> How will our tests be affected by these changes? Will Flink require more
> resources and, thus, will it risk destabilizing our testing infrastructure?
>
> I would propose to create a FLIP for these changes since you propose to
> change the default behaviour. It can be a very short one, though.
>
> Cheers,
> Till
>
> On Fri, Dec 3, 2021 at 10:02 AM Yingjie Cao 
> wrote:
>
>> Hi dev & users,
>>
>> We propose to change some default values of blocking shuffle to improve
>> the user out-of-box experience (not influence streaming). The default
>> values we want to change are as follows:
>>
>> 1. Data compression
>> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
>> default value is 'false'.  Usually, data compression can reduce both disk
>> and network IO which is good for performance. At the same time, it can save
>> storage space. We propose to change the default value to true.
>>
>> 2. Default shuffle implementation
>> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
>> value is 'Integer.MAX', which means that by default, Flink jobs will always
>> use hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
>> both stability and performance. So we propose to reduce the default value
>> to a smaller one, for example, 128. (We tested 128, 256, 512 and 1024 with
>> TPC-DS, and 128 was the best.)
>>
>> 3. Read buffer of sort-shuffle
>> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
>> default value is '32M'. Previously, when choosing the default value, both
>> '32M' and '64M' were OK in tests and we chose the smaller one to be
>> cautious. However, it was recently reported on the mailing list that the
>> default value is not enough, which caused a buffer request timeout issue.
>> We already created a ticket to improve the behavior. At the same time, we
>> propose to increase this default value to '64M', which can also help.
>>
>> 4. Sort buffer size of sort-shuffle
>> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
>> value is '64', which means 64 network buffers (32k per buffer by default).
>> This default value is quite modest and performance can suffer. We propose
>> to increase this value to a larger one, for example, 512 (the default TM
>> and network buffer configuration can serve more than 10 result partitions
>> concurrently).
>>
>> We already tested these default values together with the TPC-DS benchmark
>> in a cluster and both performance and stability improved a lot. These
>> changes can help to improve the out-of-the-box experience of blocking
>> shuffle. What do you think about these changes? Is there any concern? If
>> there are no objections, I will make these changes soon.
>>
>> Best,
>> Yingjie
>>
>


[DISCUSS] Change some default config values of blocking shuffle

2021-12-03 Thread Yingjie Cao
Hi dev & users,

We propose to change some default values of blocking shuffle to improve the
user out-of-box experience (not influence streaming). The default values we
want to change are as follows:

1. Data compression
(taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
default value is 'false'.  Usually, data compression can reduce both disk
and network IO which is good for performance. At the same time, it can save
storage space. We propose to change the default value to true.

2. Default shuffle implementation
(taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
value is 'Integer.MAX', which means that by default, Flink jobs will always use
hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
both stability and performance. So we propose to reduce the default value
to a smaller one, for example, 128. (We tested 128, 256, 512 and 1024 with
TPC-DS, and 128 was the best.)

3. Read buffer of sort-shuffle
(taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
default value is '32M'. Previously, when choosing the default value, both
'32M' and '64M' were OK in tests and we chose the smaller one to be cautious.
However, it was recently reported on the mailing list that the default
value is not enough, which caused a buffer request timeout issue. We already
created a ticket to improve the behavior. At the same time, we propose to
increase this default value to '64M', which can also help.

4. Sort buffer size of sort-shuffle
(taskmanager.network.sort-shuffle.min-buffers): Currently, the default
value is '64', which means 64 network buffers (32k per buffer by default).
This default value is quite modest and performance can suffer. We propose to
increase this value to a larger one, for example, 512 (the default TM and
network buffer configuration can serve more than 10 result partitions
concurrently).

We already tested these default values together with the TPC-DS benchmark in a
cluster and both performance and stability improved a lot. These changes can
help to improve the out-of-the-box experience of blocking shuffle. What do you
think about these changes? Is there any concern? If there are no objections, I
will make these changes soon.

Best,
Yingjie
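For anyone who wants to try these values before the defaults change, here is a
minimal sketch that sets the four options named above programmatically. The
option keys and values come from this proposal; the same keys can instead be
put into flink-conf.yaml.

import org.apache.flink.configuration.Configuration;

// Minimal sketch: apply the proposed defaults explicitly via the
// Configuration API (string keys, so it works across Flink versions).
public final class ProposedBlockingShuffleDefaults {

    public static Configuration proposedDefaults() {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.network.blocking-shuffle.compression.enabled", "true");
        conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "128");
        conf.setString("taskmanager.memory.framework.off-heap.batch-shuffle.size", "64m");
        conf.setString("taskmanager.network.sort-shuffle.min-buffers", "512");
        return conf;
    }
}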


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-12-01 Thread Yingjie Cao
Hi Jiangang,

Great to hear that; you are welcome to work with us to make the project better.

Best,
Yingjie

刘建刚  于2021年12月1日周三 下午3:27写道:

> Good work for Flink's batch processing!
> Remote shuffle service can resolve the container-lost problem and reduce
> the running time of batch jobs after failover. We have investigated the
> component a lot and welcome Flink's native solution. We will try it and
> help improve it.
>
> Thanks,
> Liu Jiangang
>
> Yingjie Cao  于2021年11月30日周二 下午9:33写道:
>
> > Hi dev & users,
> >
> > We are happy to announce the open source of remote shuffle project [1]
> for
> > Flink. The project originated in Alibaba and the main motivation is to
> > improve batch data processing for both performance & stability and
> further
> > embrace cloud native. For more features about the project, please refer
> to
> > [1].
> >
> > Before going open source, the project has been used widely in production
> > and it behaves well on both stability and performance. We hope you enjoy
> > it. Collaboration and feedback are highly appreciated.
> >
> > Best,
> > Yingjie on behalf of all contributors
> >
> > [1] https://github.com/flink-extended/flink-remote-shuffle
> >
>


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Yingjie Cao
Hi Jing,

Great to hear that; collaboration and feedback are welcome.

Best,
Yingjie

Jing Zhang  于2021年12月1日周三 上午10:34写道:

> Amazing!
> Remote shuffle service is an important improvement for batch data
> processing experience on Flink.
> It is also a strong requirement in our internal batch business; we will
> try it soon and give you feedback.
>
> Best,
> Jing Zhang
>
> Martijn Visser  于2021年12月1日周三 上午3:25写道:
>
> > Hi Yingjie,
> >
> > This is great, thanks for sharing. Will you also add it to
> > https://flink-packages.org/ ?
> >
> > Best regards,
> >
> > Martijn
> >
> > On Tue, 30 Nov 2021 at 17:31, Till Rohrmann 
> wrote:
> >
> > > Great news, Yingjie. Thanks a lot for sharing this information with the
> > > community and kudos to all the contributors of the external shuffle
> > service
> > > :-)
> > >
> > > Cheers,
> > > Till
> > >
> > > On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao 
> > > wrote:
> > >
> > > > Hi dev & users,
> > > >
> > > > We are happy to announce the open source of remote shuffle project
> [1]
> > > for
> > > > Flink. The project originated in Alibaba and the main motivation is to
> > > > improve batch data processing for both performance & stability and
> > > further
> > > > embrace cloud native. For more features about the project, please
> refer
> > > to
> > > > [1].
> > > >
> > > > Before going open source, the project has been used widely in
> > production
> > > > and it behaves well on both stability and performance. We hope you
> > enjoy
> > > > it. Collaboration and feedback are highly appreciated.
> > > >
> > > > Best,
> > > > Yingjie on behalf of all contributors
> > > >
> > > > [1] https://github.com/flink-extended/flink-remote-shuffle
> > > >
> > >
> >
>


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Yingjie Cao
Hi Martijn,

Yes, we will add it to flink-packages soon.

Best,
Yingjie

Martijn Visser  于2021年12月1日周三 上午3:24写道:

> Hi Yingjie,
>
> This is great, thanks for sharing. Will you also add it to
> https://flink-packages.org/ ?
>
> Best regards,
>
> Martijn
>
> On Tue, 30 Nov 2021 at 17:31, Till Rohrmann  wrote:
>
> > Great news, Yingjie. Thanks a lot for sharing this information with the
> > community and kudos to all the contributors of the external shuffle
> service
> > :-)
> >
> > Cheers,
> > Till
> >
> > On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao 
> > wrote:
> >
> > > Hi dev & users,
> > >
> > > We are happy to announce the open source of remote shuffle project [1]
> > for
> > > Flink. The project originated in Alibaba and the main motivation is
> to
> > > improve batch data processing for both performance & stability and
> > further
> > > embrace cloud native. For more features about the project, please refer
> > to
> > > [1].
> > >
> > > Before going open source, the project has been used widely in
> production
> > > and it behaves well on both stability and performance. We hope you
> enjoy
> > > it. Collaboration and feedback are highly appreciated.
> > >
> > > Best,
> > > Yingjie on behalf of all contributors
> > >
> > > [1] https://github.com/flink-extended/flink-remote-shuffle
> > >
> >
>


[ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Yingjie Cao
Hi dev & users,

We are happy to announce the open source of the remote shuffle project [1] for
Flink. The project originated in Alibaba and the main motivation is to
improve batch data processing for both performance & stability and to further
embrace cloud native. For more features of the project, please refer to
[1].

Before going open source, the project was used widely in production
and it behaved well in terms of both stability and performance. We hope you
enjoy it. Collaboration and feedback are highly appreciated.

Best,
Yingjie on behalf of all contributors

[1] https://github.com/flink-extended/flink-remote-shuffle


Re: [DISCUSS] Releasing Flink 1.14.1

2021-11-24 Thread Yingjie Cao
Hi Martijn,

I moved the fix version of "FLINK-21788 - Throw PartitionNotFoundException
if the partition file has been lost for blocking shuffle" to 1.15.0.

Best,
Yingjie

Martijn Visser  于2021年11月25日周四 上午2:40写道:

> Hi all,
>
> I would like to start a discussion on releasing Flink 1.14.1. Flink 1.14
> was released on the 29th of September [1] and so far 107 issues have been
> resolved, including multiple blockers and critical priorities [2].
>
> There are currently 169 open tickets which contain a fixVersion for 1.14.1
> [3]. I'm including the ones that are currently marked as critical or a
> blocker to verify if these should be included in Flink 1.14.1. It would be
> great if those that are assigned or working on one or more of these tickets
> can give an update on its status.
>
> * https://issues.apache.org/jira/browse/FLINK-24543 - Zookeeper connection
> issue causes inconsistent state in Flink -> I think this depends on the
> outcome of dropping Zookeeper 3.4 as was proposed on the Dev mailing list
> * https://issues.apache.org/jira/browse/FLINK-25027 - Allow GC of a
> finished job's JobMaster before the slot timeout is reached
> * https://issues.apache.org/jira/browse/FLINK-25022 - ClassLoader leak
> with
> ThreadLocals on the JM when submitting a job through the REST API
> * https://issues.apache.org/jira/browse/FLINK-24789 -
> IllegalStateException
> with CheckpointCleaner being closed already
> * https://issues.apache.org/jira/browse/FLINK-24328 - Long term fix for
> receiving new buffer size before network reader configured -> I'm not sure
> if this would end up in Flink 1.14.1, I think it's more likely that it
> would be Flink 1.15. Anton/Dawid, could you confirm this?
> * https://issues.apache.org/jira/browse/FLINK-23946 - Application mode
> fails fatally when being shut down -> This depends on
> https://issues.apache.org/jira/browse/FLINK-24038 and I don't see much
> happening there, so I also expect that this would move to Flink 1.15.
> David, could you confirm?
> * https://issues.apache.org/jira/browse/FLINK-22113 - UniqueKey constraint
> is lost with multiple sources join in SQL
> * https://issues.apache.org/jira/browse/FLINK-21788 - Throw
> PartitionNotFoundException if the partition file has been lost for blocking
> shuffle -> I'm also expecting that this would move to Flink 1.15, can you
> confirm Yingjie ?
>
> There are quite some other tickets that I've excluded from this list,
> because they are either test instabilities or are not depending on a Flink
> release to be resolved.
>
> Note: there are quite a few test instabilities in the list and help on
> those is always appreciated. You can check all unassigned test-instability
> tickets in Jira [4].
>
> Are there any other open tickets that we should wait for? Is there a PMC
> member who would like to manage the release? I'm more than happy to help
> with monitoring the status of the tickets.
>
> Best regards,
>
> Martijn
>
> [1] https://flink.apache.org/news/2021/09/29/release-1.14.0.html
> [2]
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20status%20in%20(Resolved%2C%20Closed)%20AND%20fixVersion%20%3D%201.14.1%20ORDER%20BY%20priority%20DESC%2C%20created%20DESC
> [3]
>
> https://issues.apache.org/jira/issues?jql=project%20%3D%20FLINK%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20fixVersion%20%3D%201.14.1%20ORDER%20BY%20priority%20DESC%2C%20created%20DESC
>
> [4]
>
> https://issues.apache.org/jira/issues?jql=project%20%3D%20FLINK%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20fixVersion%20%3D%201.14.1%20AND%20labels%20%3D%20test-stability%20AND%20assignee%20in%20(EMPTY)%20ORDER%20BY%20priority%20DESC%2C%20created%20DESC
>
> Martijn Visser | Product Manager
>
> mart...@ververica.com
>
>
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward - The Apache Flink Conference
>
> Stream Processing | Event Driven | Real Time
>


[jira] [Created] (FLINK-24954) Reset read buffer request timeout on buffer recycling for sort-shuffle

2021-11-18 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-24954:
---

 Summary: Reset read buffer request timeout on buffer recycling for 
sort-shuffle
 Key: FLINK-24954
 URL: https://issues.apache.org/jira/browse/FLINK-24954
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


Currently, the read buffer request timeout implementation of sort-shuffle is a 
little aggressive. As reported in the mailing list: 
[https://lists.apache.org/thread/bd3s5bqfg9oxlb1g1gg3pxs3577lhf88], the 
TimeoutException may be triggered if there is data skew and the downstream task 
is slow. Actually, we can further improve this case by resetting the request 
timeout on buffer recycling.
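A toy illustration of the idea (all names here are hypothetical, not the
actual Flink classes): instead of a fixed deadline measured from the time of
the request, the deadline is pushed back every time a buffer is recycled, so a
slow but still progressing consumer is never timed out.

// Hypothetical sketch: the deadline is extended on every recycle, so the
// timeout only fires when no buffer has been recycled for a full period.
final class RequestTimeoutSketch {

    private final long timeoutMillis;
    private volatile long deadline;

    RequestTimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
        this.deadline = System.currentTimeMillis() + timeoutMillis;
    }

    // Called whenever a read buffer is recycled, i.e., progress was made.
    void onBufferRecycled() {
        deadline = System.currentTimeMillis() + timeoutMillis;
    }

    boolean isTimedOut() {
        return System.currentTimeMillis() > deadline;
    }
}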



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24901) Some further improvements of the pluggable shuffle framework

2021-11-15 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-24901:
---

 Summary: Some further improvements of the pluggable shuffle 
framework
 Key: FLINK-24901
 URL: https://issues.apache.org/jira/browse/FLINK-24901
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


This is an umbrella issue including several further improvements of the 
pluggable shuffle framework.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24900) Support to run multiple shuffle plugins in one session cluster

2021-11-15 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-24900:
---

 Summary: Support to run multiple shuffle plugins in one session 
cluster
 Key: FLINK-24900
 URL: https://issues.apache.org/jira/browse/FLINK-24900
 Project: Flink
  Issue Type: Improvement
Reporter: Yingjie Cao


Currently, one Flink cluster can only use one shuffle plugin. However, there 
are cases where different jobs may need different shuffle implementations. By 
loading shuffle plugins with the plugin manager and letting jobs select their 
shuffle service freely, Flink can support running multiple shuffle plugins in 
one session cluster.
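A minimal, self-contained sketch of the idea (the registry type and method
names are hypothetical, not part of the pluggable shuffle framework): keep one
shuffle master handle per plugin, keyed by its factory class name, and share
the registry across all jobs of the session cluster.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical cluster-level registry; M stands in for the real
// ShuffleMaster type. Each job names the plugin it wants via a config key.
final class ShuffleMasterRegistry<M> {

    private final Map<String, M> masters = new ConcurrentHashMap<>();

    // Creates the shuffle master on first use and reuses it afterwards.
    M getOrCreate(String factoryClassName, Supplier<M> pluginLoader) {
        return masters.computeIfAbsent(factoryClassName, ignored -> pluginLoader.get());
    }
}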



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24899) Enable data compression for blocking shuffle by default

2021-11-15 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-24899:
---

 Summary: Enable data compression for blocking shuffle by default
 Key: FLINK-24899
 URL: https://issues.apache.org/jira/browse/FLINK-24899
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


Currently, shuffle data compression is not enabled by default. Shuffle data 
compression is important for blocking data shuffle, and enabling it by default 
can improve usability.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24898) Some further improvements of sort-shuffle

2021-11-15 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-24898:
---

 Summary: Some further improvements of sort-shuffle
 Key: FLINK-24898
 URL: https://issues.apache.org/jira/browse/FLINK-24898
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


This is an umbrella issue including several further improvements of 
sort-shuffle.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24035) Notify the buffer listeners when the local buffer pool receives available notification from the global pool

2021-08-27 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-24035:
---

 Summary: Notify the buffer listeners when the local buffer pool 
receives available notification from the global pool
 Key: FLINK-24035
 URL: https://issues.apache.org/jira/browse/FLINK-24035
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Yingjie Cao
 Fix For: 1.14.0


The buffer listeners are not notified when the local buffer pool receives an 
available notification from the global pool. This may cause a potential 
deadlock:
 # A LocalBufferPool is created, but there are no available buffers in the 
global NetworkBufferPool.
 # The LocalBufferPool registers an available buffer listener with the global 
NetworkBufferPool.
 # The BufferManager requests buffers from the LocalBufferPool but no buffer is 
available. As a result, it registers an available buffer listener with the 
LocalBufferPool.
 # A buffer is recycled to the global pool and the local buffer pool is 
notified about the available buffer.
 # The local buffer pool requests the available buffer from the global pool, 
but the registered available buffer listener of the BufferManager is not 
notified and never gets a chance to be notified, so a deadlock occurs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23724) Network buffer leak when ResultPartition is released (failover)

2021-08-11 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-23724:
---

 Summary: Network buffer leak when ResultPartition is released 
(failover)
 Key: FLINK-23724
 URL: https://issues.apache.org/jira/browse/FLINK-23724
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Yingjie Cao
 Fix For: 1.14.0


The BufferBuilders in BufferWritingResultPartition are not properly released 
when ResultPartition is released.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[VOTE] FLIP-184: Refine ShuffleMaster lifecycle management for pluggable shuffle service framework

2021-07-16 Thread Yingjie Cao
Hi all,

I'd like to start a vote on FLIP-184 [1] which was
discussed in [2] [3]. The vote will be open for at least 72 hours
until 7.21 unless there is an objection.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-184%3A+Refine+ShuffleMaster+lifecycle+management+for+pluggable+shuffle+service+framework
[2]
https://lists.apache.org/thread.html/radbbabfcfb6bec305ddf7aeefb983232f96b18ba013f0ae2ee500288%40%3Cdev.flink.apache.org%3E
[3]
https://lists.apache.org/thread.html/r93e3a72506f3e7ffd3c1ab860b5d1a21f8a47b059f2f2fdd05ca1d46%40%3Cdev.flink.apache.org%3E


Re: [DISCUSS] FLIP-183: Dynamic buffer size adjustment

2021-07-13 Thread Yingjie Cao
Hi,

Thanks for driving this, I think it is really helpful for jobs suffering
from backpressure.

Best,
Yingjie

Anton,Kalashnikov  于2021年7月9日周五 下午10:59写道:

> Hey!
>
> There is a wish to decrease the amount of in-flight data, which can improve
> aligned checkpoint time (less in-flight data to process before the
> checkpoint can complete) and improve the behaviour and performance of
> unaligned checkpoints (less in-flight data that needs to be persisted
> in every unaligned checkpoint). The main idea is not to keep as much
> in-flight data as memory allows, but to keep only the amount of data
> that can be predictably handled in a configured amount of time (e.g., we
> keep data which can be processed in 1 sec). This can be achieved by
> calculating the effective throughput and then changing the buffer
> size based on this throughput. More details about the proposal can
> be found here [1].
>
> What are your thoughts about it?
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment
>
>
> --
> Best regards,
> Anton Kalashnikov
>
>
>
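A toy sketch of the core idea (names and the exact formula are illustrative,
not taken from the FLIP): pick a buffer size such that the configured number
of in-flight buffers holds roughly the target time's worth of data at the
currently measured throughput.

// Illustrative only: derive a buffer size from measured throughput so that
// the in-flight buffers hold about `targetMillis` worth of data.
final class BufferSizeSketch {

    static int desiredBufferSize(
            long throughputBytesPerSecond,
            long targetMillis,
            int inFlightBuffers,
            int maxBufferSize) {
        long targetInFlightBytes = throughputBytesPerSecond * targetMillis / 1000;
        long perBuffer = targetInFlightBytes / Math.max(1, inFlightBuffers);
        return (int) Math.max(1, Math.min(maxBufferSize, perBuffer));
    }

    public static void main(String[] args) {
        // E.g., 64 MiB/s throughput, 1 s target, 100 in-flight buffers:
        // about 671 KB per buffer, capped here at a 32 KB maximum.
        System.out.println(desiredBufferSize(64L << 20, 1000, 100, 32 * 1024));
    }
}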


[DISCUSS] FLIP-184: Refine ShuffleMaster lifecycle management for pluggable shuffle service framework

2021-07-13 Thread Yingjie Cao
Hi devs and users,

This topic was originally discussed and reached a consensus in [1]. The
change touches the pluggable shuffle interface which, though not currently
annotated as public, may already be used by some users. To avoid bringing
compatibility issues to customized shuffle plugins that are already
implemented, I am writing a FLIP [2] which contains only the minimal changes,
and I am involving the user mailing list. If there are no objections, I will
start a vote soon.

[1]
https://lists.apache.org/thread.html/radbbabfcfb6bec305ddf7aeefb983232f96b18ba013f0ae2ee500288%40%3Cdev.flink.apache.org%3E
.
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-184%3A+Refine+ShuffleMaster+lifecycle+management+for+pluggable+shuffle+service+framework
.

Best,
Yingjie


Re: [DISCUSS] Lifecycle of ShuffleMaster and its Relationship with JobMaster and PartitionTracker

2021-07-09 Thread Yingjie Cao
Hi Zhu,

Thanks for the reply.

> One question which might be out of the scope is that whether we should do
similar things for ShuffleEnvironment?

I agree we should also consider fatal error handling for ShuffleEnvironment
eventually.

> I think the proposal does not conflict with this target. One idea in my
mind is to maintain multiple different ShuffleServices in the
Dispatcher/JobManagerSharedServices and let them be shared between
different jobs. Each job should be configured with a key which
points to a ShuffleService. The key should be used by both the scheduler
and tasks on task managers to select their respective
ShuffleMaster/ShuffleEnvironment. This will need work both on the master
and on the worker. Currently the worker will launch a ShuffleEnvironment
shared between different tasks which can be from different jobs. But only
one single ShuffleEnvironment will be created on each task manager.

Yes, exactly. We need to do more to support that, and the proposal mentioned
in your comment makes a lot of sense.

Best,
Yingjie

Zhu Zhu  于2021年7月9日周五 上午11:53写道:

> Thanks for starting this discussion.
> Here are some of my thoughts regarding the proposal and discussions
> above.
>
> *+1 to enable ShuffleMaster to stop track partitions proactively*
> In production we have encountered problems that it needs *hours* to
> recover from a remote shuffle worker lost problem. Because the lost
> finished partitions cannot be detected and reproduced all at once.
> This improvement can help to solve this problem.
>
>
> *+1 to make ShuffleMaster a cluster level component*
> This helps to avoid maintain multiple clients and connections to the
> same remote shuffle service. It also makes it possible to support
> external cluster partitions in the future.
>
> *+1 to enable ShuffleMaster to notify master about its internal *
> *non-recoverable error*
> The scheduler can keep failing or hang due to ShuffleMaster internal
> non-recoverable error. Currently this kind of problem cannot be
> auto-recovered and are hard to diagnose.
> One question which might be out of the scope is that whether we
> should do similar things for ShuffleEnvironment?
>
>
> *+1 that the abstraction should be able to support different jobs to*
> *use different ShuffleServices eventually*
> I think the proposal does not conflict with this target.
> One idea in my mind is to maintain multiple different ShuffleServices
> in the Dispatcher/JobManagerSharedServices and let them be shared
> between different jobs. Each job should be configured with a key which
> points to a ShuffleService. The key should be used by both the scheduler
> and tasks on task managers to select their respective
> ShuffleMaster/ShuffleEnvironment. This will need work both on the master
> and on the worker. Currently the worker will launch a ShuffleEnvironment
> shared between different tasks which can be from different jobs. But only
> one single ShuffleEnvironment will be created on each task manager.
>
> Thanks,
> Zhu
>
> Yingjie Cao  于2021年7月8日周四 上午11:43写道:
>
> > Hi,
> >
> > Thanks for the reply.
> >
> > @Guowei
> > I agree that we can move forward step by step and start from the most
> > important part. Apart from the two points mentioned in your reply,
> > initializing and shutting down some external resources gracefully is also
> > important, which is a reason for the open/close methods.
> > About the cluster partitions and the ShuffleMasterContext: I agree that
> > we can postpone handling the cluster partitions because we need to do
> > more to support them. For the ShuffleMasterContext, I think we still need
> > it even if we do not support the cluster partitions in the first step.
> > Currently, the shuffle master can only access the cluster configuration;
> > besides that, I think we also need the ability to handle fatal errors
> > occurring in the ShuffleMaster gracefully by propagating the errors to
> > the framework. By introducing the ShuffleMasterContext, we can give the
> > ShuffleMaster the ability to access both the cluster configuration and
> > the fatal error handler. Instead of passing these components directly to
> > the ShuffleMaster, a ShuffleMasterContext interface can keep
> > compatibility easily in the future. Even if we add some new methods in
> > the future, we can offer default empty implementations in the interface
> > to keep compatibility.
> > About the JobShuffleContext::getConfiguration/listPartitions methods, I
> > agree that we can remove them in the first step and add them back later.
> > As mentioned above, we can easily keep compatibility based on the
> > Context interface.
> >
> > @Till
> > I totally 

Re: [DISCUSS] Lifecycle of ShuffleMaster and its Relationship with JobMaster and PartitionTracker

2021-07-07 Thread Yingjie Cao
Hi,

Thanks for the reply.

@Guowei
I agree that we can move forward step by step and start from the most
important part. Apart from the two points mentioned in your reply,
initializing and shutting down some external resources gracefully is also
important, which is a reason for the open/close methods.
About the cluster partitions and the ShuffleMasterContext: I agree that we
can postpone handling the cluster partitions because we need to do more to
support them. For the ShuffleMasterContext, I think we still need it even if
we do not support the cluster partitions in the first step. Currently, the
shuffle master can only access the cluster configuration; besides that, I
think we also need the ability to handle fatal errors occurring in the
ShuffleMaster gracefully by propagating the errors to the framework. By
introducing the ShuffleMasterContext, we can give the ShuffleMaster the
ability to access both the cluster configuration and the fatal error handler.
Instead of passing these components directly to the ShuffleMaster, a
ShuffleMasterContext interface can keep compatibility easily in the future.
Even if we add some new methods in the future, we can offer default empty
implementations in the interface to keep compatibility.
About the JobShuffleContext::getConfiguration/listPartitions methods, I agree
that we can remove them in the first step and add them back later. As
mentioned above, we can easily keep compatibility based on the Context
interface.

@Till
I totally agree that we should support different jobs using different
shuffle services, and the proposed solution will support this use case
eventually.

Best,
Yingjie

Till Rohrmann  于2021年7月7日周三 下午8:15写道:

> One quick comment: When developing the ShuffleService abstraction we also
> thought that different jobs might want to use different ShuffleServices
> depending on their workload (e.g. batch vs. streaming workload). So
> ideally, the chosen solution here can also support this use case
> eventually.
>
> Cheers,
> Till
>
> On Wed, Jul 7, 2021 at 12:50 PM Guowei Ma  wrote:
>
> > Hi,
> > Thank Yingjie for initiating this discussion. What I understand that the
> > document[1] actually mainly discusses two issues:
> > 1. ShuffleMaster should be at the cluster level instead of the job level
> > 2. ShuffleMaster should notify PartitionTracker that some data has been
> > lost
> >
> > Relatively speaking, I think the second problem is more serious. Because
> > for external or remote batch shuffling services, after the machine
> storing
> > shuffled data goes offline, PartitionTracker needs to be notified in time
> > to avoid repeated failures of the job. Therefore, it is hoped that when
> > shuffle data goes offline due to a machine error, ShuffleMaster can
> notify
> > the PartitionTracker in time. This requires ShuffleMaster to notify the
> > PartitionTracker with a handle such as JobShuffleContext.
> >
> > So how to pass JobShuffleContext to ShuffleMaster? There are two options:
> > 1. After ShuffleMaster is created, pass JobShuffleContext to
> ShuffleMaster,
> > such as ShuffleMaster::register(JobShuffleContext)
> > 2. Pass ShuffleServiceFactory when creating ShuffleMaster, such as
> > ShuffleServiceFactory.createShuffleMaster(JobShuffleContext).
> >
> > Which one to choose is actually related to issue 1. Because if
> > ShuffleMaster is a cluster level, you should choose option 1, otherwise,
> > choose option 2. I think ShuffleMaster should be at the cluster level,
> for
> > example, because we don't need to maintain a ShuffleMaster for each job
> in
> > a SessionCluster; in addition, this ShuffleMaster should also be used by
> > RM's PartitionTracker in the future. Therefore, I think Option 1 is more
> > appropriate.
> >
> > To sum up, we may give priority to solving problem 2, while taking into
> > account that ShuffleMaster should be a cluster-level component.
> Therefore,
> > I think we could ignore the ShuffleMasterContext at the beginning; at
> > the same time, JobShuffleContext::getConfiguration/listPartitions should
> > not be needed.
> >
> > [1]
> >
> >
> https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit
> >
> > Best,
> > Guowei
> >
> >
> > On Fri, Jun 11, 2021 at 4:15 PM Yingjie Cao 
> > wrote:
> >
> > > Hi devs,
> > >
> > > I'd like to start a discussion about "Lifecycle of ShuffleMaster and
> its
> > > Relationship with JobMaster and PartitionTracker". (These are things we
> > > found when moving our external shuffle to the pluggable shuffle service
> > > framework.)
> > > framework.)

[jira] [Created] (FLINK-23275) Support to release cluster partitions stored externally

2021-07-06 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-23275:
---

 Summary: Support to release cluster partitions stored externally
 Key: FLINK-23275
 URL: https://issues.apache.org/jira/browse/FLINK-23275
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.14.0


Currently, if a cluster partition is stored externally, it cannot be released 
by the partition tracker. One of the reasons is that the ShuffleMaster is not 
a cluster-level service; after FLINK-23214, we can release cluster partitions 
via the ShuffleMaster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23249) Introduce ShuffleMasterContext to ShuffleMaster

2021-07-05 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-23249:
---

 Summary: Introduce ShuffleMasterContext to ShuffleMaster
 Key: FLINK-23249
 URL: https://issues.apache.org/jira/browse/FLINK-23249
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.14.0


Introduce ShuffleMasterContext to ShuffleMaster. Just like the 
ShuffleEnvironmentContext at the TaskManager side, the ShuffleMasterContext can 
act as a proxy between the ShuffleMaster and other components of Flink, like 
the ResourceManagerPartitionTracker.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23214) Make ShuffleMaster a cluster level shared service

2021-07-02 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-23214:
---

 Summary: Make ShuffleMaster a cluster level shared service
 Key: FLINK-23214
 URL: https://issues.apache.org/jira/browse/FLINK-23214
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Yingjie Cao


This ticket tries to make the ShuffleMaster a cluster-level shared service, 
which makes it consistent with the ShuffleEnvironment at the TM side.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[DISCUSS] Lifecycle of ShuffleMaster and its Relationship with JobMaster and PartitionTracker

2021-06-11 Thread Yingjie Cao
Hi devs,

I'd like to start a discussion about "Lifecycle of ShuffleMaster and its
Relationship with JobMaster and PartitionTracker". (These are things we
found when moving our external shuffle to the pluggable shuffle service
framework.)

The mail client may fail to display the right format. If so, please refer
to this document:
https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit?usp=sharing
.
Lifecycle of ShuffleMaster

Currently, the lifecycle of the ShuffleMaster seems unclear. The
ShuffleServiceFactory is loaded for each JobMaster instance, and then
ShuffleServiceFactory#createShuffleMaster is called to create a
ShuffleMaster instance. However, the default NettyShuffleServiceFactory
always returns the same ShuffleMaster singleton instance for all jobs. Based
on the current implementation, the lifecycle of the ShuffleMaster seems open
and depends on the shuffle plugins themselves. However, at the TM side, the
ShuffleEnvironment is a part of the TaskManagerServices, whose lifecycle is
decoupled from jobs, which makes it more like a service. This means there is
also an inconsistency between the TM side and the JM side.

From my understanding, the reason for this is that the pluggable shuffle
framework is still not completely finished yet. For example, there is a
follow-up umbrella ticket FLINK-19551 for the pluggable shuffle service
framework, and among its subtasks there is one task (FLINK-12731) which aims
to load the shuffle plugin with the PluginManager. I think this can solve the
issue mentioned above. After the corresponding factory is loaded by the
PluginManager, all ShuffleMaster instances can be stored in a map indexed
by the corresponding factory class name, which can be shared by all jobs.
After that, the ShuffleMaster becomes a cluster-level service which is
consistent with the ShuffleEnvironment at the TM side.

As a summary, we propose to finish FLINK-12731 and make the shuffle
service a real cluster-level service first. Furthermore, we propose to add two
lifecycle methods to the ShuffleMaster interface, start and close, responsible
for initialization (for example, contacting the external system) and
graceful shutdown (for example, releasing the resources) respectively
(these methods already exist in the ShuffleEnvironment interface at the TM
side). What do you think?
Relationship of ShuffleMaster & JobMaster

Currently, the JobMaster holds a reference to the corresponding ShuffleMaster
and can register partitions with it (allocate ShuffleDescriptors from it) via
the registerPartitionWithProducer method. To support use cases like
allocating external resources when a job starts and releasing all allocated
resources when a job terminates, we may also need some job-level
initialization and finalization. Such job-level initialization and
finalization are also helpful when serving multiple jobs simultaneously.

As a summary, we propose to add two job-level lifecycle methods, registerJob
and unregisterJob, responsible for job-level shuffle initialization and
finalization, for example, releasing all external resources occupied by the
corresponding job. What do you think?
Relationship of ShuffleMaster & PartitionTracker

Currently, the JobMasterPartitionTracker can release external result
partitions through the releasePartitionExternally method of the ShuffleMaster.
However, the shuffle plugin (ShuffleMaster) may also need the ability to stop
tracking some partitions depending on the status of the external services; for
example, if an external storage node which stores some partitions crashes, we
need to stop tracking all partitions in it to avoid reproducing the lost
partitions one by one. By introducing something like a ShuffleContext which
delegates to the partition tracker, this requirement can be easily satisfied.
Besides, for cluster partitions, we also need the ability to release them.

As a summary, we propose to add a releaseDataSetExternally method to the
ShuffleMaster interface which is responsible for releasing cluster partitions.
Besides, we propose to add a ShuffleContext which can delegate to the
PartitionTracker and stop tracking partitions. For the cluster partitions
and job partitions, two separate ShuffleContext abstractions are needed.
What do you think?
Interface Change Summary

As discussed in the above sections, we propose to make some interface
changes around the ShuffleMaster interface. The first change is to pass a
ShuffleMasterContext instance to the ShuffleServiceFactory when creating the
ShuffleMaster, just like the ShuffleEnvironment creation at the TM side.
Changes are marked with bold texts (the same below).

public interface ShuffleServiceFactory<
        SD extends ShuffleDescriptor,
        P extends ResultPartitionWriter,
        G extends IndexedInputGate> {

    /**
     * Factory method to create a specific 
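Since the snippet above is cut off in the archive, here is a sketch of the
full set of additions proposed in this mail. Only the method names come from
the proposal; the parameter types are placeholders, not the final API.

// Sketch of the proposed ShuffleMaster additions; signatures are
// placeholders, only the method names are taken from this proposal.
interface ProposedShuffleMaster<SD> {

    // Cluster-level lifecycle, mirroring ShuffleEnvironment on the TM side.
    void start();
    void close();

    // Job-level initialization and finalization.
    void registerJob(Object jobShuffleContext);
    void unregisterJob(Object jobId);

    // Existing partition registration and release.
    java.util.concurrent.CompletableFuture<SD> registerPartitionWithProducer(
            Object partitionDescriptor, Object producerDescriptor);
    void releasePartitionExternally(SD shuffleDescriptor);

    // Proposed: release cluster partitions belonging to a data set.
    void releaseDataSetExternally(Object dataSetId);
}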

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-08 Thread Yingjie Cao
Hi Till,

Thanks for the suggestion. The blog post is already on the way.

Best,
Yingjie

Till Rohrmann  于2021年6月8日周二 下午5:30写道:

> Thanks for the update Yingjie. Would it make sense to write a short blog
> post about this feature including some performance improvement numbers? I
> think this could be interesting to our users.
>
> Cheers,
> Till
>
> On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li  wrote:
>
>> Thanks Yingjie for the great effort!
>>
>> This is really helpful to Flink Batch users!
>>
>> Best,
>> Jingsong
>>
>> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao 
>> wrote:
>>
>> > Hi devs & users,
>> >
>> > The FLIP-148[1] has been released with Flink 1.13 and the final
>> > implementation has some differences compared with the initial proposal
>> in
>> > the FLIP document. To avoid potential misunderstandings, I have updated
>> the
>> > FLIP document[1] accordingly and I also drafted another document[2]
>> which
>> > contains more implementation details.  FYI.
>> >
>> > [1]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
>> > [2]
>> >
>> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>> >
>> > Best,
>> > Yingjie
>> >
>> > Yingjie Cao  于2020年10月15日周四 上午11:02写道:
>> >
>> >> Hi devs,
>> >>
>> >> Currently, Flink adopts a hash-style blocking shuffle implementation
>> >> which writes data sent to different reducer tasks into separate files
>> >> concurrently. Compared to sort-merge based approach writes those data
>> >> together into a single file and merges those small files into bigger
>> ones,
>> >> hash-based approach has several weak points when it comes to running
>> large
>> >> scale batch jobs:
>> >>
>> >>1. *Stability*: For high parallelism (tens of thousands) batch job,
>> >>current hash-based blocking shuffle implementation writes too many
>> files
>> >>concurrently which gives high pressure to the file system, for
>> example,
>> >>maintenance of too many file metas, exhaustion of inodes or file
>> >>descriptors. All of these can be potential stability issues.
>> Sort-Merge
>> >>based blocking shuffle don’t have the problem because for one result
>> >>partition, only one file is written at the same time.
>> >>2. *Performance*: Large amounts of small shuffle files and random IO
>> >>can influence shuffle performance a lot especially for hdd (for ssd,
>> >>sequential read is also important because of read ahead and cache).
>> For
>> >>    batch jobs processing massive data, a small amount of data per
>> >>    subpartition is common because of high parallelism. Besides, data skew is
>> another cause
>> >>of small subpartition files. By merging data of all subpartitions
>> together
>> >>in one file, more sequential read can be achieved.
>> >>3. *Resource*: For current hash-based implementation, each
>> >>subpartition needs at least one buffer. For large scale batch
>> shuffles, the
>> >>memory consumption can be huge. For example, we need at least 320M
>> network
>> >>memory per result partition if parallelism is set to 1 and
>> because of
>> >>    the huge network consumption, it is hard to configure the network
>> >>    memory for large scale batch jobs and sometimes the parallelism cannot
>> >>    be increased just because of insufficient network memory, which leads
>> >>    to a bad user experience.
>> >>
>> >> To improve Flink’s capability of running large scale batch jobs, we
>> would
>> >> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
>> >> feedback is appreciated.
>> >>
>> >> [1]
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>> >>
>> >> Best,
>> >> Yingjie
>> >>
>> >
>>
>> --
>> Best, Jingsong Lee
>>
>


[jira] [Created] (FLINK-22910) ShuffleMaster enhancement for pluggable shuffle service framework

2021-06-07 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-22910:
---

 Summary: ShuffleMaster enhancement for pluggable shuffle service 
framework
 Key: FLINK-22910
 URL: https://issues.apache.org/jira/browse/FLINK-22910
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Yingjie Cao
 Fix For: 1.14.0


The current _ShuffleMaster_ has an unclear lifecycle which is inconsistent with 
the _ShuffleEnvironment_ at the _TM_ side. Besides, it is hard to implement 
some important capabilities for remote shuffle services. For example: 1) releasing 
external resources when a job is finished; 2) stopping or starting to track some 
partitions depending on the status of the external service or system.

We drafted a document[1] which proposes some simple changes to solve these 
issues. The document is not wholly complete yet; we will start a 
discussion once it is finished.

 

[1] 
https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-06 Thread Yingjie Cao
Hi devs & users,

The FLIP-148[1] has been released with Flink 1.13 and the final
implementation has some differences compared with the initial proposal in
the FLIP document. To avoid potential misunderstandings, I have updated the
FLIP document[1] accordingly and I also drafted another document[2] which
contains more implementation details.  FYI.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
[2]
https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing

Best,
Yingjie

Yingjie Cao  于2020年10月15日周四 上午11:02写道:

> Hi devs,
>
> Currently, Flink adopts a hash-style blocking shuffle implementation which
> writes data sent to different reducer tasks into separate files
> concurrently. Compared to the sort-merge based approach, which writes those
> data together into a single file and merges those small files into bigger
> ones, the hash-based approach has several weak points when it comes to
> running large scale batch jobs:
>
>1. *Stability*: For high parallelism (tens of thousands) batch job,
>current hash-based blocking shuffle implementation writes too many files
>concurrently which gives high pressure to the file system, for example,
>maintenance of too many file metas, exhaustion of inodes or file
>    descriptors. All of these can be potential stability issues. Sort-Merge
>    based blocking shuffle doesn’t have this problem because for one result
>    partition, only one file is written at the same time.
>2. *Performance*: Large amounts of small shuffle files and random IO
>can influence shuffle performance a lot especially for hdd (for ssd,
>sequential read is also important because of read ahead and cache). For
>    batch jobs processing massive data, a small amount of data per subpartition
>is common because of high parallelism. Besides, data skew is another cause
>of small subpartition files. By merging data of all subpartitions together
>in one file, more sequential read can be achieved.
>3. *Resource*: For current hash-based implementation, each
>subpartition needs at least one buffer. For large scale batch shuffles, the
>memory consumption can be huge. For example, we need at least 320M network
>memory per result partition if parallelism is set to 1 and because of
>    the huge network consumption, it is hard to configure the network memory for
>    large scale batch jobs and sometimes the parallelism cannot be increased just
>    because of insufficient network memory, which leads to a bad user experience.
>
> To improve Flink’s capability of running large scale batch jobs, we would
> like to introduce sort-merge based blocking shuffle to Flink[1]. Any
> feedback is appreciated.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>
> Best,
> Yingjie
>


[jira] [Created] (FLINK-22307) Increase the data writing cache size of sort-merge blocking shuffle

2021-04-16 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-22307:
---

 Summary: Increase the data writing cache size of sort-merge 
blocking shuffle
 Key: FLINK-22307
 URL: https://issues.apache.org/jira/browse/FLINK-22307
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.13.0


Currently, the data writing cache is 8M, which is not enough if data 
compression is enabled. By increasing the cache size to 16M, the performance of 
our benchmark job can be improved by about 20%. (We may make it configurable 
in the future.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22305) Increase the default value of taskmanager.network.sort-shuffle.min-buffers

2021-04-16 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-22305:
---

 Summary: Increase the default value of 
taskmanager.network.sort-shuffle.min-buffers
 Key: FLINK-22305
 URL: https://issues.apache.org/jira/browse/FLINK-22305
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.13.0


Currently, the default value of taskmanager.network.sort-shuffle.min-buffers is 
64, which is pretty small. As suggested, we'd like to increase the default 
value of taskmanager.network.sort-shuffle.min-buffers. With a larger default, 
the corner case of a very small in-memory sort buffer and write buffer can be 
avoided, which is better for performance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22156) HiveDialectQueryITCase fails on Azure because of no output for 900 seconds

2021-04-08 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-22156:
---

 Summary: HiveDialectQueryITCase fails on Azure because of no 
output for 900 seconds
 Key: FLINK-22156
 URL: https://issues.apache.org/jira/browse/FLINK-22156
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime, Tests
Reporter: Yingjie Cao
 Fix For: 1.13.0


[https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/16105/logs/139]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22153) Manually test the sort-merge blocking shuffle

2021-04-08 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-22153:
---

 Summary: Manually test the sort-merge blocking shuffle
 Key: FLINK-22153
 URL: https://issues.apache.org/jira/browse/FLINK-22153
 Project: Flink
  Issue Type: Task
  Components: Runtime / Network
Affects Versions: 1.13.0
Reporter: Yingjie Cao
 Fix For: 1.13.0


In 1.12, we introduced the sort-merge blocking shuffle to Flink, and in 1.13 the 
feature was optimized to improve usability (fixing the direct memory OOM 
issue) and performance (introducing IO scheduling and broadcast optimization).

The sort-merge blocking shuffle can be tested following the below process:
 # Write a simple batch job using either the SQL/Table or DataStream API (word 
count should be enough);
 # Enable sort-merge blocking shuffle by setting 
taskmanager.network.sort-shuffle.min-parallelism to 1 in the Flink 
configuration file;
 # Submit and run the batch job with different parallelism and data volume;
 # Tune the relevant config options 
(taskmanager.network.blocking-shuffle.compression.enabled, 
taskmanager.network.sort-shuffle.min-buffers, 
taskmanager.memory.framework.off-heap.batch-shuffle.size) and see the 
influence (a sample configuration is sketched below).
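
For reference, a minimal configuration for steps 2 and 4 might look as follows; the min-parallelism value comes from step 2, while the other values are illustrative starting points rather than recommendations:
{code}
# flink-conf.yaml (sample; values besides min-parallelism are illustrative)
taskmanager.network.sort-shuffle.min-parallelism: 1
taskmanager.network.blocking-shuffle.compression.enabled: true
taskmanager.network.sort-shuffle.min-buffers: 512
taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m
{code}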

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22127) Enrich error message of read buffer request timeout exception

2021-04-06 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-22127:
---

 Summary: Enrich error message of read buffer request timeout 
exception
 Key: FLINK-22127
 URL: https://issues.apache.org/jira/browse/FLINK-22127
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.13.0
Reporter: Yingjie Cao
 Fix For: 1.13.0


Enrich the error message of the read buffer request timeout exception to tell 
the user how to resolve the timeout.
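
For example, the enriched message might look like the following (illustrative wording only, not the actual message; the referenced config key is the batch shuffle read memory option mentioned elsewhere in this series):
{code:java}
// Illustrative wording only (not the actual Flink message):
throw new java.util.concurrent.TimeoutException(
        "Buffer request timed out. This may be caused by insufficient shuffle read memory. "
                + "Please increase 'taskmanager.memory.framework.off-heap.batch-shuffle.size' "
                + "or decrease the parallelism and retry.");
{code}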



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21951) Fix wrong if condition in BufferReaderWriterUtil#writeBuffers

2021-03-24 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21951:
---

 Summary: Fix wrong if condition in 
BufferReaderWriterUtil#writeBuffers
 Key: FLINK-21951
 URL: https://issues.apache.org/jira/browse/FLINK-21951
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.13.0


The wrong if condition in BufferReaderWriterUtil#writeBuffers may lead to data 
loss when bulk writing a large amount of data into a file. The bug was introduced 
in 1.9, but previously only a small amount of data was written, so it never 
surfaced. In 1.13, the sort-merge shuffle uses this method to write more data, 
which triggers the bug.
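
As an illustration of the pitfall (a sketch, not the actual Flink code): a gathering write may flush fewer bytes than requested in one call, so the termination condition must compare the bytes actually written against the total; getting that condition wrong silently drops data.
{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Sketch of a correct "write fully" loop: FileChannel#write(ByteBuffer[])
// may write only part of the requested data, so loop until all bytes are
// flushed instead of checking a single (possibly wrong) condition once.
static void writeFully(FileChannel channel, ByteBuffer[] buffers, long totalBytes)
        throws IOException {
    long written = 0;
    while (written < totalBytes) {
        written += channel.write(buffers);
    }
}
{code}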



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21859) Batch job fails due to "Could not mark slot 61a637e3977c58a0e6b73533c419297d active"

2021-03-18 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21859:
---

 Summary: Batch job fails due to "Could not mark slot 
61a637e3977c58a0e6b73533c419297d active"
 Key: FLINK-21859
 URL: https://issues.apache.org/jira/browse/FLINK-21859
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.13.0
Reporter: Yingjie Cao


Here is the error stack:
{code:java}
2021-03-18 19:05:31
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:130)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:81)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:221)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:212)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:203)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:701)
    at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1449)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1105)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1045)
    at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:754)
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
    at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
    at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:669)
    at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:1997)
    at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
    at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
    at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385)
    at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$releaseSlot$1(DefaultDeclarativeSlotPool.java:376)
    at java.util.Optional.ifPresent(Optional.java:159)
    at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlot(DefaultDeclarativeSlotPool.java:374)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.failAllocation(DeclarativeSlotPoolService.java:198)
    at org.apache.flink.runtime.jobmaster.JobMaster.internalFailAllocation(JobMaster.java:650)
    at org.apache.flink.runtime.jobmaster.JobMaster.failSlot(JobMaster.java:636)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:301)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox

[jira] [Created] (FLINK-21857) StackOverflow for large parallelism jobs when processing EndOfChannelStateEvent

2021-03-18 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21857:
---

 Summary: StackOverflow for large parallelism jobs when processing 
EndOfChannelStateEvent
 Key: FLINK-21857
 URL: https://issues.apache.org/jira/browse/FLINK-21857
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.13.0


CheckpointedInputGate#handleEvent calls pollNext recursively when processing 
EndOfChannelStateEvent. For a large parallelism job with a large number of input 
channels, the stack can become really deep, causing a StackOverflow. The 
following is the stack:
{code:java}
java.lang.StackOverflowError
  at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:650)
  at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:625)
  at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:611)
  at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:149)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202){code}
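
For context, a hedged sketch of the general fix idea (not the actual Flink change): handle EndOfChannelStateEvent in a loop instead of recursing, so the stack depth stays constant no matter how many channels deliver the event back to back. The inputGate field and the isEndOfChannelStateEvent helper below are hypothetical stand-ins.
{code:java}
import java.util.Optional;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;

// Iterative variant: swallow consecutive EndOfChannelStateEvents in a loop
// rather than calling pollNext() recursively once per event.
Optional<BufferOrEvent> pollNext() throws Exception {
    while (true) {
        Optional<BufferOrEvent> next = inputGate.pollNext();
        if (!next.isPresent() || !isEndOfChannelStateEvent(next.get())) {
            return next; // nothing to swallow; hand the result to the caller
        }
        // EndOfChannelStateEvent handled; poll again without growing the stack.
    }
}
{code}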



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21850) Improve document and config description of sort-merge blocking shuffle

2021-03-17 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21850:
---

 Summary: Improve document and config description of sort-merge 
blocking shuffle
 Key: FLINK-21850
 URL: https://issues.apache.org/jira/browse/FLINK-21850
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Yingjie Cao
 Fix For: 1.13.0


After the improvement of FLINK-19614, some of the previous documentation for 
the sort-merge blocking shuffle is no longer accurate; we need to improve the 
corresponding documentation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21790) Shuffle data directories to make data directory selection of different TaskManagers fairer

2021-03-15 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21790:
---

 Summary: Shuffle data directories to make data directory selection 
of different TaskManagers fairer
 Key: FLINK-21790
 URL: https://issues.apache.org/jira/browse/FLINK-21790
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.13.0


Currently, different TaskManagers select data directories in the same order, and 
if there are multiple disks, some disks may store more data than others, which 
is bad for performance. A simple improvement is that each TaskManager shuffles 
the given data directories randomly and selects the data directories in a 
different order.
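
A minimal sketch of the improvement (illustrative, not the actual patch):
{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Each TaskManager randomizes the configured directories once at startup,
// so different TMs start filling different disks instead of all picking
// the first configured directory.
static List<String> shuffleDataDirectories(String[] configuredDirs) {
    List<String> dirs = new ArrayList<>(Arrays.asList(configuredDirs));
    Collections.shuffle(dirs);
    return dirs;
}
{code}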



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21789) Make FileChannelManagerImpl#getNextPathNum select data directory fairly

2021-03-15 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21789:
---

 Summary: Make FileChannelManagerImpl#getNextPathNum select data 
directory fairly
 Key: FLINK-21789
 URL: https://issues.apache.org/jira/browse/FLINK-21789
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.13.0


The get and increase of the next path index in FileChannelManagerImpl#getNextPathNum 
is not atomic, which may cause unfair data directory selection (bad for 
performance if multiple disks are configured).
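
For illustration, a sketch of an atomic round-robin selection (names are illustrative; this is not the actual Flink code):
{code:java}
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinPathSelector {
    private final AtomicInteger nextPath = new AtomicInteger(0);
    private final int numPaths;

    RoundRobinPathSelector(int numPaths) {
        this.numPaths = numPaths;
    }

    int getNextPathNum() {
        // getAndIncrement is a single atomic read-modify-write, so the "get"
        // and the "increase" can no longer interleave between concurrent callers.
        return Math.floorMod(nextPath.getAndIncrement(), numPaths);
    }
}
{code}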



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21788) Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle

2021-03-15 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21788:
---

 Summary: Throw PartitionNotFoundException if the partition file 
has been lost for blocking shuffle
 Key: FLINK-21788
 URL: https://issues.apache.org/jira/browse/FLINK-21788
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.13.0


Currently, if the partition file has been lost for blocking shuffle, a 
FileNotFoundException will be thrown and the partition data is not regenerated, 
so failover cannot recover the job. A PartitionNotFoundException should be 
thrown instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21778) Use heap memory instead of direct memory as index entry cache for sort-merge shuffle

2021-03-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21778:
---

 Summary: Use heap memory instead of direct memory as index entry 
cache for sort-merge shuffle
 Key: FLINK-21778
 URL: https://issues.apache.org/jira/browse/FLINK-21778
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.13.0


Currently, the sort-merge shuffle implementation uses a piece of direct memory 
as an index entry cache for acceleration. We can use heap memory instead to 
reduce the usage of direct memory, which further reduces the possibility of an 
OutOfMemoryError.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21777) Replace the 4M data writing cache of sort-merge shuffle with writev system call

2021-03-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21777:
---

 Summary: Replace the 4M data writing cache of sort-merge shuffle 
with writev system call
 Key: FLINK-21777
 URL: https://issues.apache.org/jira/browse/FLINK-21777
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.13.0


Currently, the sort-merge shuffle implementation uses 4M of unmanaged direct memory 
as a cache for data writing. It can be replaced by the writev system call, which 
can reduce the unmanaged direct memory usage without any performance loss.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20758) Use region file mechanism for shuffle data reading before we switch to managed memory

2020-12-23 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-20758:
---

 Summary: Use region file mechanism for shuffle data reading before 
we switch to managed memory
 Key: FLINK-20758
 URL: https://issues.apache.org/jira/browse/FLINK-20758
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.12.2


FLINK-15981 implemented a region-file-based data reader to solve the direct 
memory OOM issue introduced by the usage of unmanaged direct memory, however only 
for BoundedBlockingResultPartition. We can introduce it to the sort-merge based 
blocking shuffle to avoid a similar direct memory OOM problem, which can 
improve the usability a lot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20757) Optimize data broadcast for sort-merge shuffle

2020-12-23 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-20757:
---

 Summary: Optimize data broadcast for sort-merge shuffle
 Key: FLINK-20757
 URL: https://issues.apache.org/jira/browse/FLINK-20757
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.13.0


For data broadcast, the record needs to be copied only once when writing data into 
the SortBuffer. Besides, only one copy of the data needs to be written when 
spilling to disk. These optimizations can improve the performance of data broadcast.
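
Conceptually, the optimization can be sketched as follows (appendToBuffer and addIndexEntry are hypothetical helpers, not Flink APIs): the serialized record is copied into the sort buffer once, and every subpartition only gets an index entry pointing at that single copy.
{code:java}
import java.nio.ByteBuffer;

// Sketch: one data copy for a broadcast record, N index entries.
void writeBroadcastRecord(ByteBuffer record, int numSubpartitions) {
    int length = record.remaining();
    long offset = appendToBuffer(record);            // single copy of the bytes
    for (int subpartition = 0; subpartition < numSubpartitions; subpartition++) {
        addIndexEntry(subpartition, offset, length); // pointer only, no data copy
    }
}
{code}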



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20740) Use managed memory to avoid direct memory OOM error

2020-12-22 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-20740:
---

 Summary: Use managed memory to avoid direct memory OOM error
 Key: FLINK-20740
 URL: https://issues.apache.org/jira/browse/FLINK-20740
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.13.0


Currently, sort-merge blocking shuffle uses some unmanaged memory for data 
writing and reading, which means users must increase the size of direct memory; 
otherwise, they may encounter a direct memory OOM error, which is really bad for 
usability.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20013) BoundedBlockingSubpartition may leak network buffer if task is failed or canceled

2020-11-05 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-20013:
---

 Summary: BoundedBlockingSubpartition may leak network buffer if 
task is failed or canceled
 Key: FLINK-20013
 URL: https://issues.apache.org/jira/browse/FLINK-20013
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.12.0


BoundedBlockingSubpartition may leak network buffers if the task fails or is 
canceled. We need to recycle the current BufferConsumer when the task fails or 
is canceled.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20010) SinkITCase.writerAndCommitterAndGlobalCommitterExecuteInStreamingMode fails on Azure Pipeline

2020-11-05 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-20010:
---

 Summary: 
SinkITCase.writerAndCommitterAndGlobalCommitterExecuteInStreamingMode fails on 
Azure Pipeline
 Key: FLINK-20010
 URL: https://issues.apache.org/jira/browse/FLINK-20010
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Yingjie Cao
 Fix For: 1.12.0


SinkITCase.writerAndCommitterAndGlobalCommitterExecuteInStreamingMode fails on 
Azure Pipeline
{code:java}
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19991) UnalignedCheckpointITCase#shouldPerformUnalignedCheckpointMassivelyParallel fails on Azure Pipeline

2020-11-05 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19991:
---

 Summary: 
UnalignedCheckpointITCase#shouldPerformUnalignedCheckpointMassivelyParallel 
fails on Azure Pipeline
 Key: FLINK-19991
 URL: https://issues.apache.org/jira/browse/FLINK-19991
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.12.0


UnalignedCheckpointITCase#shouldPerformUnalignedCheckpointMassivelyParallel 
fails on Azure Pipeline
{code:java}
java.lang.AssertionError: 

Expected: <0L>
 but: was <1809L>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.junit.Assert.assertThat(Assert.java:956)
at org.junit.rules.ErrorCollector$1.call(ErrorCollector.java:65)
at org.junit.rules.ErrorCollector.checkSucceeds(ErrorCollector.java:78)
at org.junit.rules.ErrorCollector.checkThat(ErrorCollector.java:63)
at org.junit.rules.ErrorCollector.checkThat(ErrorCollector.java:54)
at 
org.apache.flink.test.checkpointing.UnalignedCheckpointITCase.execute(UnalignedCheckpointITCase.java:189)
at 
org.apache.flink.test.checkpointing.UnalignedCheckpointITCase.shouldPerformUnalignedCheckpointMassivelyParallel(UnalignedCheckpointITCase.java:179)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.Verifier$1.evaluate(Verifier.java:35)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19938) Implement shuffle data read scheduling for sort-merge blocking shuffle

2020-11-02 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19938:
---

 Summary: Implement shuffle data read scheduling for sort-merge 
blocking shuffle
 Key: FLINK-19938
 URL: https://issues.apache.org/jira/browse/FLINK-19938
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.12.0


As described in 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink],
shuffle IO scheduling is important for performance. We'd like to introduce it 
to the sort-merge shuffle first.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[VOTE] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2020-10-25 Thread Yingjie Cao
Hi devs,

I'd like to start a vote for FLIP-148: Introduce Sort-Merge Based Blocking
Shuffle to Flink [1] which is discussed in discussion thread [2].

The vote will last for at least 72 hours until a consensus is reached.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
[2]
https://lists.apache.org/thread.html/r11750db945277d944f408eaebbbdc9d595d587fcfb67b015c716404e%40%3Cdev.flink.apache.org%3E


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2020-10-23 Thread Yingjie Cao
Hi devs,

This discussion thread has been open for over a week. If there are no
other concerns, I'd like to open a voting thread soon.

Best,
Yingjie

Yingjie Cao  于2020年10月23日周五 上午11:56写道:

> Hi Zhijiang,
>
> Thanks for your reply and suggestions.
>
> 1. For
> taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition, we
> decide to append all data produced by one result partition to one file, so
> this option will be removed.
>
> 2. For
> taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition, the
> required buffer of the buffer pool will be min(numSubpartition + 1, this
> config value), so it does not increase the number of required buffers
> but may reduce it when the parallelism is very high. So when users switch to
> the sort-merge implementation, there should be no insufficient network buffers
> issue.
>
> 3. For taskmanager.network.sort-merge-blocking-shuffle.min-parallelism, I
> agree a bool value is easier for users to configure, so we will replace this
> with a bool switch. We can add this config option back if we have
> performance concerns in the future.
>
> Best,
> Yingjie
>
> Zhijiang  于2020年10月19日周一 下午5:27写道:
>
>> Thanks for launching the discussion and the respective FLIP, Yingjie!
>>
>> In general, I am +1 for this proposal since the sort-merge ability has
>> already been adopted widely in other batch-based projects, like MR, Spark, etc.
>> And it indeed has some performance benefits in some scenarios as
>> mentioned in FLIP.
>>
>> I only have some thoughts for the section of `Public Interfaces` since it
>> cares about how the users understand and better use in practice.
>>  As for the newly introduced classes, they can be further reviewed in the
>> follow-up PR since there is no existing interface refactoring ATM.
>>
>> 1.
>> taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition:
>> the default value should be `1` I guess?  It is better to give a proper
>> default value that most users do not need to care about in practice.
>>
>> 2. taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition:
>> how about making the default for the number of required buffers in
>> LocalBufferPool as now for result partition?
>>  Then it is transparent for users to not increase any memory resource no
>> matter with either hash based or sort-merge based way. For the tuned
>> setting , it is better to give some hints to guide
>>  users how to adjust it for better performance based on some factors.
>>
>> 3. taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: I
>> guess it is not very easy or determined to give a proper value for the
>> switch between hash based and sort-merge based.
>>  And how much data a subpartition takes (broadcast) or whether it is suitable
>> for hash based is not completely decided by the number of parallelism somehow.
>> And users might be confused about how to tune
>>  it in practice. I prefer giving a simple boolean type option for easy
>> use and the default value can be false in the MVP. Then it will not bring any
>> effects for users after upgrading to the new version by default,
>>  and the sort-merge option can be enabled to try out if users are willing, in
>> desired scenarios.
>>
>> Best,
>> Zhijiang
>> --
>> From:Till Rohrmann 
>> Send Time:2020年10月16日(星期五) 15:42
>> To:dev 
>> Subject:Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking
>> Shuffle to Flink
>>
>> Thanks for sharing the preliminary numbers with us Yingjie. The numbers
>> look quite impressive :-)
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 15, 2020 at 5:25 PM Yingjie Cao 
>> wrote:
>>
>> > Hi Till,
>> >
>> > Thanks for your reply and comments.
>> >
>> > You are right, the proposed sort-merge based shuffle is an extension of
>> the
>> > existing blocking shuffle and does not change any default behavior of
>> > Flink.
>> >
>> > As for the performance, according to our previous experience, sort-merge
>> > based implementation can reduce the shuffle time by 30% to even 90%
>> > compared to hash-based implementation. My PoC implementation without any
>> > further optimization can already reduce the shuffle time by over 10% on SSD
>> > and over 70% on HDD for a simple 1000 * 1000 parallelism benchmark job.
>> >
>> > After switching to the sort-merge based blocking shuffle, some of our users'
>> jobs
>> > can scale up to over 2 parallelism, though this needs some JM and

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2020-10-22 Thread Yingjie Cao
Hi Zhijiang,

Thanks for your reply and suggestions.

1. For
taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition, we
decide to append all data produced by one result partition to one file, so
this option will be removed.

2. For
taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition, the
required buffer of the buffer pool will be min(numSubpartition + 1, this
config value), so it does not increase the number of required buffers
but may reduce it when the parallelism is very high. So when users switch to
the sort-merge implementation, there should be no insufficient network buffers
issue.
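
In code terms, a minimal sketch of the sizing rule above (variable names are illustrative):

int numRequiredBuffers = Math.min(numSubpartitions + 1, configuredBuffersPerPartition);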

3. For taskmanager.network.sort-merge-blocking-shuffle.min-parallelism, I
agree a bool value is easier for users to configure, so we will replace this
with a bool switch. We can add this config option back if we have
performance concerns in the future.

Best,
Yingjie

Zhijiang  于2020年10月19日周一 下午5:27写道:

> Thanks for launching the discussion and the respective FLIP, Yingjie!
>
> In general, I am +1 for this proposal since the sort-merge ability has already
> been adopted widely in other batch-based projects, like MR, Spark, etc.
> And it indeed has some performance benefits in some scenarios as mentioned
> in FLIP.
>
> I only have some thoughts for the section of `Public Interfaces` since it
> cares about how the users understand and better use in practice.
>  As for the newly introduced classes, they can be further reviewed in the
> follow-up PR since there is no existing interface refactoring ATM.
>
> 1.
> taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition:
> the default value should be `1` I guess?  It is better to give a proper
> default value that most users do not need to care about in practice.
>
> 2. taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition:
> how about making the default for the number of required buffers in
> LocalBufferPool as now for result partition?
>  Then it is transparent for users to not increase any memory resource no
> matter with either hash based or sort-merge based way. For the tuned
> setting , it is better to give some hints to guide
>  users how to adjust it for better performance based on some factors.
>
> 3. taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: I
> guess it is not very easy or determined to give a proper value for the
> switch between hash based and sort-merge based.
>  And how much data a subpartition takes (broadcast) or whether it is suitable
> for hash based is not completely decided by the number of parallelism somehow.
> And users might be confused about how to tune
>  it in practice. I prefer giving a simple boolean type option for easy
> use and the default value can be false in the MVP. Then it will not bring any
> effects for users after upgrading to the new version by default,
>  and the sort-merge option can be enabled to try out if users are willing, in
> desired scenarios.
>
> Best,
> Zhijiang
> --
> From:Till Rohrmann 
> Send Time:2020年10月16日(星期五) 15:42
> To:dev 
> Subject:Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking
> Shuffle to Flink
>
> Thanks for sharing the preliminary numbers with us Yingjie. The numbers
> look quite impressive :-)
>
> Cheers,
> Till
>
> On Thu, Oct 15, 2020 at 5:25 PM Yingjie Cao 
> wrote:
>
> > Hi Till,
> >
> > Thanks for your reply and comments.
> >
> > You are right, the proposed sort-merge based shuffle is an extension of
> the
> > existing blocking shuffle and does not change any default behavior of
> > Flink.
> >
> > As for the performance, according to our previous experience, sort-merge
> > based implementation can reduce the shuffle time by 30% to even 90%
> > compared to hash-based implementation. My PoC implementation without any
> > further optimization can already reduce the shuffle time by over 10% on SSD
> > and over 70% on HDD for a simple 1000 * 1000 parallelism benchmark job.
> >
> > After switching to the sort-merge based blocking shuffle, some of our users'
> > jobs can scale up to over 2 parallelism, though this needs some JM and RM
> > side optimization. I haven't tried to find where the upper bound is, but I
> > guess several tens of thousands should be able to meet
> > the needs of most users.
> >
> > Best,
> > Yingjie
> >
> > Till Rohrmann  于2020年10月15日周四 下午3:57写道:
> >
> > > Hi Yingjie,
> > >
> > > thanks for proposing the sort-merge based blocking shuffle. I li

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2020-10-15 Thread Yingjie Cao
Hi Till,

Thanks for your reply and comments.

You are right, the proposed sort-merge based shuffle is an extension of the
existing blocking shuffle and does not change any default behavior of Flink.

As for the performance, according to our previous experience, sort-merge
based implementation can reduce the shuffle time by 30% to even 90%
compared to hash-based implementation. My PoC implementation without any
further optimization can already reduce the shuffle time by over 10% on SSD
and over 70% on HDD for a simple 1000 * 1000 parallelism benchmark job.

After switching to the sort-merge based blocking shuffle, some of our users' jobs
can scale up to over 2 parallelism, though this needs some JM and RM side
optimization. I haven't tried to find where the upper bound is, but I
guess several tens of thousands should be able to meet
the needs of most users.

Best,
Yingjie

Till Rohrmann  于2020年10月15日周四 下午3:57写道:

> Hi Yingjie,
>
> thanks for proposing the sort-merge based blocking shuffle. I like the
> proposal and it does not seem to change the internals of Flink. Instead it
> is an extension of existing interfaces which makes it a
> non-invasive addition.
>
> Do you have any numbers comparing the performance of the sort-merge based
> shuffle against the hash-based shuffle? To what parallelism can you scale
> up when using the sort-merge based shuffle?
>
> Cheers,
> Till
>
> On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao 
> wrote:
>
> > Hi devs,
> >
> > Currently, Flink adopts a hash-style blocking shuffle implementation
> which
> > writes data sent to different reducer tasks into separate files
> > concurrently. Compared to the sort-merge based approach, which writes those
> > data together into a single file and merges those small files into bigger
> > ones, the hash-based approach has several weak points when it comes to
> > running large scale batch jobs:
> >
> >1. *Stability*: For high parallelism (tens of thousands) batch job,
> >current hash-based blocking shuffle implementation writes too many
> files
> >concurrently which gives high pressure to the file system, for
> example,
> >maintenance of too many file metas, exhaustion of inodes or file
> >    descriptors. All of these can be potential stability issues. Sort-Merge
> >    based blocking shuffle doesn’t have this problem because for one result
> >    partition, only one file is written at the same time.
> >2. *Performance*: Large amounts of small shuffle files and random IO
> can
> >influence shuffle performance a lot especially for hdd (for ssd,
> > sequential
> >read is also important because of read ahead and cache). For batch
> jobs
> >    processing massive data, a small amount of data per subpartition is
> >    common because of high parallelism. Besides, data skew is another cause of
> > small
> >subpartition files. By merging data of all subpartitions together in
> one
> >file, more sequential read can be achieved.
> >3. *Resource*: For current hash-based implementation, each
> subpartition
> >needs at least one buffer. For large scale batch shuffles, the memory
> >consumption can be huge. For example, we need at least 320M network
> > memory
> >per result partition if parallelism is set to 1 and because of the
> > huge
> >    network consumption, it is hard to configure the network memory for large
> >    scale batch jobs and sometimes the parallelism cannot be increased just
> >    because of insufficient network memory, which leads to a bad user
> >    experience.
> >
> > To improve Flink’s capability of running large scale batch jobs, we would
> > like to introduce sort-merge based blocking shuffle to Flink[1]. Any
> > feedback is appreciated.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> >
> > Best,
> > Yingjie
> >
>

