Hi Till,

Thanks for your reply and comments.

You are right, the proposed sort-merge based shuffle is an extension of the
existing blocking shuffle and does not change any default behavior of Flink.

As for the performance, according to our previous experience, sort-merge
based implementation can reduce the shuffle time by 30% to even 90%
compared to hash-based implementation. My PoC implementation without any
further optimization can already reduce the shuffle time over 10% on SSD
and over 70% on HDD for a simple 1000 * 1000 parallelism benchmark job.

After switch to sort-merge based blocking shuffle, some of our users' jobs
can scale up to over 20000 parallelism, though need some JM and RM side
optimization. I haven't ever tried to find where the upper bound is, but I
guess sever tens of thousand should be able to m
<http://www.baidu.com/link?url=g0rAiJfPTxlMOJ4v6DXQeXhu5Y5HroJ1HHBHo34fjTZ5mtC0aYfog4eRKEnJmoPaImLyFafqncmA7l3Zowb8vovv6Dy9VhO3TlAtjNqoV-W>eet
the needs of most users.

Best,
Yingjie

Till Rohrmann <trohrm...@apache.org> 于2020年10月15日周四 下午3:57写道:

> Hi Yingjie,
>
> thanks for proposing the sort-merge based blocking shuffle. I like the
> proposal and it does not seem to change the internals of Flink. Instead it
> is an extension of existing interfaces which makes it a
> non-invasive addition.
>
> Do you have any numbers comparing the performance of the sort-merge based
> shuffle against the hash-based shuffle? To what parallelism can you scale
> up when using the sort-merge based shuffle?
>
> Cheers,
> Till
>
> On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao <kevin.ying...@gmail.com>
> wrote:
>
> > Hi devs,
> >
> > Currently, Flink adopts a hash-style blocking shuffle implementation
> which
> > writes data sent to different reducer tasks into separate files
> > concurrently. Compared to sort-merge based approach writes those data
> > together into a single file and merges those small files into bigger
> ones,
> > hash-based approach has several weak points when it comes to running
> large
> > scale batch jobs:
> >
> >    1. *Stability*: For high parallelism (tens of thousands) batch job,
> >    current hash-based blocking shuffle implementation writes too many
> files
> >    concurrently which gives high pressure to the file system, for
> example,
> >    maintenance of too many file metas, exhaustion of inodes or file
> >    descriptors. All of these can be potential stability issues.
> Sort-Merge
> >    based blocking shuffle don’t have the problem because for one result
> >    partition, only one file is written at the same time.
> >    2. *Performance*: Large amounts of small shuffle files and random IO
> can
> >    influence shuffle performance a lot especially for hdd (for ssd,
> > sequential
> >    read is also important because of read ahead and cache). For batch
> jobs
> >    processing massive data, small amount of data per subpartition is
> common
> >    because of high parallelism. Besides, data skew is another cause of
> > small
> >    subpartition files. By merging data of all subpartitions together in
> one
> >    file, more sequential read can be achieved.
> >    3. *Resource*: For current hash-based implementation, each
> subpartition
> >    needs at least one buffer. For large scale batch shuffles, the memory
> >    consumption can be huge. For example, we need at least 320M network
> > memory
> >    per result partition if parallelism is set to 10000 and because of the
> > huge
> >    network consumption, it is hard to config the network memory for large
> >    scale batch job and  sometimes parallelism can not be increased just
> >    because of insufficient network memory  which leads to bad user
> > experience.
> >
> > To improve Flink’s capability of running large scale batch jobs, we would
> > like to introduce sort-merge based blocking shuffle to Flink[1]. Any
> > feedback is appreciated.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> >
> > Best,
> > Yingjie
> >
>

Reply via email to