[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16982271#comment-16982271
 ] 

Yingjie Cao commented on FLINK-14845:
-------------------------------------

> Where are you going to compress data into?

Each compressor instance contains a slice of unpooled intermediate buffer, and 
at least two compression methods will be implemented. The first compresses the 
given input Buffer into the intermediate buffer, wraps that buffer as a Buffer 
instance, and returns it to the caller. Because the intermediate buffer is 
reused, the caller must guarantee that the returned Buffer is ready to be 
recycled before the next buffer is compressed. The second method compresses the 
given input buffer into the intermediate buffer, then copies the compressed 
data back into the input Buffer and returns it.
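As a rough illustration of the two compression methods, a minimal sketch — the class and method names are hypothetical, and java.util.zip.Deflater stands in here for the LZ4 codec that would actually be used:

```java
import java.util.zip.Deflater;

// Illustrative sketch only: names are assumptions, and Deflater stands in
// for the LZ4 codec discussed above.
public class BufferCompressor {

    // Reusable unpooled intermediate buffer, shared across calls.
    private final byte[] intermediate;
    private final Deflater deflater = new Deflater();

    public BufferCompressor(int maxBufferSize) {
        // Some headroom in case the "compressed" form is larger than the input.
        this.intermediate = new byte[maxBufferSize * 2];
    }

    /**
     * Compresses the input into the reusable intermediate buffer and returns
     * the compressed length. Since the backing array is reused, the caller
     * must be ready to recycle the result before the next call.
     */
    public int compressToIntermediateBuffer(byte[] input, int length) {
        deflater.reset();
        deflater.setInput(input, 0, length);
        deflater.finish();
        return deflater.deflate(intermediate);
    }

    /**
     * Compresses via the intermediate buffer, then copies the compressed data
     * back into the input buffer so the caller keeps its original Buffer.
     */
    public int compressToOriginalBuffer(byte[] input, int length) {
        int compressedLength = compressToIntermediateBuffer(input, length);
        System.arraycopy(intermediate, 0, input, 0, compressedLength);
        return compressedLength;
    }
}
```

The second method trades one extra copy for not handing out the shared intermediate buffer.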

The decompressor works similarly: it can either decompress the given Buffer 
into a reusable intermediate buffer and return that, or copy the decompressed 
data back into the input Buffer.
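The decompressor side might look like the following — again only a sketch with hypothetical names, with java.util.zip.Inflater standing in for the LZ4 decoder:

```java
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

// Illustrative sketch only: names are assumptions, and Inflater stands in
// for the LZ4 decoder.
public class BufferDecompressor {

    // Reusable intermediate buffer, sized for the largest uncompressed buffer.
    private final byte[] intermediate;
    private final Inflater inflater = new Inflater();

    public BufferDecompressor(int maxBufferSize) {
        this.intermediate = new byte[maxBufferSize];
    }

    /**
     * Decompresses into the reusable intermediate buffer and returns the
     * decompressed length; the previous result must be fully consumed
     * (recycled) before the next call.
     */
    public int decompressToIntermediateBuffer(byte[] compressed, int length) {
        try {
            inflater.reset();
            inflater.setInput(compressed, 0, length);
            return inflater.inflate(intermediate);
        } catch (DataFormatException e) {
            // A real implementation would surface this as an IO error.
            throw new IllegalStateException("Corrupted compressed buffer", e);
        }
    }

    /** Exposes the intermediate buffer holding the last decompressed data. */
    public byte[] intermediateBuffer() {
        return intermediate;
    }
}
```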

If needed, we could implement a new util method which takes two input Buffers 
and compresses/decompresses data from one to the other.

Compressor and decompressor instances live at the InputGate and ResultPartition 
level and are shared between subpartitions, so the memory overhead would be 
32k/64k per InputGate or ResultPartition.

Currently, the BlockingSubpartition writes a Buffer out and recycles it 
directly, so we can compress the Buffer into the reusable intermediate buffer, 
which avoids one copy. For PipelinedSubpartition, the Buffer is not recycled 
directly, so we can copy the compressed data back into the sliced Buffer, right 
at the original position, and slice a new Buffer. (Note that if the data size 
grows after compression, compression is given up and the data is kept 
uncompressed.) For InputGate, the previous Buffer is currently guaranteed to be 
recycled before the next buffer is fetched, so we can decompress the compressed 
Buffer into the decompressor's reusable intermediate buffer.
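The give-up path in the note above could be sketched as follows — a hedged illustration, not the actual Flink code; the flag values and the choice to size the output buffer exactly like the input are assumptions, and Deflater again stands in for LZ4:

```java
import java.util.zip.Deflater;

// Hedged sketch of the "give up if compression does not help" fallback.
public class CompressAttempt {

    public static final byte FLAG_UNCOMPRESSED = 0;
    public static final byte FLAG_COMPRESSED = 1;

    public final byte flag;
    public final byte[] data;   // compressed bytes, or the untouched input
    public final int length;

    private CompressAttempt(byte flag, byte[] data, int length) {
        this.flag = flag;
        this.data = data;
        this.length = length;
    }

    /** Compresses the input, but gives up if the result is not strictly smaller. */
    public static CompressAttempt tryCompress(byte[] input, int length) {
        Deflater deflater = new Deflater();
        deflater.setInput(input, 0, length);
        deflater.finish();
        // No headroom: if the compressed form does not fit, compression grew
        // the data, so keep the original bytes plus an "uncompressed" flag.
        byte[] out = new byte[length];
        int n = deflater.deflate(out);
        boolean shrank = deflater.finished() && n < length;
        deflater.end();
        return shrank
                ? new CompressAttempt(FLAG_COMPRESSED, out, n)
                : new CompressAttempt(FLAG_UNCOMPRESSED, input, length);
    }
}
```

The flag would travel in the buffer header so downstream can tell compressed from uncompressed buffers apart.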

> What compression algorithm would you like to use? 

A general-purpose compression library which depends on an Apache-compatible 
third-party library has already been implemented in the table module. I think 
that compression library can be reused by both runtime and table. We only need 
to copy the compression util from table to runtime and make some minor changes 
accordingly.

> Will the compression be configurable?

Currently, the compression util in table only supports the LZ4 compression 
algorithm, and we can use that by default. I think we should make it pluggable: 
users can implement a factory for compressor and decompressor instance 
creation. At least two config options should be added, one to enable/disable 
compression and another for the compression codec (factory).
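To make the two options concrete, here is a minimal sketch — the option keys and defaults below are purely hypothetical placeholders; the real keys and the ConfigOption plumbing would be decided when the feature is implemented:

```java
import java.util.Properties;

// Hypothetical option keys; a real implementation would use Flink's
// configuration framework rather than raw Properties.
public class CompressionOptions {

    public static final String ENABLED_KEY =
            "taskmanager.network.blocking-shuffle.compression.enabled";
    public static final String CODEC_KEY =
            "taskmanager.network.blocking-shuffle.compression.codec";

    /** Compression stays off unless explicitly enabled. */
    public static boolean isEnabled(Properties conf) {
        return Boolean.parseBoolean(conf.getProperty(ENABLED_KEY, "false"));
    }

    /** LZ4 as the default codec, matching the existing table util. */
    public static String codec(Properties conf) {
        return conf.getProperty(CODEC_KEY, "LZ4");
    }
}
```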

> What's the timeline of this feature?

I think users of Flink, especially batch users, can benefit from this feature, 
so it is better to make it available in 1.10 if time allows (not a problem for 
me). Besides, there is another consideration: we want users to be able to 
reproduce the TPC-DS benchmark results using version 1.10.

> Introduce data compression to blocking shuffle.
> -----------------------------------------------
>
>                 Key: FLINK-14845
>                 URL: https://issues.apache.org/jira/browse/FLINK-14845
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Network
>            Reporter: Yingjie Cao
>            Assignee: Yingjie Cao
>            Priority: Major
>
> Currently, the blocking shuffle writer writes raw output data to disk without 
> compression. For IO-bound scenarios, this can be optimized by compressing the 
> output data. It is better to introduce a compression mechanism and offer 
> users a config option to decide whether to compress the shuffle data. 
> Actually, we have implemented compression in our internal Flink version, and 
> here are some key points:
> 1. Where to compress/decompress?
> Compressing at upstream and decompressing at downstream.
> 2. Which thread do compress/decompress?
> Task threads do compress/decompress.
> 3. Data compression granularity.
> Per buffer.
> 4. How to handle the case where the data size becomes even bigger after 
> compression?
> Give up compression in this case and introduce an extra flag to identify 
> whether the data was compressed; that is, the output may be a mixture of 
> compressed and uncompressed data.
>  
> We'd like to introduce blocking shuffle data compression to Flink if there 
> is interest.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
