Thanks, Yingjie, for the great effort!

This is really helpful for Flink batch users!

Best,
Jingsong

On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao <kevin.ying...@gmail.com> wrote:

> Hi devs & users,
>
> FLIP-148 [1] was released with Flink 1.13, and the final implementation
> differs in some respects from the initial proposal in the FLIP document. To
> avoid potential misunderstandings, I have updated the FLIP document [1]
> accordingly, and I have also drafted another document [2] that contains
> more implementation details. FYI.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
> [2]
> https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>
> Best,
> Yingjie
>
> On Thu, Oct 15, 2020 at 11:02 AM Yingjie Cao <kevin.ying...@gmail.com> wrote:
>
>> Hi devs,
>>
>> Currently, Flink adopts a hash-style blocking shuffle implementation,
>> which concurrently writes the data sent to different reducer tasks into
>> separate files. Compared to the sort-merge based approach, which writes
>> that data together into a single file and merges small files into bigger
>> ones, the hash-based approach has several weak points when it comes to
>> running large-scale batch jobs:
>>
>>    1. *Stability*: For batch jobs with high parallelism (tens of
>>    thousands), the current hash-based blocking shuffle implementation writes
>>    too many files concurrently, which puts high pressure on the file system,
>>    for example, maintaining too many file metadata entries, or exhausting
>>    inodes or file descriptors. All of these are potential stability issues.
>>    Sort-merge based blocking shuffle does not have this problem because, for
>>    one result partition, only one file is written at a time.
>>    2. *Performance*: Large numbers of small shuffle files and random IO
>>    can hurt shuffle performance a lot, especially on HDDs (on SSDs,
>>    sequential reads also matter because of read-ahead and caching). For
>>    batch jobs processing massive data, a small amount of data per
>>    subpartition is common because of high parallelism. Data skew is another
>>    cause of small subpartition files. By merging the data of all
>>    subpartitions into one file, more reads can be made sequential.
>>    3. *Resource*: For the current hash-based implementation, each
>>    subpartition needs at least one buffer. For large-scale batch shuffles,
>>    the memory consumption can be huge. For example, we need at least 320M of
>>    network memory per result partition if parallelism is set to 10000 (10000
>>    subpartitions × one 32K buffer each). Because of this huge network memory
>>    consumption, it is hard to configure the network memory for large-scale
>>    batch jobs, and sometimes parallelism cannot be increased just because of
>>    insufficient network memory, which leads to a bad user experience.
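A rough sketch of the arithmetic behind points 1 and 3, under assumptions that are not taken from Flink's code: an all-to-all connection, hash-based shuffle writing one file and holding one network buffer per subpartition, sort-merge writing one data file per result partition (index files ignored), and 32 KiB network buffers. The class and method names (`ShuffleCost`, `hashFiles`, `sortMergeFiles`, `hashMinBufferBytes`) are hypothetical.

```java
// Back-of-the-envelope cost model for the two shuffle layouts.
// Assumptions (hypothetical, not Flink's code): all-to-all connection;
// hash-based shuffle writes one file and holds one buffer per subpartition;
// sort-merge writes one data file per result partition (index files ignored);
// network buffers are 32 KiB.
public class ShuffleCost {
    static final long BUFFER_BYTES = 32L * 1024; // assumed network buffer size

    // Hash-based: each producer writes one file per downstream consumer.
    static long hashFiles(long producers, long consumers) {
        return producers * consumers;
    }

    // Sort-merge: each producer writes a single data file for its result partition.
    static long sortMergeFiles(long producers) {
        return producers;
    }

    // Hash-based lower bound on network memory for ONE result partition:
    // at least one buffer per subpartition.
    static long hashMinBufferBytes(long subpartitions) {
        return subpartitions * BUFFER_BYTES;
    }

    public static void main(String[] args) {
        long parallelism = 10_000;
        System.out.println("hash-based files: " + hashFiles(parallelism, parallelism)); // 100000000
        System.out.println("sort-merge files: " + sortMergeFiles(parallelism));         // 10000
        System.out.println("min KiB per partition: "
                + hashMinBufferBytes(parallelism) / 1024);                              // 320000
    }
}
```

Under these assumptions, parallelism 10000 means about 10^8 files for the hash-based layout versus 10^4 for sort-merge, and 320,000 KiB (the "320M" above) of buffers per hash-based result partition.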
>>
>> To improve Flink’s ability to run large-scale batch jobs, we would like
>> to introduce a sort-merge based blocking shuffle to Flink [1]. Any
>> feedback is appreciated.
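A simplified, hypothetical sketch of the sort-merge layout described in this proposal: records bound for all subpartitions are buffered, sorted by subpartition index, and written into a single data "file" (a byte array here) together with an (offset, length) index per subpartition. It assumes a single in-memory sort and a single spill, whereas a real implementation may spill several sorted regions; `SortMergeSketch` and its methods are illustrative names, not Flink's API. It requires Java 16+ for `record`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified sort-merge writer (not Flink's actual code):
// one sort, one spill, one data "file" plus a per-subpartition index.
public class SortMergeSketch {
    record Rec(int subpartition, byte[] payload) {}

    private final List<Rec> sortBuffer = new ArrayList<>();
    private final ByteArrayOutputStream dataFile = new ByteArrayOutputStream();
    private final long[] offsets; // start of each subpartition's region in dataFile
    private final long[] lengths; // bytes written for each subpartition

    SortMergeSketch(int numSubpartitions) {
        offsets = new long[numSubpartitions];
        lengths = new long[numSubpartitions];
    }

    // Producer side: a record destined for one subpartition (i.e. one consumer).
    void emit(int subpartition, String value) {
        sortBuffer.add(new Rec(subpartition, value.getBytes(StandardCharsets.UTF_8)));
    }

    // Sort by subpartition (List.sort is stable, so per-subpartition order is kept),
    // then write each subpartition's data contiguously into the single file.
    void finish() {
        sortBuffer.sort(Comparator.comparingInt(Rec::subpartition));
        int current = -1;
        for (Rec r : sortBuffer) {
            if (r.subpartition() != current) {
                current = r.subpartition();
                offsets[current] = dataFile.size();
            }
            dataFile.write(r.payload(), 0, r.payload().length);
            lengths[current] += r.payload().length;
        }
        sortBuffer.clear();
    }

    // Consumer side: one sequential read of a contiguous region, located via the index.
    String read(int subpartition) {
        byte[] file = dataFile.toByteArray();
        return new String(file, (int) offsets[subpartition], (int) lengths[subpartition],
                StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        SortMergeSketch partition = new SortMergeSketch(3);
        partition.emit(2, "c");
        partition.emit(0, "a");
        partition.emit(2, "d");
        partition.emit(1, "b");
        partition.finish();
        System.out.println(partition.read(2)); // prints cd
    }
}
```

Because each subpartition's bytes end up contiguous in one file, a consumer reads one region sequentially instead of opening its own small file, which is the sequential-IO benefit described in point 2.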
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>>
>> Best,
>> Yingjie
>>
>

-- 
Best, Jingsong Lee
