Hi Zhijiang,

Thanks for your reply and suggestions.

1. For
taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition, we
have decided to append all data produced by one result partition to one file,
so this option will be removed.

2. For
taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition, the
number of buffers required by the buffer pool will be min(numSubpartition + 1,
this config value), so it does not increase the number of required buffers
but may reduce it when the parallelism is very high. So when users switch to
the sort-merge implementation, there should be no insufficient network
buffers issue.
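
To make the arithmetic concrete, here is a minimal sketch. The class and
method names are made up for illustration and are not the actual Flink code.
It assumes the default 32 KB network buffer size and that the hash-based
result partition currently requires numSubpartition + 1 buffers, as implied
above. The config value of 512 is only a placeholder.

```java
public final class RequiredBuffersSketch {

    // Assumption: default network buffer (memory segment) size of 32 KB.
    static final int BUFFER_SIZE_BYTES = 32 * 1024;

    /** Buffers required today by a hash-based result partition (as implied above). */
    static int hashBasedRequiredBuffers(int numSubpartitions) {
        return numSubpartitions + 1;
    }

    /** Buffers required by a sort-merge result partition: min(numSubpartitions + 1, config value). */
    static int sortMergeRequiredBuffers(int numSubpartitions, int buffersPerPartition) {
        return Math.min(numSubpartitions + 1, buffersPerPartition);
    }

    /** Network memory, in bytes, needed for the given number of buffers. */
    static long requiredBytes(int requiredBuffers) {
        return (long) requiredBuffers * BUFFER_SIZE_BYTES;
    }

    public static void main(String[] args) {
        int buffersPerPartition = 512; // hypothetical config value, not a proposed default
        for (int parallelism : new int[] {100, 10000}) {
            int hash = hashBasedRequiredBuffers(parallelism);
            int sortMerge = sortMergeRequiredBuffers(parallelism, buffersPerPartition);
            System.out.println(
                    "parallelism=" + parallelism
                            + " hash=" + hash + " buffers (" + requiredBytes(hash) + " bytes)"
                            + " sort-merge=" + sortMerge + " buffers ("
                            + requiredBytes(sortMerge) + " bytes)");
        }
    }
}
```

With these assumptions, a parallelism of 100 needs 101 buffers either way,
while a parallelism of 10000 needs 512 buffers (16 MB) instead of 10001
(roughly the 320M mentioned in the original proposal).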

3. For taskmanager.network.sort-merge-blocking-shuffle.min-parallelism, I
agree that a boolean value is easier for users to configure, so we will
replace this with a boolean switch. We can add this config option back if we
have performance concerns in the future.

Best,
Yingjie

Zhijiang <wangzhijiang...@aliyun.com.invalid> wrote on Mon, Oct 19, 2020 at 5:27 PM:

> Thanks for launching the discussion and the respective FLIP, Yingjie!
>
> In general, I am +1 for this proposal since the sort-merge approach has
> already been widely adopted in other batch-based projects, like MR, Spark,
> etc. And it indeed has some performance benefits in some scenarios as
> mentioned in the FLIP.
>
> I only have some thoughts on the `Public Interfaces` section since it
> concerns how users understand and use the feature in practice.
> As for the newly introduced classes, they can be further reviewed in the
> follow-up PR since no existing interfaces are refactored ATM.
>
> 1.
> taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition:
> the default value should be `1` I guess? It is better to give a proper
> default value so that most users do not need to care about it in practice.
>
> 2. taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition:
> how about making the default the same as the number of buffers currently
> required by the LocalBufferPool for a result partition?
> Then it is transparent for users: no extra memory resources are needed with
> either the hash-based or the sort-merge based way. For the tuned setting, it
> is better to give some hints to guide users on how to adjust it for better
> performance based on some factors.
>
> 3. taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: I
> guess it is not easy to determine a proper value for the switch between
> hash-based and sort-merge based.
> And how much data a subpartition takes (broadcast), or whether a job is not
> suitable for hash-based, is not completely decided by the parallelism.
> And users might be confused about how to tune it in practice. I prefer
> giving a simple boolean option for ease of use, and the default value can be
> false in the MVP. Then it will not have any effect for users after upgrading
> to the new version by default, and the sort-merge option can be enabled to
> try out if users are willing in the desired scenarios.
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From:Till Rohrmann <trohrm...@apache.org>
> Send Time: Fri, Oct 16, 2020 15:42
> To:dev <dev@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking
> Shuffle to Flink
>
> Thanks for sharing the preliminary numbers with us Yingjie. The numbers
> look quite impressive :-)
>
> Cheers,
> Till
>
> On Thu, Oct 15, 2020 at 5:25 PM Yingjie Cao <kevin.ying...@gmail.com>
> wrote:
>
> > Hi Till,
> >
> > Thanks for your reply and comments.
> >
> > You are right, the proposed sort-merge based shuffle is an extension of
> > the existing blocking shuffle and does not change any default behavior of
> > Flink.
> >
> > As for the performance, according to our previous experience, a sort-merge
> > based implementation can reduce the shuffle time by 30% to even 90%
> > compared to the hash-based implementation. My PoC implementation without
> > any further optimization can already reduce the shuffle time by over 10% on
> > SSD and over 70% on HDD for a simple 1000 * 1000 parallelism benchmark job.
> >
> > After switching to sort-merge based blocking shuffle, some of our users'
> > jobs can scale up to over 20000 parallelism, though this needs some JM and
> > RM side optimization. I haven't ever tried to find where the upper bound
> > is, but I guess several tens of thousands should be able to meet the needs
> > of most users.
> >
> > Best,
> > Yingjie
> >
> > Till Rohrmann <trohrm...@apache.org> wrote on Thu, Oct 15, 2020 at 3:57 PM:
> >
> > > Hi Yingjie,
> > >
> > > thanks for proposing the sort-merge based blocking shuffle. I like the
> > > proposal and it does not seem to change the internals of Flink. Instead
> > > it is an extension of existing interfaces which makes it a
> > > non-invasive addition.
> > >
> > > Do you have any numbers comparing the performance of the sort-merge
> > > based shuffle against the hash-based shuffle? To what parallelism can you
> > > scale up when using the sort-merge based shuffle?
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao <kevin.ying...@gmail.com>
> > > wrote:
> > >
> > > > Hi devs,
> > > >
> > > > Currently, Flink adopts a hash-style blocking shuffle implementation
> > > > which writes data sent to different reducer tasks into separate files
> > > > concurrently. Compared to the sort-merge based approach, which writes
> > > > those data together into a single file and merges those small files into
> > > > bigger ones, the hash-based approach has several weak points when it
> > > > comes to running large scale batch jobs:
> > > >
> > > >    1. *Stability*: For high parallelism (tens of thousands) batch
> > > >    jobs, the current hash-based blocking shuffle implementation writes
> > > >    too many files concurrently, which puts high pressure on the file
> > > >    system, for example, maintenance of too many file metas and
> > > >    exhaustion of inodes or file descriptors. All of these can be
> > > >    potential stability issues. Sort-merge based blocking shuffle
> > > >    doesn't have the problem because for one result partition, only
> > > >    one file is written at the same time.
> > > >    2. *Performance*: Large amounts of small shuffle files and random
> > > >    IO can influence shuffle performance a lot, especially for HDD (for
> > > >    SSD, sequential read is also important because of read ahead and
> > > >    cache). For batch jobs processing massive data, a small amount of
> > > >    data per subpartition is common because of high parallelism.
> > > >    Besides, data skew is another cause of small subpartition files. By
> > > >    merging data of all subpartitions together in one file, more
> > > >    sequential reads can be achieved.
> > > >    3. *Resource*: For the current hash-based implementation, each
> > > >    subpartition needs at least one buffer. For large scale batch
> > > >    shuffles, the memory consumption can be huge. For example, we need
> > > >    at least 320M network memory per result partition if parallelism is
> > > >    set to 10000. Because of the huge network memory consumption, it is
> > > >    hard to configure the network memory for large scale batch jobs, and
> > > >    sometimes parallelism cannot be increased just because of
> > > >    insufficient network memory, which leads to a bad user experience.
> > > >
> > > > To improve Flink’s capability of running large scale batch jobs, we
> > > > would like to introduce sort-merge based blocking shuffle to Flink[1].
> > > > Any feedback is appreciated.
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> > > >
> > > > Best,
> > > > Yingjie
> > > >
> > >
> >
>
>
