Hi Zhijiang,

Thanks for your reply and suggestions.
1. For taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition: we decided to append all data produced by one result partition to a single file, so this option will be removed.

2. For taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition: the number of required buffers of the buffer pool will be min(numSubpartitions + 1, this config value), so it does not increase the number of required buffers and may even reduce it when the parallelism is very high. So when users switch to the sort-merge implementation, there should be no insufficient network buffers issue.

3. For taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: I agree that a boolean value is easier for users to configure, so we will replace this option with a boolean switch. We can add the config option back if we have performance concerns in the future.

Best,
Yingjie

Zhijiang <wangzhijiang...@aliyun.com.invalid> wrote on Mon, Oct 19, 2020 at 5:27 PM:

> Thanks for launching the discussion and the respective FLIP, Yingjie!
>
> In general, I am +1 for this proposal since the sort-merge approach has
> already been widely adopted in other batch-based projects, like MR,
> Spark, etc. And it indeed has some performance benefits in some
> scenarios, as mentioned in the FLIP.
>
> I only have some thoughts on the `Public Interfaces` section, since it
> concerns how users understand and use these options in practice.
> The newly introduced classes can be further reviewed in a follow-up PR,
> since no existing interfaces are being refactored ATM.
>
> 1. taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition:
> the default value should be `1` I guess? It is better to give a proper
> default value so that most users do not need to care about it in
> practice.
>
> 2. taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition:
> how about making the default match the number of required buffers in
> LocalBufferPool as it is now for result partitions?
> Then it would be transparent for users: no increase in memory resources,
> no matter whether the hash-based or the sort-merge based way is used. For
> a tuned setting, it is better to give some hints to guide users on how to
> adjust it for better performance based on the relevant factors.
>
> 3. taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: I
> guess it is not easy to determine a proper parallelism value for the
> switch between the hash-based and sort-merge based implementations.
> How much data a subpartition takes (e.g. broadcast), and whether the
> hash-based way is suitable, is not completely decided by the
> parallelism, and users might be confused about how to tune it in
> practice. I prefer giving a simple boolean-typed option for easy use,
> and the default value can be false in the MVP. Then it will not bring
> any effects for users after upgrading to the new version by default, and
> the sort-merge option can be enabled to try out if users are willing, in
> desired scenarios.
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Till Rohrmann <trohrm...@apache.org>
> Send Time: Fri, Oct 16, 2020 15:42
> To: dev <dev@flink.apache.org>
> Subject: Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking
> Shuffle to Flink
>
> Thanks for sharing the preliminary numbers with us Yingjie. The numbers
> look quite impressive :-)
>
> Cheers,
> Till
>
> On Thu, Oct 15, 2020 at 5:25 PM Yingjie Cao <kevin.ying...@gmail.com>
> wrote:
>
> > Hi Till,
> >
> > Thanks for your reply and comments.
> >
> > You are right, the proposed sort-merge based shuffle is an extension
> > of the existing blocking shuffle and does not change any default
> > behavior of Flink.
> >
> > As for the performance, according to our previous experience, a
> > sort-merge based implementation can reduce the shuffle time by 30% to
> > even 90% compared to the hash-based implementation.
> > My PoC implementation, without any further optimization, can already
> > reduce the shuffle time by over 10% on SSD and over 70% on HDD for a
> > simple 1000 * 1000 parallelism benchmark job.
> >
> > After switching to the sort-merge based blocking shuffle, some of our
> > users' jobs can scale up to over 20000 parallelism, though this needs
> > some JM and RM side optimizations. I have not tried to find where the
> > upper bound is, but I guess several tens of thousands should be able
> > to meet the needs of most users.
> >
> > Best,
> > Yingjie
> >
> > Till Rohrmann <trohrm...@apache.org> wrote on Thu, Oct 15, 2020 at
> > 3:57 PM:
> >
> > > Hi Yingjie,
> > >
> > > thanks for proposing the sort-merge based blocking shuffle. I like
> > > the proposal and it does not seem to change the internals of Flink.
> > > Instead it is an extension of existing interfaces, which makes it a
> > > non-invasive addition.
> > >
> > > Do you have any numbers comparing the performance of the sort-merge
> > > based shuffle against the hash-based shuffle? To what parallelism
> > > can you scale up when using the sort-merge based shuffle?
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao <kevin.ying...@gmail.com>
> > > wrote:
> > >
> > > > Hi devs,
> > > >
> > > > Currently, Flink adopts a hash-style blocking shuffle
> > > > implementation which writes the data sent to different reducer
> > > > tasks into separate files concurrently. Compared to the sort-merge
> > > > based approach, which writes that data together into a single file
> > > > and merges the small files into bigger ones, the hash-based
> > > > approach has several weak points when it comes to running
> > > > large-scale batch jobs:
> > > >
> > > > 1.
*Stability*: For a high-parallelism (tens of thousands) batch
> > > > job, the current hash-based blocking shuffle implementation writes
> > > > too many files concurrently, which puts high pressure on the file
> > > > system: for example, the maintenance of too many file metadata
> > > > entries and the exhaustion of inodes or file descriptors. All of
> > > > these are potential stability issues. The sort-merge based
> > > > blocking shuffle does not have this problem because, for one
> > > > result partition, only one file is written at a time.
> > > > 2. *Performance*: Large amounts of small shuffle files and random
> > > > IO can hurt shuffle performance a lot, especially on HDD (on SSD,
> > > > sequential read is also important because of read-ahead and
> > > > caching). For batch jobs processing massive data, a small amount
> > > > of data per subpartition is common because of the high
> > > > parallelism. Besides, data skew is another cause of small
> > > > subpartition files. By merging the data of all subpartitions
> > > > together into one file, more sequential reads can be achieved.
> > > > 3. *Resource*: For the current hash-based implementation, each
> > > > subpartition needs at least one buffer. For large-scale batch
> > > > shuffles, the memory consumption can be huge. For example, we need
> > > > at least 320M of network memory per result partition if the
> > > > parallelism is set to 10000. Because of this huge network memory
> > > > consumption, it is hard to configure the network memory for
> > > > large-scale batch jobs, and sometimes the parallelism cannot be
> > > > increased just because of insufficient network memory, which leads
> > > > to a bad user experience.
> > > >
> > > > To improve Flink's capability of running large-scale batch jobs,
> > > > we would like to introduce the sort-merge based blocking shuffle
> > > > to Flink[1]. Any feedback is appreciated.
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> > > >
> > > > Best,
> > > > Yingjie
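[Editor's note] To make the buffer-count arithmetic discussed in the thread concrete, here is a minimal Java sketch. The class and method names are hypothetical, not Flink's actual API; the 32 KiB buffer size is Flink's default network memory segment size, and the configured buffers-per-partition value of 2048 is an illustrative assumption. It contrasts the hash-based scheme (at least one buffer per subpartition) with the proposed sort-merge scheme, where the pool size is capped at min(numSubpartitions + 1, configured value).

```java
// Sketch only: these names are hypothetical illustrations, not Flink's API.
public class ShuffleBufferEstimate {

    // Flink's default network buffer (memory segment) size is 32 KiB.
    static final long BUFFER_SIZE_BYTES = 32 * 1024;

    // Hash-based blocking shuffle: at least one buffer per subpartition.
    static int hashBasedMinBuffers(int numSubpartitions) {
        return numSubpartitions;
    }

    // Sort-merge blocking shuffle: pool size is capped by the configured
    // buffers-per-partition value, i.e. min(numSubpartitions + 1, configured).
    static int sortMergeMinBuffers(int numSubpartitions, int configuredBuffers) {
        return Math.min(numSubpartitions + 1, configuredBuffers);
    }

    public static void main(String[] args) {
        int parallelism = 10_000;
        int configured = 2_048; // hypothetical buffers-per-partition setting

        long hashBytes = hashBasedMinBuffers(parallelism) * BUFFER_SIZE_BYTES;
        long sortBytes =
                sortMergeMinBuffers(parallelism, configured) * BUFFER_SIZE_BYTES;

        // Hash-based needs 10000 buffers (~312 MiB, the "320M" figure from
        // the thread); sort-merge is capped at 2048 buffers (64 MiB).
        System.out.println("hash-based min buffers: " + hashBasedMinBuffers(parallelism));
        System.out.println("sort-merge min buffers: " + sortMergeMinBuffers(parallelism, configured));
        System.out.println("hash-based memory MiB:  " + hashBytes / (1024 * 1024));
        System.out.println("sort-merge memory MiB:  " + sortBytes / (1024 * 1024));
    }
}
```

Note how the cap also explains the reply above: at low parallelism the sort-merge pool needs numSubpartitions + 1 buffers (one extra for sorting/spilling), while at high parallelism the configured value bounds memory regardless of parallelism.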