Hi devs,

This discussion thread has been open for over a week. If there are no
other concerns, I'd like to open a voting thread soon.

Best,
Yingjie

Yingjie Cao <kevin.ying...@gmail.com> wrote on Fri, Oct 23, 2020 at 11:56 AM:

> Hi Zhijiang,
>
> Thanks for your reply and suggestions.
>
> 1. For
> taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition, we
> decided to append all data produced by one result partition to one file, so
> this option will be removed.
>
> 2. For
> taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition, the
> number of required buffers of the buffer pool will be min(numSubpartition + 1,
> this config value), so it does not increase the number of required buffers
> but may reduce it when the parallelism is very high. So when users switch to
> the sort-merge implementation, there should be no insufficient network
> buffers issue.
>
> 3. For taskmanager.network.sort-merge-blocking-shuffle.min-parallelism, I
> agree a boolean value is easier for users to configure, so we will replace
> this with a boolean switch. We can add this config option back if we have
> performance concerns in the future.
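
As a rough illustration of point 2, the buffer requirement can be sketched as below. This is a minimal sketch, not Flink code: the function names and the example config value of 512 are hypothetical, and the hash-based requirement of numSubpartition + 1 buffers is an assumption inferred from the statement that the sort-merge requirement never exceeds the current one.

```python
def hash_based_required_buffers(num_subpartitions: int) -> int:
    # Assumption: the hash-based result partition requires one buffer per
    # subpartition plus one, as implied by "min(numSubpartition + 1, ...)".
    return num_subpartitions + 1


def sort_merge_required_buffers(num_subpartitions: int,
                                buffers_per_partition: int) -> int:
    # min(numSubpartition + 1, config value), as described in point 2.
    return min(num_subpartitions + 1, buffers_per_partition)


if __name__ == "__main__":
    configured = 512  # hypothetical config value, not a Flink default
    for parallelism in (100, 1000, 10000):
        print(parallelism,
              hash_based_required_buffers(parallelism),
              sort_merge_required_buffers(parallelism, configured))
```

For low parallelism both implementations require the same number of buffers; once numSubpartition + 1 exceeds the configured value, the sort-merge requirement stays capped at that value.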
>
> Best,
> Yingjie
>
> Zhijiang <wangzhijiang...@aliyun.com.invalid> wrote on Mon, Oct 19, 2020 at 5:27 PM:
>
>> Thanks for launching the discussion and the respective FLIP, Yingjie!
>>
>> In general, I am +1 for this proposal since the sort-merge approach has
>> already been widely adopted in other batch-based projects, like MR, Spark,
>> etc. And it indeed has some performance benefits in some scenarios as
>> mentioned in the FLIP.
>>
>> I only have some thoughts on the `Public Interfaces` section since it
>> concerns how users understand the feature and use it well in practice.
>> As for the newly introduced classes, they can be further reviewed in the
>> follow-up PR since no existing interfaces are refactored ATM.
>>
>> 1.
>> taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition:
>> the default value should be `1` I guess? It is better to give a proper
>> default value so that most users do not need to care about it in practice.
>>
>> 2. taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition:
>> how about making the default the same as the number of required buffers in
>> LocalBufferPool for a result partition today?
>> Then it is transparent for users: no additional memory resource is needed
>> with either the hash-based or the sort-merge based way. For tuned
>> settings, it is better to give some hints to guide users on how to adjust
>> the value for better performance based on some factors.
>>
>> 3. taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: I
>> guess it is not easy to determine a proper value for the switch between
>> hash-based and sort-merge based shuffle.
>> Also, how much data a subpartition holds (e.g. broadcast), and hence
>> whether the hash-based way is suitable, is not completely decided by the
>> parallelism. And users might be confused about how to tune it in practice.
>> I prefer a simple boolean option for ease of use, and the default value
>> can be false in the MVP. Then by default it will have no effect on users
>> after they upgrade to the new version, and users can enable the sort-merge
>> option to try it out in the desired scenarios.
>>
>> Best,
>> Zhijiang
>> ------------------------------------------------------------------
>> From:Till Rohrmann <trohrm...@apache.org>
>> Send Time: Fri, Oct 16, 2020, 15:42
>> To:dev <dev@flink.apache.org>
>> Subject:Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking
>> Shuffle to Flink
>>
>> Thanks for sharing the preliminary numbers with us Yingjie. The numbers
>> look quite impressive :-)
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 15, 2020 at 5:25 PM Yingjie Cao <kevin.ying...@gmail.com>
>> wrote:
>>
>> > Hi Till,
>> >
>> > Thanks for your reply and comments.
>> >
>> > You are right, the proposed sort-merge based shuffle is an extension of
>> > the existing blocking shuffle and does not change any default behavior of
>> > Flink.
>> >
>> > As for the performance, according to our previous experience, a sort-merge
>> > based implementation can reduce the shuffle time by 30% to even 90%
>> > compared to a hash-based implementation. My PoC implementation without any
>> > further optimization can already reduce the shuffle time by over 10% on
>> > SSD and over 70% on HDD for a simple 1000 * 1000 parallelism benchmark job.
>> >
>> > After switching to sort-merge based blocking shuffle, some of our users'
>> > jobs can scale up to a parallelism of over 20000, though this needs some
>> > JM and RM side optimization. I haven't tried to find where the upper bound
>> > is, but I guess several tens of thousands should be able to meet the needs
>> > of most users.
>> >
>> > Best,
>> > Yingjie
>> >
>> > Till Rohrmann <trohrm...@apache.org> wrote on Thu, Oct 15, 2020 at 3:57 PM:
>> >
>> > > Hi Yingjie,
>> > >
>> > > thanks for proposing the sort-merge based blocking shuffle. I like the
>> > > proposal and it does not seem to change the internals of Flink. Instead
>> > > it is an extension of existing interfaces which makes it a
>> > > non-invasive addition.
>> > >
>> > > Do you have any numbers comparing the performance of the sort-merge
>> > > based shuffle against the hash-based shuffle? To what parallelism can
>> > > you scale up when using the sort-merge based shuffle?
>> > >
>> > > Cheers,
>> > > Till
>> > >
>> > > On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao <kevin.ying...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi devs,
>> > > >
>> > > > Currently, Flink adopts a hash-style blocking shuffle implementation
>> > > > which writes data sent to different reducer tasks into separate files
>> > > > concurrently. Compared to the sort-merge based approach, which writes
>> > > > those data together into a single file and merges small files into
>> > > > bigger ones, the hash-based approach has several weak points when it
>> > > > comes to running large scale batch jobs:
>> > > >
>> > > >    1. *Stability*: For high parallelism (tens of thousands) batch
>> > > >    jobs, the current hash-based blocking shuffle implementation writes
>> > > >    too many files concurrently, which puts high pressure on the file
>> > > >    system, for example, maintenance of too many file metas and
>> > > >    exhaustion of inodes or file descriptors. All of these can be
>> > > >    potential stability issues. Sort-merge based blocking shuffle
>> > > >    doesn't have the problem because for one result partition, only
>> > > >    one file is written at a time.
>> > > >    2. *Performance*: Large numbers of small shuffle files and random
>> > > >    IO can hurt shuffle performance a lot, especially on HDD (on SSD,
>> > > >    sequential read is also important because of read-ahead and
>> > > >    caching). For batch jobs processing massive data, a small amount of
>> > > >    data per subpartition is common because of high parallelism.
>> > > >    Besides, data skew is another cause of small subpartition files. By
>> > > >    merging the data of all subpartitions together in one file, more
>> > > >    sequential reads can be achieved.
>> > > >    3. *Resource*: For the current hash-based implementation, each
>> > > >    subpartition needs at least one buffer. For large scale batch
>> > > >    shuffles, the memory consumption can be huge. For example, we need
>> > > >    at least 320M network memory per result partition if parallelism is
>> > > >    set to 10000. Because of the huge network memory consumption, it is
>> > > >    hard to configure the network memory for large scale batch jobs, and
>> > > >    sometimes parallelism cannot be increased just because of
>> > > >    insufficient network memory, which leads to a bad user experience.
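
The 320M figure in the *Resource* item can be reproduced with simple arithmetic. The sketch below assumes a 32 KB network buffer (Flink's default memory segment size; the message itself does not state the buffer size) and exactly one buffer per subpartition. The function name is hypothetical.

```python
def min_network_memory_kb(num_subpartitions: int, buffer_size_kb: int = 32) -> int:
    # One buffer per subpartition is the minimum for the hash-based shuffle.
    # buffer_size_kb = 32 is an assumption (Flink's default segment size).
    return num_subpartitions * buffer_size_kb


if __name__ == "__main__":
    kb = min_network_memory_kb(10000)
    print(f"{kb} KB")  # 10000 * 32 = 320000 KB
```

With these assumptions the minimum is 320000 KB per result partition, which is the roughly 320M mentioned above (312.5 MiB in binary units).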
>> > > >
>> > > > To improve Flink’s capability of running large scale batch jobs, we
>> > > > would like to introduce sort-merge based blocking shuffle to Flink[1].
>> > > > Any feedback is appreciated.
>> > > >
>> > > > [1]
>> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>> > > >
>> > > > Best,
>> > > > Yingjie
>> > > >
>> > >
>> >
>>
>>
