[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16982271#comment-16982271 ]
Yingjie Cao commented on FLINK-14845:
-------------------------------------

> Where are you going to compress data into?

Each compressor instance holds a slice of unpooled intermediate buffer, and at least two compression methods will be implemented. The first compresses the given input Buffer into the intermediate buffer, wraps that buffer as a Buffer instance, and returns it to the caller. Because the intermediate buffer is reused, the caller must guarantee that the returned Buffer is ready to be recycled before the next buffer is compressed. The second method compresses the given input Buffer into the intermediate buffer, then copies the compressed data back into the input Buffer and returns it. The decompressor works similarly: it can either decompress the given Buffer into a reusable intermediate buffer and return that, or copy the decompressed data back into the input Buffer. If needed, we could also implement a util method that takes two Buffers and compresses/decompresses data from one into the other.

Compressor and decompressor instances live at the InputGate and ResultPartition level and are shared between subpartitions, so the memory overhead would be 32k/64k per InputGate or ResultPartition. Currently, BlockingSubpartition writes a Buffer out and recycles it directly, so we can compress the Buffer into the reusable intermediate buffer, which avoids one copy. For PipelinedSubpartition, the Buffer is not recycled directly, so we can copy the compressed data back into the sliced Buffer at the original position and slice a new Buffer. (Note: if the data size grows after compression, compression is given up and the data is kept uncompressed.) For InputGate, the previous Buffer is currently guaranteed to be recycled before the next buffer is fetched, so we can decompress the compressed Buffer into the decompressor's reusable intermediate buffer.

> What compression algorithm would you like to use?
A general-purpose compression library that depends on an Apache-compatible third-party library has already been implemented in the table module. I think that compression library can be reused by both runtime and table: we only need to copy the compression util from table to runtime and make some minor changes accordingly.

> Will the compression be configurable?

Currently, the compression util in table only supports the LZ4 algorithm, and we can use that by default. I think we should make it pluggable: users can implement a factory for creating compressor and decompressor instances. At least two config options should be added, one to enable/disable compression and another for the compression codec (factory).

> What's the timeline of this feature?

I think Flink users can benefit from this feature, especially batch users, so it would be better to make it available in 1.10 if time allows (not a problem for me). There is another consideration as well: we want users to be able to reproduce the TPC-DS benchmark results using version 1.10.

> Introduce data compression to blocking shuffle.
> -----------------------------------------------
>
> Key: FLINK-14845
> URL: https://issues.apache.org/jira/browse/FLINK-14845
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Network
> Reporter: Yingjie Cao
> Assignee: Yingjie Cao
> Priority: Major
>
> Currently, the blocking shuffle writer writes raw output data to disk without compression. For IO-bound scenarios, this can be optimized by compressing the output data. It is better to introduce a compression mechanism and offer a config option so that users can decide whether to compress the shuffle data. We have actually implemented compression in our internal Flink version; here are some key points:
> 1. Where to compress/decompress?
> Compress at upstream and decompress at downstream.
> 2. Which thread does the compress/decompress?
> Task threads do the compress/decompress.
> 3.
Data compression granularity?
> Per buffer.
> 4. How to handle the case where the data size becomes even bigger after compression?
> Give up compression in this case and introduce an extra flag to identify whether the data was compressed, i.e. the output may be a mixture of compressed and uncompressed data.
>
> We'd like to introduce blocking shuffle data compression to Flink if there is interest.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
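A minimal sketch of the per-buffer scheme from points 3 and 4 above: each buffer is compressed independently, and when the result would be as large as or larger than the input, compression is given up and the raw bytes are kept behind a one-byte flag, so the stream may mix compressed and uncompressed buffers. This uses the JDK's Deflater purely as a stand-in codec (the proposal defaults to LZ4 via a pluggable factory), and the class and method names here are illustrative, not Flink's actual API.

```java
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/**
 * Per-buffer compression with a one-byte header flag, so compressed and
 * uncompressed buffers can coexist in one stream. Hypothetical sketch:
 * Deflater stands in for the LZ4 codec the proposal would use.
 */
public class BufferCompression {

    private static final byte UNCOMPRESSED = 0;
    private static final byte COMPRESSED = 1;

    /** Compresses data; falls back to a plain copy if compression does not help. */
    public static byte[] compress(byte[] data) {
        Deflater deflater = new Deflater();
        deflater.setInput(data);
        deflater.finish();
        // Intermediate buffer; one spare byte lets us detect "grew after compression".
        byte[] tmp = new byte[data.length + 1];
        int written = deflater.deflate(tmp);
        boolean grew = written >= data.length || !deflater.finished();
        deflater.end();

        if (grew) {
            // Give up compression: emit flag byte + the original bytes unchanged.
            byte[] out = new byte[data.length + 1];
            out[0] = UNCOMPRESSED;
            System.arraycopy(data, 0, out, 1, data.length);
            return out;
        }
        byte[] out = new byte[written + 1];
        out[0] = COMPRESSED;
        System.arraycopy(tmp, 0, out, 1, written);
        return out;
    }

    /** Restores the original bytes, consulting the flag byte. */
    public static byte[] decompress(byte[] framed, int originalLength) throws DataFormatException {
        if (framed[0] == UNCOMPRESSED) {
            return Arrays.copyOfRange(framed, 1, framed.length);
        }
        Inflater inflater = new Inflater();
        inflater.setInput(framed, 1, framed.length - 1);
        byte[] out = new byte[originalLength];
        inflater.inflate(out);
        inflater.end();
        return out;
    }

    public static void main(String[] args) throws Exception {
        byte[] repetitive = new byte[4096]; // all zeros, compresses well
        byte[] framed = compress(repetitive);
        System.out.println("framed " + repetitive.length + " bytes into " + framed.length);
        byte[] restored = decompress(framed, repetitive.length);
        System.out.println("round trip ok: " + Arrays.equals(repetitive, restored));
    }
}
```

In the actual proposal the intermediate buffer would be owned by the compressor instance and reused across calls (hence the recycling contract described in the comment), rather than allocated per call as in this simplified sketch.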