Hi devs,

This discussion thread has been open for over a week. If there are no other concerns, I'd like to open a voting thread soon.
Best,
Yingjie

On Fri, Oct 23, 2020 at 11:56 AM, Yingjie Cao <kevin.ying...@gmail.com> wrote:

> Hi Zhijiang,
>
> Thanks for your reply and suggestions.
>
> 1. For taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition, we decided to append all data produced by one result partition to a single file, so this option will be removed.
>
> 2. For taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition, the number of buffers required from the buffer pool will be min(numSubpartitions + 1, this config value), so it does not increase the number of required buffers and may even reduce it when the parallelism is very high (see the sketch further below). So when users switch to the sort-merge implementation, there should be no "insufficient network buffers" issue.
>
> 3. For taskmanager.network.sort-merge-blocking-shuffle.min-parallelism, I agree that a boolean value is easier for users to configure, so we will replace this option with a boolean switch. We can add this config option back if we have performance concerns in the future.
>
> Best,
> Yingjie
>
> On Mon, Oct 19, 2020 at 5:27 PM, Zhijiang <wangzhijiang...@aliyun.com.invalid> wrote:
>
>> Thanks for launching the discussion and the respective FLIP, Yingjie!
>>
>> In general, I am +1 for this proposal since sort-merge has already been widely adopted in other batch-based projects such as MR, Spark, etc. And it indeed has performance benefits in some scenarios, as mentioned in the FLIP.
>>
>> I only have some thoughts on the `Public Interfaces` section, since it concerns how users understand the feature and use it well in practice. The newly introduced classes can be reviewed further in the follow-up PR, since no existing interfaces are refactored ATM.
>>
>> 1. taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition: the default value should be `1`, I guess? It is better to give a proper default value so that most users do not need to care about it in practice.
>>
>> 2. taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition: how about making the default the same as the number of buffers currently required from the LocalBufferPool for a result partition? Then it is transparent to users and does not increase any memory resources, no matter whether the hash-based or the sort-merge based way is used. For the tuned setting, it is better to give some hints to guide users on how to adjust it for better performance based on some factors.
>>
>> 3. taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: I guess it is not easy to determine a proper value for the switch between the hash-based and sort-merge based implementations. How much data a subpartition takes (e.g., broadcast), and whether it is suitable for the hash-based way, is not completely decided by the parallelism, and users might be confused about how to tune it in practice. I prefer a simple boolean option for ease of use, with a default of false in the MVP. Then it will have no effect on users after upgrading to the new version by default, and the sort-merge option can be enabled by users who are willing to try it out in the desired scenarios.
>>
>> Best,
>> Zhijiang
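[Editor's note: a minimal sketch of the buffer-sizing rule from point 2 of Yingjie's reply above. The class and method names are purely illustrative and are not actual Flink classes; only the min(numSubpartitions + 1, configured value) rule comes from the discussion.]

    // Illustrative sketch only; not actual Flink code.
    final class RequiredBuffersSketch {

        // Hash-based blocking shuffle: at least one buffer per subpartition, plus one.
        static int hashBasedRequiredBuffers(int numSubpartitions) {
            return numSubpartitions + 1;
        }

        // Sort-merge blocking shuffle: capped by the configured
        // "buffers-per-partition" value, so very high parallelism does not
        // inflate the number of buffers required from the LocalBufferPool.
        static int sortMergeRequiredBuffers(int numSubpartitions, int configuredBuffersPerPartition) {
            return Math.min(numSubpartitions + 1, configuredBuffersPerPartition);
        }

        public static void main(String[] args) {
            System.out.println(hashBasedRequiredBuffers(10_000));        // 10001 buffers
            System.out.println(sortMergeRequiredBuffers(10_000, 512));   // 512 buffers
        }
    }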
>> ------------------------------------------------------------------
>> From: Till Rohrmann <trohrm...@apache.org>
>> Send Time: Fri, Oct 16, 2020, 15:42
>> To: dev <dev@flink.apache.org>
>> Subject: Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink
>>
>> Thanks for sharing the preliminary numbers with us Yingjie. The numbers look quite impressive :-)
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 15, 2020 at 5:25 PM Yingjie Cao <kevin.ying...@gmail.com> wrote:
>>
>> > Hi Till,
>> >
>> > Thanks for your reply and comments.
>> >
>> > You are right, the proposed sort-merge based shuffle is an extension of the existing blocking shuffle and does not change any default behavior of Flink.
>> >
>> > As for the performance, according to our previous experience, a sort-merge based implementation can reduce the shuffle time by 30% to even 90% compared to the hash-based implementation. My PoC implementation, without any further optimization, can already reduce the shuffle time by over 10% on SSD and over 70% on HDD for a simple 1000 * 1000 parallelism benchmark job.
>> >
>> > After switching to the sort-merge based blocking shuffle, some of our users' jobs can scale up to over 20000 parallelism, though some JM and RM side optimizations are needed. I have never tried to find where the upper bound is, but I guess several tens of thousands should be able to meet the needs of most users.
>> >
>> > Best,
>> > Yingjie
>> >
>> > On Thu, Oct 15, 2020 at 3:57 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>> >
>> > > Hi Yingjie,
>> > >
>> > > thanks for proposing the sort-merge based blocking shuffle. I like the proposal and it does not seem to change the internals of Flink. Instead it is an extension of existing interfaces which makes it a non-invasive addition.
>> > >
>> > > Do you have any numbers comparing the performance of the sort-merge based shuffle against the hash-based shuffle? To what parallelism can you scale up when using the sort-merge based shuffle?
>> > >
>> > > Cheers,
>> > > Till
>> > >
>> > > On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao <kevin.ying...@gmail.com> wrote:
>> > >
>> > > > Hi devs,
>> > > >
>> > > > Currently, Flink adopts a hash-style blocking shuffle implementation which writes data sent to different reducer tasks into separate files concurrently. Compared to the sort-merge based approach, which writes the data together into a single file and merges those small files into bigger ones, the hash-based approach has several weak points when it comes to running large scale batch jobs:
>> > > >
>> > > > 1. *Stability*: For high parallelism (tens of thousands) batch jobs, the current hash-based blocking shuffle implementation writes too many files concurrently, which puts high pressure on the file system, for example, maintaining too many file metas and exhausting inodes or file descriptors. All of these can be potential stability issues. The sort-merge based blocking shuffle doesn't have this problem because for one result partition, only one file is written at a time.
>> > > > 2. *Performance*: Large amounts of small shuffle files and random IO can hurt shuffle performance a lot, especially for HDD (for SSD, sequential read is also important because of read ahead and cache). For batch jobs processing massive data, a small amount of data per subpartition is common because of the high parallelism. Besides, data skew is another cause of small subpartition files. By merging the data of all subpartitions together in one file, more sequential reads can be achieved.
>> > > > 3. *Resource*: For the current hash-based implementation, each subpartition needs at least one buffer. For large scale batch shuffles, the memory consumption can be huge. For example, we need at least 320M of network memory per result partition if the parallelism is set to 10000 (see the back-of-the-envelope sketch right after this list), and because of the huge network memory consumption, it is hard to configure the network memory for large scale batch jobs. Sometimes the parallelism cannot be increased just because of insufficient network memory, which leads to a bad user experience.
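[Editor's note: a back-of-the-envelope sketch of the 320M figure in the Resource point above, assuming Flink's default 32 KB network buffer (memory segment) size; class and method names are illustrative only, not Flink APIs.]

    // Rough arithmetic behind "at least 320M network memory per result
    // partition at parallelism 10000". Assumes the default 32 KB network
    // buffer (memory segment) size.
    final class NetworkMemorySketch {

        static long minBytesForHashShuffle(int numSubpartitions, int bufferSizeBytes) {
            // The hash-based blocking shuffle needs at least one buffer per subpartition.
            return (long) numSubpartitions * bufferSizeBytes;
        }

        public static void main(String[] args) {
            long bytes = minBytesForHashShuffle(10_000, 32 * 1024);
            // 10,000 * 32 KB = 320,000 KB, i.e. roughly the "320M" mentioned above.
            System.out.println(bytes / (1024 * 1024) + " MiB");  // prints "312 MiB"
        }
    }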
>> > > > To improve Flink's capability of running large scale batch jobs, we would like to introduce the sort-merge based blocking shuffle to Flink [1]. Any feedback is appreciated.
>> > > >
>> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>> > > >
>> > > > Best,
>> > > > Yingjie
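[Editor's note: for anyone who wants to experiment once the feature lands, a hedged sketch of how the proposed options might be set programmatically. The option keys below are the ones proposed in the FLIP and discussed in this thread; per the discussion above, the min-parallelism option may be replaced by a boolean switch, so the final keys and values could differ.]

    import org.apache.flink.configuration.Configuration;

    // Illustrative only: option keys are the ones proposed in this thread and may change.
    public class SortMergeShuffleConfigSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();

            // Use the sort-merge blocking shuffle for result partitions whose
            // parallelism is at least this value (proposed key; may become a boolean switch).
            conf.setInteger("taskmanager.network.sort-merge-blocking-shuffle.min-parallelism", 1);

            // Cap on the number of network buffers required per result partition
            // by the sort-merge implementation (proposed key).
            conf.setInteger("taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition", 512);

            System.out.println(conf);
        }
    }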