[DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2020-10-14 Thread Yingjie Cao
Hi devs,

Currently, Flink adopts a hash-style blocking shuffle implementation which
writes the data sent to different reducer tasks into separate files
concurrently. Compared to the sort-merge based approach, which writes the
data of all subpartitions together into a single file and merges small
files into bigger ones, the hash-based approach has several weak points
when it comes to running large-scale batch jobs:

   1. *Stability*: For high-parallelism (tens of thousands) batch jobs, the
   current hash-based blocking shuffle implementation writes too many files
   concurrently, which puts high pressure on the file system, for example,
   maintaining too many file metadata entries and exhausting inodes or file
   descriptors. All of these are potential stability issues. The sort-merge
   based blocking shuffle does not have this problem because for one result
   partition, only one file is written at a time.
   2. *Performance*: Large numbers of small shuffle files and random IO can
   hurt shuffle performance a lot, especially for HDD (for SSD, sequential
   read is also important because of read-ahead and caching). For batch jobs
   processing massive data, a small amount of data per subpartition is common
   because of the high parallelism. Besides, data skew is another cause of
   small subpartition files. By merging the data of all subpartitions together
   into one file, more sequential reads can be achieved.
   3. *Resource*: For the current hash-based implementation, each subpartition
   needs at least one buffer. For large-scale batch shuffles, the memory
   consumption can be huge. For example, we need at least 320M of network
   memory per result partition if the parallelism is set to 10000 (see the
   sketch after this list). Because of this huge network memory consumption,
   it is hard to configure the network memory for large-scale batch jobs, and
   sometimes the parallelism cannot be increased just because of insufficient
   network memory, which leads to a bad user experience.
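
To make the scale concrete, here is a back-of-the-envelope sketch of the
numbers above (not Flink code; it assumes the default 32 KB network buffer
size and one file/buffer per subpartition, as described in the points above):

public class ShuffleScaleEstimate {

    public static void main(String[] args) {
        int parallelism = 10_000;       // "tens of thousands" scale batch job
        long bufferBytes = 32 * 1024L;  // assumed default network buffer size (32 KB)

        // Hash-based shuffle: each producer writes one file per consumer
        // subpartition; the sort-merge approach writes one file per producer.
        long hashFilesTotal = (long) parallelism * parallelism;  // 100,000,000 files
        long sortMergeFilesTotal = parallelism;                  // 10,000 files

        // Hash-based shuffle: at least one buffer per subpartition, so the
        // minimum network memory of one result partition grows with parallelism.
        long minMemoryPerPartition = parallelism * bufferBytes;  // 320,000 KB ~= 320M

        System.out.printf("shuffle files: hash=%d, sort-merge=%d%n",
                hashFilesTotal, sortMergeFilesTotal);
        System.out.printf("min network memory per result partition: %d KB%n",
                minMemoryPerPartition / 1024);
    }
}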

To improve Flink’s capability of running large-scale batch jobs, we would
like to introduce the sort-merge based blocking shuffle to Flink[1]. Any
feedback is appreciated.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink

Best,
Yingjie


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-06 Thread Yingjie Cao
Hi devs & users,

FLIP-148[1] has been released with Flink 1.13, and the final
implementation has some differences compared with the initial proposal in
the FLIP document. To avoid potential misunderstandings, I have updated the
FLIP document[1] accordingly, and I have also drafted another document[2]
which contains more implementation details. FYI.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
[2]
https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
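
For reference, here is a minimal sketch of switching the released feature
on from code (the config key below is the one documented for the Flink 1.13
release; it is an assumption of this sketch, so please verify it against
the documents above for your version):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableSortShuffle {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Use the sort-based blocking shuffle for every result partition
        // whose consumer parallelism is at least this threshold; setting
        // it to 1 enables the sort-based shuffle everywhere.
        conf.setInteger("taskmanager.network.sort-shuffle.min-parallelism", 1);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);
        // ... define the batch job here, then:
        // env.execute("sort-shuffle-demo");
    }
}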

Best,
Yingjie

Yingjie Cao wrote on Thu, Oct 15, 2020 at 11:02 AM:

> [...]


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-06 Thread Jingsong Li
Thanks Yingjie for the great effort!

This is really helpful to Flink Batch users!

Best,
Jingsong

On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao wrote:

> [...]


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-08 Thread Till Rohrmann
Thanks for the update, Yingjie. Would it make sense to write a short blog
post about this feature, including some performance improvement numbers? I
think this could be interesting to our users.

Cheers,
Till

On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li wrote:

> [...]


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-08 Thread Yingjie Cao
Hi Till,

Thanks for the suggestion. The blog post is already on the way.

Best,
Yingjie

Till Rohrmann wrote on Tue, Jun 8, 2021 at 5:30 PM:

> [...]


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2021-06-08 Thread Till Rohrmann
Great :-)

On Tue, Jun 8, 2021 at 1:11 PM Yingjie Cao wrote:

> [...]


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2020-10-15 Thread Till Rohrmann
Hi Yingjie,

thanks for proposing the sort-merge based blocking shuffle. I like the
proposal, and it does not seem to change the internals of Flink. Instead,
it is an extension of existing interfaces, which makes it a non-invasive
addition.

Do you have any numbers comparing the performance of the sort-merge based
shuffle against the hash-based shuffle? To what parallelism can you scale
up when using the sort-merge based shuffle?

Cheers,
Till

On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao wrote:

> [...]


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2020-10-15 Thread Yingjie Cao
Hi Till,

Thanks for your reply and comments.

You are right, the proposed sort-merge based shuffle is an extension of the
existing blocking shuffle and does not change any default behavior of Flink.

As for the performance, according to our previous experience, a sort-merge
based implementation can reduce the shuffle time by 30% to even 90%
compared to the hash-based implementation. My PoC implementation, without
any further optimization, can already reduce the shuffle time by over 10%
on SSD and over 70% on HDD for a simple 1000 * 1000 parallelism benchmark
job.

After switching to the sort-merge based blocking shuffle, some of our
users' jobs can scale up to over 20,000 parallelism, though this needs some
JM- and RM-side optimization. I haven't tried to find where the upper bound
is, but I guess several tens of thousands should be able to meet the needs
of most users.

Best,
Yingjie

Till Rohrmann wrote on Thu, Oct 15, 2020 at 3:57 PM:

> [...]


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2020-10-16 Thread Till Rohrmann
Thanks for sharing the preliminary numbers with us, Yingjie. The numbers
look quite impressive :-)

Cheers,
Till

On Thu, Oct 15, 2020 at 5:25 PM Yingjie Cao wrote:

> [...]


Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2020-10-19 Thread Zhijiang
Thanks for launching the discussion and the respective FLIP, Yingjie!

In general, I am +1 for this proposal since the sort-merge ability has
already been widely adopted in other batch-based projects, like MR, Spark,
etc. And it indeed has some performance benefits in some scenarios, as
mentioned in the FLIP.

I only have some thoughts on the `Public Interfaces` section since it
concerns how users understand and use the feature in practice. As for the
newly introduced classes, they can be further reviewed in the follow-up PR
since there is no refactoring of existing interfaces ATM.

1. taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition:
the default value should be `1`, I guess? It is better to give a proper
default value so that most users do not need to care about it in practice.

2. taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition:
how about making the default the number of required buffers in the
LocalBufferPool, as is the case now for result partitions? Then it is
transparent to users: no memory resource needs to be increased, no matter
whether the hash-based or the sort-merge based way is used. For a tuned
setting, it is better to give some hints to guide users on how to adjust
it for better performance based on some factors.

3. taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: I guess
it is not very easy to determine a proper value for the switch between the
hash-based and sort-merge based implementations. How much data a
subpartition takes (broadcast or not) and whether it is suitable for the
hash-based approach is not completely decided by the parallelism, and users
might be confused about how to tune it in practice. I prefer a simple
boolean option for easy use, with a default value of false in the MVP. Then
upgrading to the new version will not have any effect on users by default,
and the sort-merge option can be enabled to try out in desired scenarios (a
sketch of the two switch styles follows below).
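
To illustrate the trade-off in point 3, here is a hypothetical sketch of
the two switch styles under discussion (class, method, and parameter names
are made up for illustration; this is not Flink's actual code):

final class ShuffleChoiceSketch {

    // Style from the FLIP draft: choose the sort-merge shuffle per result
    // partition once the consumer parallelism reaches a configured threshold.
    static boolean bySortMergeMinParallelism(int consumerParallelism, int minParallelism) {
        return consumerParallelism >= minParallelism;
    }

    // Style suggested above: one plain boolean that defaults to false in the
    // MVP, so upgrading changes nothing unless users explicitly opt in.
    static boolean byBooleanSwitch(boolean sortMergeShuffleEnabled) {
        return sortMergeShuffleEnabled;
    }
}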

Best,
Zhijiang
--
From:Till Rohrmann 
Send Time:2020年10月16日(星期五) 15:42
To:dev 
Subject:Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to 
Flink

Thanks for sharing the preliminary numbers with us Yingjie. The numbers
look quite impressive :-)

Cheers,
Till

On Thu, Oct 15, 2020 at 5:25 PM Yingjie Cao  wrote:

> Hi Till,
>
> Thanks for your reply and comments.
>
> You are right, the proposed sort-merge based shuffle is an extension of the
> existing blocking shuffle and does not change any default behavior of
> Flink.
>
> As for the performance, according to our previous experience, sort-merge
> based implementation can reduce the shuffle time by 30% to even 90%
> compared to hash-based implementation. My PoC implementation without any
> further optimization can already reduce the shuffle time over 10% on SSD
> and over 70% on HDD for a simple 1000 * 1000 parallelism benchmark job.
>
> After switch to sort-merge based blocking shuffle, some of our users' jobs
> can scale up to over 2 parallelism, though need some JM and RM side
> optimization. I haven't ever tried to find where the upper bound is, but I
> guess sever tens of thousand should be able to m
> <
> http://www.baidu.com/link?url=g0rAiJfPTxlMOJ4v6DXQeXhu5Y5HroJ1HHBHo34fjTZ5mtC0aYfog4eRKEnJmoPaImLyFafqncmA7l3Zowb8vovv6Dy9VhO3TlAtjNqoV-W
> >eet
> the needs of most users.
>
> Best,
> Yingjie
>
> Till Rohrmann  于2020年10月15日周四 下午3:57写道:
>
> > Hi Yingjie,
> >
> > thanks for proposing the sort-merge based blocking shuffle. I like the
> > proposal and it does not seem to change the internals of Flink. Instead
> it
> > is an extension of existing interfaces which makes it a
> > non-invasive addition.
> >
> > Do you have any numbers comparing the performance of the sort-merge based
> > shuffle against the hash-based shuffle? To what parallelism can you scale
> > up when using the sort-merge based shuffle?
> >
> > Cheers,
> > Till
> >
> > On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao 
> > wrote:
> >
> > > Hi devs,
> > >
> > > Currently, Flink adopts a hash-style blocking shuffle implementation
> > which
> > > writes data sent to different reducer tasks into separate files
> > > concurrently. Compared to sort-merge based approach writes those data
> > > together into a single file and merges those small files into bigger
> > ones,
> > > hash-based approach has several weak points when it comes to running
> > large
> > > scale batch jobs:
> > >
> > >1. *Stability*: For high parallelism (tens of thousands) batch job,
> > >current hash-based blocking shuffle implementation writes too many
> > files
> > >concurrently which gives high pr

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2020-10-22 Thread Yingjie Cao
Hi Zhijiang,

Thanks for your reply and suggestions.

1. For
taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition: we
decided to append all data produced by one result partition to one file, so
this option will be removed.

2. For
taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition: the
number of required buffers of the buffer pool will be min(numSubpartitions
+ 1, this config value), so it does not increase the number of required
buffers but may reduce it when the parallelism is very high. So when users
switch to the sort-merge implementation, there should be no
insufficient-network-buffers issue (see the sketch after point 3).

3. For taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: I
agree a boolean value is easier for users to configure, so we will replace
this option with a boolean switch. We can add this config option back if we
have performance concerns in the future.
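
A small sketch of the buffer math described in point 2 above (illustrative
only; it assumes the config value simply caps the pool size):

final class SortMergeBufferSketch {

    // Required buffers of one result partition's LocalBufferPool for the
    // sort-merge implementation, as described above: at most the configured
    // buffers-per-partition value, never more than numSubpartitions + 1.
    static int requiredBuffers(int numSubpartitions, int configuredBuffersPerPartition) {
        return Math.min(numSubpartitions + 1, configuredBuffersPerPartition);
    }
}

For example, with 10000 subpartitions and a configured value of 512, the
sort-merge pool requires only 512 buffers, while the hash-based
implementation needs at least one buffer per subpartition.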

Best,
Yingjie

Zhijiang wrote on Mon, Oct 19, 2020 at 5:27 PM:

> [...]

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2020-10-22 Thread Yingjie Cao
Hi devs,

This discussion thread has been open for over a week. If there are no
other concerns, I'd like to open a voting thread soon.

Best,
Yingjie

Yingjie Cao wrote on Fri, Oct 23, 2020 at 11:56 AM:

> [...]