[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-12-08 Thread zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991107#comment-16991107
 ] 

zhijiang commented on FLINK-14845:
--

[~pnowojski] Yes, exactly. We will verify the effect via benchmarks for the 
blocking partition with compression as follow-up work. I created the 
respective tickets: FLINK-15070 and FLINK-15069.

> Introduce data compression to blocking shuffle.
> ---
>
> Key: FLINK-14845
> URL: https://issues.apache.org/jira/browse/FLINK-14845
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Assignee: Yingjie Cao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, the blocking shuffle writer writes raw output data to disk without 
> compression. For IO-bound scenarios, this can be optimized by compressing the 
> output data. It is better to introduce a compression mechanism and offer users 
> a config option to let them decide whether to compress the shuffle data. We 
> have already implemented compression in our internal Flink version; here are 
> the key points:
> 1. Where to compress/decompress?
> Compress at the upstream side and decompress at the downstream side.
> 2. Which thread does the compression/decompression?
> The task threads do the compression/decompression.
> 3. Data compression granularity.
> Per buffer.
> 4. How to handle the case where the data becomes even bigger after compression?
> Give up compression in this case and introduce an extra flag identifying whether 
> the data was compressed, that is, the output may be a mixture of compressed 
> and uncompressed data.
>  
> We'd like to contribute blocking shuffle data compression to Flink if there 
> is interest.
>  
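
To make point 4 above concrete, here is a minimal, self-contained sketch of per-buffer LZ4 compression with the give-up-if-larger fallback, based on the org.lz4:lz4-java library mentioned elsewhere in this thread. All class and method names are illustrative only and do not correspond to the actual Flink code or the PR.

{code:java}
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;

/** Illustrative per-buffer compression with a flag marking whether compression was applied. */
public final class PerBufferCompressionSketch {

    /** Holder for the (possibly) compressed bytes plus the flag from point 4. */
    static final class MaybeCompressed {
        final byte[] data;
        final boolean compressed;

        MaybeCompressed(byte[] data, boolean compressed) {
            this.data = data;
            this.compressed = compressed;
        }
    }

    private static final LZ4Factory FACTORY = LZ4Factory.fastestInstance();

    /** Compresses one buffer; gives up and returns the original bytes if the result is not smaller. */
    static MaybeCompressed compress(byte[] buffer) {
        LZ4Compressor compressor = FACTORY.fastCompressor();
        byte[] tmp = new byte[compressor.maxCompressedLength(buffer.length)];
        int len = compressor.compress(buffer, 0, buffer.length, tmp, 0, tmp.length);
        if (len >= buffer.length) {
            return new MaybeCompressed(buffer, false); // compression did not pay off
        }
        byte[] out = new byte[len];
        System.arraycopy(tmp, 0, out, 0, len);
        return new MaybeCompressed(out, true);
    }

    /** Decompresses only if the flag says the bytes were compressed. */
    static byte[] decompress(MaybeCompressed block, int originalLength) {
        if (!block.compressed) {
            return block.data;
        }
        LZ4SafeDecompressor decompressor = FACTORY.safeDecompressor();
        byte[] restored = new byte[originalLength];
        decompressor.decompress(block.data, 0, block.data.length, restored, 0);
        return restored;
    }
}
{code}

On the read path, the flag tells the reader whether the bytes need decompression at all, which is what allows compressed and uncompressed buffers to be mixed within one result partition.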





[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-12-08 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16990786#comment-16990786
 ] 

Piotr Nowojski commented on FLINK-14845:


Thanks [~kevin.cyj] and [~zjwang] for your efforts!

Can we follow this up with some network benchmarks using the compression? It 
would be especially interesting to see the performance improvement for bounded 
partitions when compression is used.



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-12-07 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16990697#comment-16990697
 ] 

Yingjie Cao commented on FLINK-14845:
-

Fixed via 66d4d7da2d8b717f420509d9785fad0880562f10 on master.



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-12-01 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16985873#comment-16985873
 ] 

Yingjie Cao commented on FLINK-14845:
-

Based on the above discussion, I have put together an implementation and opened 
a PR for this issue: [https://github.com/apache/flink/pull/10375]. Any feedback 
is appreciated.



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-27 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16984097#comment-16984097
 ] 

Yingjie Cao commented on FLINK-14845:
-

> So I think it's fine not to make it pluggable in the first version. However, 
> this should probably be changed and moved to a plugin sooner or later.

Totally agree.

> In that case we would need to document this expected behaviour (previously 
> returned buffers should be recycled immediately before asking for new ones) 
> somewhere in {{InputGate#getNext}} and {{InputChannel#getNextBuffer}}.

That's necessary and I will do that.



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-27 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16983552#comment-16983552
 ] 

Piotr Nowojski commented on FLINK-14845:


Regarding the org.lz4:lz4-java dependency: it is very lightweight and I do not 
see any transitive dependencies, which is good. The only potential issue might 
arise if the user's code depends on the same library in a different version. 
That should be resolved by the user-first class loader, so I think it's fine 
not to make it pluggable in the first version. However, this should probably 
be changed and moved to a plugin sooner or later.

{quote}We have to consider how to handle the condition when there are no free 
buffers in the buffer pool, and there may be a deadlock problem if we choose 
to wait in a blocking way.{quote}
I guess you are right [~kevin.cyj]. We could safely accumulate and not recycle 
multiple buffers only if a reader were polling the buffers from the 
{{InputGate}} in a non-blocking way and did not depend on more buffers 
arriving before recycling them. It's hard for me to imagine a use case with 
such a contract.

In that case we would need to document this expected behaviour (previously 
returned buffers should be recycled immediately before asking for new ones) 
somewhere in {{InputGate#getNext}} and {{InputChannel#getNextBuffer}}.
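
A rough sketch of how that contract could be worded on {{InputGate#getNext}}; the signature shown and the exact phrasing are assumptions for illustration, not the documentation that was actually added.

{code:java}
import java.io.IOException;
import java.util.Optional;

public interface InputGateContractSketch {

    /**
     * Returns the next buffer or event from this gate.
     *
     * <p>Illustrative contract note: the buffer returned by the previous call must be
     * recycled before this method is called again, because the decompressor reuses a
     * single intermediate buffer per input gate; holding on to several returned buffers
     * could otherwise exhaust the network buffer pool.
     */
    Optional<BufferOrEvent> getNext() throws IOException, InterruptedException;

    /** Placeholder standing in for Flink's BufferOrEvent class, to keep the sketch self-contained. */
    interface BufferOrEvent {}
}
{code}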



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-27 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16983338#comment-16983338
 ] 

Yingjie Cao commented on FLINK-14845:
-

> Would it be more complicated to request a buffer from the network pool?

We have to consider how to handle the condition when there are no free buffers 
in the buffer pool, and there may be a deadlock problem if we choose to wait in 
a blocking way. In my opinion, we should guarantee that the upper layer of the 
network stack does not hold more than one buffer (the minimum number of 
exclusive buffers per channel) to avoid deadlock. Consider an extreme situation 
where the upper layer of the network stack always requests buffers but never 
recycles them: that is doomed to exhaust the network buffers and thus cause a 
deadlock. And how would the upper layer of the network stack know how many 
buffers it may request? I think restricting the number to only one buffer is 
reasonable, and if the upper layer wants to cache data for some reason, it 
should copy the data to a new place (maybe a buffer from the MemoryManager or 
another unpooled buffer) and recycle the network buffer back to the network 
stack.

> Also, could you list the dependencies that you would like to add to the 
> runtime for the default compression (if any)? 

Currently, only one dependency (org.lz4:lz4-java) is added. Do you think it is 
acceptable if we don't make it pluggable in this version? If so, we only need 
to add one config option to enable/disable the compression, since there is 
only one compression algorithm at the moment. We can make it pluggable in the 
future. I am asking this mainly because of the limited time: if we make it 
pluggable in this version, we need to decide on the plugin interface (one that 
can be used by both table and runtime), which may take some time. Currently, I 
use the interface copied from the table module directly in my POC 
implementation.
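
A minimal sketch of what such a single on/off option could look like with Flink's {{ConfigOptions}} builder. The option key, default value, and description below are assumptions for illustration; they are not necessarily what was merged.

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Illustrative option; the key name and default are assumptions, not the merged configuration. */
public class ShuffleCompressionOptionsSketch {

    public static final ConfigOption<Boolean> BLOCKING_SHUFFLE_COMPRESSION_ENABLED =
            ConfigOptions.key("taskmanager.network.blocking-shuffle.compression.enabled")
                    .defaultValue(false)
                    .withDescription(
                            "Whether to compress the data written by the blocking shuffle. "
                                    + "Only one codec (LZ4) would be supported in the first version.");
}
{code}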

> I was mainly thinking if you want to expose to the user some compression 
> tweaking parameters (affecting compression speed/ratio). I would prefer to 
> avoid that, at least in the first version.

I totally agree with you; we should avoid adding too many config options in 
the first version. We may add some in the future if there is demand from 
users.

BTW, do you have any other concerns? [~pnowojski]



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-26 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16982446#comment-16982446
 ] 

Piotr Nowojski commented on FLINK-14845:


For the output, writing back the compressed bytes in place of the uncompressed 
ones would be a neat trick, as long as it doesn't complicate the code too much.

{quote}For InputGate, currently, the previous Buffer is guaranteed to be 
recycled before getting next buffer, so we can decompress the compressed Buffer 
to the reusable intermediate buffer in decompressor{quote}
I'm not sure that it is a good idea to rely on such behaviour. I think there 
was no explicit contract that only a single buffer can be polled from an 
InputGate/InputChannel at a time, or is there? Would it be more complicated to 
request a buffer from the network pool?
{quote}support LZ4 compression algorithm and we can use that by default.{quote}
+1 for LZ4 as the default.
{quote}I think we should make it pluggable. The users can implement a factory 
for compressor and decompressor instance creation{quote}
That might be a follow-up story. And if we want to make it pluggable, keep in 
mind that it should be using the plugin class loader ({{PluginUtils}}, 
{{PluginManager}}). Also, could you list the dependencies that you would like 
to add to the runtime for the default compression (if any)? We might need to 
pluginize the default compressor as well.
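
If the codec ever moves behind the plugin mechanism, the factory could look roughly like the sketch below. Every name in it is hypothetical, chosen only to illustrate the "factory for compressor and decompressor instance creation" idea quoted above; it is not an existing Flink API.

{code:java}
/** Hypothetical plugin-facing factory; all names here are illustrative, not an existing Flink API. */
public interface ShuffleCompressionCodecFactory {

    /** Compresses one block of bytes; mirrors the per-buffer granularity from the issue description. */
    interface BlockCompressor {
        int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff);
    }

    /** Decompresses one block of bytes back to its original length. */
    interface BlockDecompressor {
        int decompress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff);
    }

    BlockCompressor createCompressor();

    BlockDecompressor createDecompressor();

    /** Name used to pick the codec via configuration, e.g. "lz4". */
    String codecName();
}
{code}
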
{quote}At least two config options should be added, one is for compression 
enable/disable and another is for compression codec (factory).{quote}
I was mainly thinking if you want to expose to the user some compression 
tweaking parameters (affecting compression speed/ratio). I would prefer to 
avoid that, at least in the first version.
{quote}I think users of Flink can benefit from this feature, especially the 
batch users, so it better to make it available for 1.10 {quote}
This might be tight, but we can try to do it.



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-26 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16982271#comment-16982271
 ] 

Yingjie Cao commented on FLINK-14845:
-

> Where are you going to compress data into?

Each compressor instance contains a slice of unpooled intermediate buffer, and 
at least two compression methods will be implemented. One compresses the given 
input Buffer into the intermediate buffer, wraps that as a Buffer instance and 
returns it to the caller; the intermediate buffer is reused, so the caller must 
guarantee that the returned Buffer is ready to be recycled before the next 
buffer is compressed. The other method compresses the given input Buffer into 
the intermediate buffer, then copies the compressed data back into the input 
Buffer and returns it.

The decompressor works similarly: it can either decompress the given Buffer 
into a reusable intermediate buffer and return that, or copy the decompressed 
data back into the input Buffer.

We could implement a new util method which takes two Buffers and 
compresses/decompresses data from one to the other, if needed.

Compressor and decompressor instances are at the InputGate and ResultPartition 
level and are shared between subpartitions, so the memory overhead would be 
32 KB/64 KB per InputGate or ResultPartition.

Currently, the BlockingSubpartition writes the Buffer out and recycles it 
directly, so we can choose to compress the Buffer into the reusable 
intermediate buffer, which avoids one copy. For PipelinedSubpartition, the 
Buffer is not recycled directly, so we can copy the compressed data back into 
the sliced Buffer, right at the original position, and slice a new Buffer. 
(Note that if the data size grows after compression, the compression is given 
up and the data is kept uncompressed.) For the InputGate, the previous Buffer 
is currently guaranteed to be recycled before the next buffer is fetched, so 
we can decompress the compressed Buffer into the decompressor's reusable 
intermediate buffer.
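
A hedged sketch of the two compression paths described above (compress into a reusable intermediate buffer and hand that out, or compress and copy the result back into the original buffer), using plain byte arrays instead of Flink's Buffer class. The names and the sizing policy are illustrative assumptions, not the implementation from the PR.

{code:java}
import java.nio.ByteBuffer;

import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;

/** Illustrative compressor with one reusable intermediate buffer per instance. */
public final class IntermediateBufferCompressorSketch {

    private final LZ4Compressor lz4 = LZ4Factory.fastestInstance().fastCompressor();

    // One instance per ResultPartition/InputGate, shared between subpartitions, so the extra
    // memory is a single intermediate buffer (roughly the 32 KB/64 KB overhead mentioned above).
    // Sized for the LZ4 worst case so compress() never overflows.
    private final byte[] intermediate;

    public IntermediateBufferCompressorSketch(int maxBufferSize) {
        this.intermediate = new byte[lz4.maxCompressedLength(maxBufferSize)];
    }

    /**
     * Path 1 (blocking partition): compress into the intermediate buffer and return a view of it.
     * The caller must be done with the returned view before the next call, because the underlying
     * array is reused; the give-up-if-larger check is omitted here for brevity.
     */
    public ByteBuffer compressToIntermediateBuffer(byte[] input, int length) {
        int compressedLength = lz4.compress(input, 0, length, intermediate, 0, intermediate.length);
        return ByteBuffer.wrap(intermediate, 0, compressedLength);
    }

    /**
     * Path 2 (pipelined partition): compress into the intermediate buffer, then copy the result
     * back into the original array. Returns the compressed length, or the original length if
     * compression did not shrink the data (the caller keeps the buffer uncompressed in that case).
     */
    public int compressToOriginalBuffer(byte[] input, int length) {
        int compressedLength = lz4.compress(input, 0, length, intermediate, 0, intermediate.length);
        if (compressedLength >= length) {
            return length; // give up: the data stays uncompressed, in place
        }
        System.arraycopy(intermediate, 0, input, 0, compressedLength);
        return compressedLength;
    }
}
{code}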

> What compression algorithm would you like to use? 

A general-purpose compression utility which depends on an Apache-compatible 
third-party library has already been implemented in the table module. I think 
that compression utility can be reused by both runtime and table. We only need 
to copy it from table to runtime and make some minor changes accordingly.

> Will the compression be configurable?

Currently, the compression util in table only supports the LZ4 compression 
algorithm, and we can use that by default. I think we should make it 
pluggable: users could implement a factory for compressor and decompressor 
instance creation. At least two config options should be added, one to 
enable/disable compression and another for the compression codec (factory).

> What's the timeline of this feature?

I think users of Flink can benefit from this feature, especially batch users, 
so it is better to make it available in 1.10 if time allows (not a problem for 
me). Besides, there is another consideration: we want users to be able to 
reproduce the TPC-DS benchmark results using version 1.10.



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-23 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16980729#comment-16980729
 ] 

Piotr Nowojski commented on FLINK-14845:


Before going any further, I have a couple of questions.

Where are you going to compress the data into? Some unpooled memory? Would you 
keep a reference count on the buffer holding the uncompressed bytes for as 
long as we are handling the compressed memory? If not, compression could 
double the amount of buffered in-flight data.

What compression algorithm would you like to use? Some library? Will we need to 
add a new dependency? Will its license be compatible with Apache?

Will the compression be configurable? Can the user select between multiple 
compression algorithms and compression parameters? Do we want to let users 
provide their own compression, in other words, make the compressor pluggable?



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-21 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979871#comment-16979871
 ] 

Yingjie Cao commented on FLINK-14845:
-

[~pnowojski] I totally agree with you. I will work on the issue and post 
updates if there is any problem or progress.



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-21 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979249#comment-16979249
 ] 

Piotr Nowojski commented on FLINK-14845:


[~kevin.cyj], I think I would prefer the 3rd option, but I think you are 
right: the potential implementation cost of that option might not be worth it 
for now.

I guess the difference between 2. and 4. in terms of implementation is very 
small? We could start with 4. and re-evaluate 3. if we ever get such a request 
from a user.

Side note, [~AHeise] made me aware that we might find this compression useful 
also for spilling the buffers for unaligned checkpoints. 

Also, just for the record: I was thinking about another solution, 
compressing/decompressing the data in the Netty handlers. For streaming that 
would solve all of the problems with network flushes and small records/buffers 
(Netty would be compressing the multiplexed channels), but again that wouldn't 
be very helpful for the {{BoundedBlockingSubpartition}}. For now we can stick 
with the common compression code path shared between streaming/batch, and if 
this ever becomes an issue, we can revisit.



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-20 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979002#comment-16979002
 ] 

Yingjie Cao commented on FLINK-14845:
-

[~pnowojski], thanks for proposing these options.
 # According to [~lzljs3620320]'s comment, there seems to be a lot of work to 
do if we add compression to the table/sql layer.
 # Currently, my POC implementation adopts this option, though as you have 
pointed out, this could make the system less complete.
 # As far as I know, most compression algorithms act on a window of data, for 
example 32 KB or 64 KB. If we want to implement a continuous compression 
algorithm which takes in records and gives out compressed records immediately 
(not producing the compressed record immediately may affect latency and 
invalidate the flush mechanism), then we must consider how to update the meta 
information (the 'dict' for decompression) and let the downstream know how to 
decompress. As you have pointed out, that is complicated.
 # If we let the compression happen inside BufferConsumer#build, then for 
Pipelined mode the netty threads do the compression and for Blocking mode the 
task thread does the compression. If we don't care which thread does the 
compression (not a problem for me), it's a good choice and can work for both 
Pipelined and Blocking mode.

From my point of view, I prefer the fourth option. What do you think? 
[~pnowojski]



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-20 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978343#comment-16978343
 ] 

Jingsong Lee commented on FLINK-14845:
--

> doesn't Blink work on batches of records? Couldn't the (columnar?) 
> compression be performed there? Either via some mapping operator or on record 
> serialisation/deserialisation layer?

Currently, Blink works on BaseRow, which means it works per record. In the 
table/sql layer we cannot select the sub result partition, so the 
row->ColumnarBatch conversion and compression would have to happen after the 
sub-partition is selected. So we would have to provide some special batch 
serialization and a related compression interface to support columnar batches 
and their compression in the table/sql layer.



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-20 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978325#comment-16978325
 ] 

Piotr Nowojski commented on FLINK-14845:


Re, [~lzljs3620320]: doesn't Blink work on batches of records? Couldn't the 
(columnar?) compression be performed there? Either via some mapping operator or 
on record serialisation/deserialisation layer?

 
[~kevin.cyj], thanks for pointing this out. I missed that 
{{ResultSubpartitionView#getNextBuffer}} happens too late for 
{{BoundedBlockingSubpartition}} - after the data has already been written to 
disk. In that case I can see three options:
# let the compression happen somewhere in 
{{BoundedBlockingSubpartition#writeAndCloseBufferConsumer}}
# use some streaming/continuous compression algorithm that would allow the 
compressed stream of bytes to be chopped off (and decompressed) at any point 
in time
# let the compression happen inside {{BufferConsumer#build}}?

1. Would work just for {{BoundedBlockingSubpartition}}.
2. Maybe a bit more complicated, but works for both pipelined and bounded 
sub-partitions and also solves the problem of frequent flushing with few 
records handled in between?
3. ?




[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-20 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978297#comment-16978297
 ] 

Chesnay Schepler commented on FLINK-14845:
--

[~kevin.cyj] Cluster partitions may also be read by another cluster; as such 
the codec information must be contained somewhere in the 
ResultPartitionDeploymentDescriptor.

Alternatively, to simplify things for the time being, we could also say that 
cluster partitions must not be compressed.



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-20 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978288#comment-16978288
 ] 

Yingjie Cao commented on FLINK-14845:
-

[~chesnay] One choice may be to make the compression codec cluster-wide so it 
cannot be configured per job. Another choice is to store the information in 
the partition metadata and ship it with the deployment info when deploying a 
task. What do you think?



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-20 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978284#comment-16978284
 ] 

Yingjie Cao commented on FLINK-14845:
-

[~pnowojski] You are right, taking both the Blocking and Pipelined scenarios 
into consideration would make the system more complete. Do you think 
compressing only the whole buffer using the task thread and leaving the 
flushed sliced buffer uncompressed is acceptable for Pipelined mode?

Currently, the blocking subpartition does not interact with the netty thread 
and the task thread writes the Buffers out directly. If we choose to compress 
with the netty thread, we have to consider the interaction between the netty 
thread and the task thread.



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-20 Thread Chesnay Schepler (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978268#comment-16978268
 ] 

Chesnay Schepler commented on FLINK-14845:
--

How would this work with cluster partitions? (FLIP-67; partitions being kept 
around after a job finishes, potentially being read by other jobs) How would 
the other jobs know what compression was used?



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-20 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978257#comment-16978257
 ] 

Jingsong Lee commented on FLINK-14845:
--

[~pnowojski] put forward a good thought for the future. I have tested it: 
columnar compression can achieve a 10x compression ratio, whereas LZ4 achieves 
only about 2x. But it also brings more conversion overhead, and it requires 
the network layer to make significant changes. Maybe it is another story, 
since it needs to do compression on structured records while normal 
compression just works on binary data. I'm very glad to see it being 
considered.



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-20 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978243#comment-16978243
 ] 

Piotr Nowojski commented on FLINK-14845:


+1 for the feature. It could also be useful for {{PipelinedSubpartition}} in 
some rare scenarios of network-bound clusters.

Compressing the data in the Task thread in the presence of an 
{{OutputFlusher}} would be very difficult to do, maybe impossible without 
adding extra synchronisation. That's because currently a buffer 
({{BufferConsumer}}) can be handed to netty for consumption while more records 
are still being appended to it from the task thread. To deal with that, we 
would have to compress data only per record, or add extra synchronisation 
between the Netty and Task threads.

I think a better idea might be to do the compression on the boundary of 
{{ResultSubpartition}} and netty (expand 
{{CreditBasedSequenceNumberingViewReader}}? create a new alternative 
{{NetworkSequenceViewReader}}?), and perform the compression in the Netty 
thread. I don't think this should be an issue, as it would be a non-blocking 
operation that is relatively fast (the same order of magnitude as copying the 
memory).

To play devil's advocate here: wouldn't Blink benefit more from a specific, 
columnar compression/decompression compared to a generic 
{{Buffer}}/{{MemorySegment}}-based one? Compressing each column independently 
should give much better compression ratios.



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-20 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978241#comment-16978241
 ] 

Yingjie Cao commented on FLINK-14845:
-

Doing compression and decompression with the task threads may be a better 
choice; after all, the netty threads are shared between jobs and should be 
dedicated to message communication. If we don't touch the Pipelined part and 
only introduce compression to BoundedBlockingSubpartition, can we ignore the 
flushing handover logic for the moment? After all, no flushing happens for a 
blocking subpartition. By the way, if we introduce compression to Pipelined 
mode later, I would prefer to compress only the whole buffer (the task thread 
can do the work) and leave the flushed sliced buffer uncompressed; compressing 
too small a piece of data may not help at all.

[~sewen] What do you think?



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-20 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978203#comment-16978203
 ] 

Stephan Ewen commented on FLINK-14845:
--

For combining compression and low latency, we need to see which thread should 
do the compression, how to not make it interfere with the flushing handover 
logic (which does minimal concurrency), and how to avoid compression for local 
channels.



[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-20 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978184#comment-16978184
 ] 

Yingjie Cao commented on FLINK-14845:
-

[~lzljs3620320] For small data, for example dozens of bytes, that case should 
be common; for a 32 KB Buffer, I guess it should be rare. I added the flag to 
be on the safe side. Besides, I have also considered whether we can reuse the 
compression code for pipelined mode; the low-latency flushing mechanism may 
lead to sending buffers much smaller than 32 KB, which may make the case in 
point 8 more common.

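To make the size intuition concrete, here is a tiny, self-contained experiment 
(illustrative only: java.util.zip.Deflater is used purely as a stand-in codec, 
and the payloads are made up):

{code:java}
import java.util.zip.Deflater;

public class SizeAfterCompression {

    // Compress the input once and report how many bytes came out.
    static int compressedSize(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        byte[] out = new byte[input.length + 64]; // leave room for codec overhead
        int written = deflater.deflate(out);
        deflater.end();
        return written;
    }

    public static void main(String[] args) {
        // A record of a few dozen bytes usually grows: header/trailer overhead
        // dominates, so the "compressed is bigger" case is common here.
        byte[] tiny = "a few dozen bytes of record data".getBytes();

        // A 32K buffer (trivially compressible here) shrinks a lot; for real
        // 32K buffers the "compressed is bigger" case should be rare.
        byte[] large = new byte[32 * 1024];

        System.out.println(tiny.length + " bytes -> " + compressedSize(tiny) + " bytes");
        System.out.println(large.length + " bytes -> " + compressedSize(large) + " bytes");
    }
}
{code}
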
> Introduce data compression to blocking shuffle.
> ---
>
> Key: FLINK-14845
> URL: https://issues.apache.org/jira/browse/FLINK-14845
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
>
> Currently, the blocking shuffle writer writes raw output data to disk without 
> compression. For IO-bound scenarios, this can be optimized by compressing 
> the output data. It is better to introduce a compression mechanism and offer 
> users a config option to let them decide whether to compress the shuffle 
> data. Actually, we have implemented compression in our internal Flink version 
> and here are some key points:
> 1. Where to compress/decompress?
> Compress on the upstream side and decompress on the downstream side.
> 2. Which threads do the compression/decompression?
> Task threads do the compression/decompression.
> 3. Data compression granularity.
> Per buffer.
> 4. How to handle the case where the data size becomes even bigger after compression?
> Give up compression in this case and introduce an extra flag to identify whether 
> the data was compressed; that is, the output may be a mixture of compressed 
> and uncompressed data.
>  
> We'd like to introduce blocking shuffle data compression to Flink if there 
> is interest.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-20 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978175#comment-16978175
 ] 

Jingsong Lee commented on FLINK-14845:
--

Thanks [~kevin.cyj]

A little confused about point 8: are there many cases where the compressed size 
is larger than the original data? If not, can we consider removing the 
compression flag from the buffer? My first thought is that this should be a 
global flag instead of a per-buffer one. (But a per-buffer flag may be harder to 
get wrong.)

> Introduce data compression to blocking shuffle.
> ---
>
> Key: FLINK-14845
> URL: https://issues.apache.org/jira/browse/FLINK-14845
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
>
> Currently, the blocking shuffle writer writes raw output data to disk without 
> compression. For IO-bound scenarios, this can be optimized by compressing 
> the output data. It is better to introduce a compression mechanism and offer 
> users a config option to let them decide whether to compress the shuffle 
> data. Actually, we have implemented compression in our internal Flink version 
> and here are some key points:
> 1. Where to compress/decompress?
> Compress on the upstream side and decompress on the downstream side.
> 2. Which threads do the compression/decompression?
> Task threads do the compression/decompression.
> 3. Data compression granularity.
> Per buffer.
> 4. How to handle the case where the data size becomes even bigger after compression?
> Give up compression in this case and introduce an extra flag to identify whether 
> the data was compressed; that is, the output may be a mixture of compressed 
> and uncompressed data.
>  
> We'd like to introduce blocking shuffle data compression to Flink if there 
> is interest.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-19 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978153#comment-16978153
 ] 

Yingjie Cao commented on FLINK-14845:
-

To verify how much improvement we could gain from compression, I put together a POC 
implementation which does not introduce much abstraction and modifies the existing 
InputGate and ResultPartition directly (the final result shows that the change 
is modest). The main changes are summarized as follows.
 # Copied the block compression utils from flink-table (they should be shared by 
table and runtime).
 # Implemented BufferCompressor and BufferDecompressor utils which take in a 
Buffer and return a Buffer, based on the compression utils from step 1.
 # Added a flag to Buffer to identify whether it is compressed.
 # Each ResultPartition contains a BufferCompressor and each SingleInputGate 
contains a BufferDecompressor.
 # In BoundedBlockingSubpartition, when writing a buffer out, the buffer is 
compressed (via BufferCompressor#compress) if compression is enabled. When the 
buffer is written to disk, the compression flag is also written out.
 # In SingleInputGate, if a received Buffer is compressed (identified by the flag 
introduced in step 3), it is decompressed (via 
BufferDecompressor#decompress).
 # Added a compression flag to BufferResponse to let the Buffer receiver know 
whether the Buffer is compressed (this additional flag could be avoided by adding 
a new message type like CompressedBufferResponse which directly extends 
BufferResponse and implements an isCompressed method that always returns 
true).
 # There is a bad case in which the data size becomes even bigger after 
compression; the BufferCompressor handles that by returning the original 
uncompressed buffer, which means the receiver may receive a mixture of 
compressed and uncompressed Buffers (a minimal sketch of this fallback is shown 
after this list).

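A minimal sketch of the fallback described in point 8 together with the 
per-buffer flag from point 3 (illustrative only: it works on plain byte arrays 
with java.util.zip.Deflater as a stand-in, not on Flink's Buffer type or the 
block compression utils from flink-table, and the class and field names are made 
up):

{code:java}
import java.util.zip.Deflater;

public final class BufferCompressionSketch {

    /** The payload plus a flag saying whether it is actually compressed,
     *  mirroring the per-buffer compression flag. */
    static final class MaybeCompressed {
        final byte[] data;
        final boolean isCompressed;

        MaybeCompressed(byte[] data, boolean isCompressed) {
            this.data = data;
            this.isCompressed = isCompressed;
        }
    }

    /** Try to compress one buffer; if the result would not be smaller,
     *  give up and return the original buffer flagged as uncompressed. */
    static MaybeCompressed compress(byte[] buffer) {
        Deflater deflater = new Deflater();
        deflater.setInput(buffer);
        deflater.finish();

        // Never accept an output that is as large as (or larger than) the input.
        byte[] out = new byte[buffer.length];
        int written = deflater.deflate(out);
        boolean smaller = deflater.finished() && written < buffer.length;
        deflater.end();

        if (!smaller) {
            return new MaybeCompressed(buffer, false); // fallback: raw buffer
        }
        byte[] trimmed = new byte[written];
        System.arraycopy(out, 0, trimmed, 0, written);
        return new MaybeCompressed(trimmed, true);
    }
}
{code}

The isCompressed flag is what keeps the resulting mixed stream of compressed and 
uncompressed buffers decodable on the receiver side.
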
[~sewen] [~ykt836] [~lzljs3620320] What do you think?

> Introduce data compression to blocking shuffle.
> ---
>
> Key: FLINK-14845
> URL: https://issues.apache.org/jira/browse/FLINK-14845
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
>
> Currently, the blocking shuffle writer writes raw output data to disk without 
> compression. For IO-bound scenarios, this can be optimized by compressing 
> the output data. It is better to introduce a compression mechanism and offer 
> users a config option to let them decide whether to compress the shuffle 
> data. Actually, we have implemented compression in our internal Flink version 
> and here are some key points:
> 1. Where to compress/decompress?
> Compress on the upstream side and decompress on the downstream side.
> 2. Which threads do the compression/decompression?
> Task threads do the compression/decompression.
> 3. Data compression granularity.
> Per buffer.
> 4. How to handle the case where the data size becomes even bigger after compression?
> Give up compression in this case and introduce an extra flag to identify whether 
> the data was compressed; that is, the output may be a mixture of compressed 
> and uncompressed data.
>  
> We'd like to introduce blocking shuffle data compression to Flink if there 
> is interest.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-19 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978019#comment-16978019
 ] 

Jingsong Lee commented on FLINK-14845:
--

[~sewen] Sorry for the misunderstanding. What I meant to say is forward, not 
pipelined.

As [~ykt836] said, we currently pass true to 
`StreamGraph.setBlockingConnectionsBetweenChains`, which sets all result 
partition types to `BLOCKING`.

We also pass false to 
`StreamGraph.setAllVerticesInSameSlotSharingGroupByDefault`, because operators 
have separate memory usage.

But your reminder may point to a way: we can let the source and its forward join 
run co-located in the same slot. In this special case, we can use the pipelined 
shuffle type and put them into one slot sharing group (a minimal sketch of the 
two settings above follows).

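A minimal sketch of the two settings mentioned above, using the method names as 
quoted in this comment; how the blink planner actually builds the StreamGraph is 
not shown here, and the wrapper class is made up:

{code:java}
import org.apache.flink.streaming.api.graph.StreamGraph;

final class BatchShuffleSettings {

    // Apply the two planner-side defaults discussed above to a StreamGraph
    // that the caller has already built.
    static void applyBatchDefaults(StreamGraph streamGraph) {
        // All connections between chains become BLOCKING result partitions,
        // which avoids scheduling deadlocks for batch jobs.
        streamGraph.setBlockingConnectionsBetweenChains(true);

        // Do not put all vertices into one slot sharing group by default,
        // because batch operators have separate memory usage.
        streamGraph.setAllVerticesInSameSlotSharingGroupByDefault(false);
    }
}
{code}
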
> Introduce data compression to blocking shuffle.
> ---
>
> Key: FLINK-14845
> URL: https://issues.apache.org/jira/browse/FLINK-14845
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
>
> Currently, the blocking shuffle writer writes raw output data to disk without 
> compression. For IO-bound scenarios, this can be optimized by compressing 
> the output data. It is better to introduce a compression mechanism and offer 
> users a config option to let them decide whether to compress the shuffle 
> data. Actually, we have implemented compression in our internal Flink version 
> and here are some key points:
> 1. Where to compress/decompress?
> Compress on the upstream side and decompress on the downstream side.
> 2. Which threads do the compression/decompression?
> Task threads do the compression/decompression.
> 3. Data compression granularity.
> Per buffer.
> 4. How to handle the case where the data size becomes even bigger after compression?
> Give up compression in this case and introduce an extra flag to identify whether 
> the data was compressed; that is, the output may be a mixture of compressed 
> and uncompressed data.
>  
> We'd like to introduce blocking shuffle data compression to Flink if there 
> is interest.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-19 Thread Kurt Young (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16977982#comment-16977982
 ] 

Kurt Young commented on FLINK-14845:


[~sewen] Currently we set all shuffle types to `BLOCKING` in the blink planner to 
avoid scheduling deadlocks. And even if we set some of the shuffle types to 
`PIPELINE`, e.g. the forward side of a broadcast hash join, we still need to set 
the colocation group for that operator to make the input and the hash join get 
scheduled into one slot. We haven't tried colocation groups before and have no 
idea how they perform. It might be interesting to give them a shot in the near 
future.

> Introduce data compression to blocking shuffle.
> ---
>
> Key: FLINK-14845
> URL: https://issues.apache.org/jira/browse/FLINK-14845
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
>
> Currently, the blocking shuffle writer writes raw output data to disk without 
> compression. For IO-bound scenarios, this can be optimized by compressing 
> the output data. It is better to introduce a compression mechanism and offer 
> users a config option to let them decide whether to compress the shuffle 
> data. Actually, we have implemented compression in our internal Flink version 
> and here are some key points:
> 1. Where to compress/decompress?
> Compress on the upstream side and decompress on the downstream side.
> 2. Which threads do the compression/decompression?
> Task threads do the compression/decompression.
> 3. Data compression granularity.
> Per buffer.
> 4. How to handle the case where the data size becomes even bigger after compression?
> Give up compression in this case and introduce an extra flag to identify whether 
> the data was compressed; that is, the output may be a mixture of compressed 
> and uncompressed data.
>  
> We'd like to introduce blocking shuffle data compression to Flink if there 
> is interest.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-19 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16977693#comment-16977693
 ] 

Stephan Ewen commented on FLINK-14845:
--

[~lzljs3620320] Quick question for clarification: in the example you described, 
the large table could (should) be connected to the join task by a pipelined 
channel that does not spill (in memory, connecting the source and the join, 
which run co-located in the same slot).
I guess that should also be possible for the Blink query engine in 1.10, with 
FLIP-53?

> Introduce data compression to blocking shuffle.
> ---
>
> Key: FLINK-14845
> URL: https://issues.apache.org/jira/browse/FLINK-14845
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
>
> Currently, the blocking shuffle writer writes raw output data to disk without 
> compression. For IO-bound scenarios, this can be optimized by compressing 
> the output data. It is better to introduce a compression mechanism and offer 
> users a config option to let them decide whether to compress the shuffle 
> data. Actually, we have implemented compression in our internal Flink version 
> and here are some key points:
> 1. Where to compress/decompress?
> Compress on the upstream side and decompress on the downstream side.
> 2. Which threads do the compression/decompression?
> Task threads do the compression/decompression.
> 3. Data compression granularity.
> Per buffer.
> 4. How to handle the case where the data size becomes even bigger after compression?
> Give up compression in this case and introduce an extra flag to identify whether 
> the data was compressed; that is, the output may be a mixture of compressed 
> and uncompressed data.
>  
> We'd like to introduce blocking shuffle data compression to Flink if there 
> is interest.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-19 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16977441#comment-16977441
 ] 

Stephan Ewen commented on FLINK-14845:
--

Kurt and I had a quick chat about this.

On the receiver side, having a "decompressing input gate" seems pretty 
straightforward.
On the sender side, it is slightly trickier. We have to check how this interacts 
with the "task to netty handover" for low-latency sender-side flushing, though. 
That code is there even if no low-latency flushing happens.

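For the receiver side, a minimal sketch of what the per-buffer decompression 
boils down to (illustrative only: java.util.zip.Inflater on plain byte arrays as 
a stand-in, and it assumes the original uncompressed size is known):

{code:java}
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

public final class BufferDecompressionSketch {

    /** Decompress one received buffer only if its compression flag is set;
     *  uncompressed buffers of the mixed stream are passed through as-is. */
    static byte[] decompressIfNeeded(byte[] data, boolean isCompressed, int originalSize)
            throws DataFormatException {
        if (!isCompressed) {
            return data; // the sender gave up compression for this buffer
        }
        Inflater inflater = new Inflater();
        inflater.setInput(data);
        byte[] out = new byte[originalSize];
        int written = inflater.inflate(out);
        inflater.end();
        if (written != originalSize) {
            throw new DataFormatException("unexpected decompressed size: " + written);
        }
        return out;
    }
}
{code}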

> Introduce data compression to blocking shuffle.
> ---
>
> Key: FLINK-14845
> URL: https://issues.apache.org/jira/browse/FLINK-14845
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
>
> Currently, the blocking shuffle writer writes raw output data to disk without 
> compression. For IO-bound scenarios, this can be optimized by compressing 
> the output data. It is better to introduce a compression mechanism and offer 
> users a config option to let them decide whether to compress the shuffle 
> data. Actually, we have implemented compression in our internal Flink version 
> and here are some key points:
> 1. Where to compress/decompress?
> Compress on the upstream side and decompress on the downstream side.
> 2. Which threads do the compression/decompression?
> Task threads do the compression/decompression.
> 3. Data compression granularity.
> Per buffer.
> 4. How to handle the case where the data size becomes even bigger after compression?
> Give up compression in this case and introduce an extra flag to identify whether 
> the data was compressed; that is, the output may be a mixture of compressed 
> and uncompressed data.
>  
> We'd like to introduce blocking shuffle data compression to Flink if there 
> is interest.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.

2019-11-18 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16977096#comment-16977096
 ] 

Jingsong Lee commented on FLINK-14845:
--

Big +1 for this feature.

There are a lot of shuffles in current batch scenarios, that is, a lot of 
spilling IO.

For example, in the simplest join scenario, joining a large table with a small 
table can filter out a lot of data. The small table's data volume is very small, 
so we can use a broadcast join, but because there is no chaining for 
TwoInputOperator at present, the large table's data still has to go through the 
shuffle pipeline, which spills data to disk.

Since there is no TwoInputOperatorChain, shuffle is the performance killer of 
the whole job; disk IO is the performance bottleneck in many scenarios.

> Introduce data compression to blocking shuffle.
> ---
>
> Key: FLINK-14845
> URL: https://issues.apache.org/jira/browse/FLINK-14845
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Reporter: Yingjie Cao
>Priority: Major
>
> Currently, the blocking shuffle writer writes raw output data to disk without 
> compression. For IO-bound scenarios, this can be optimized by compressing 
> the output data. It is better to introduce a compression mechanism and offer 
> users a config option to let them decide whether to compress the shuffle 
> data. Actually, we have implemented compression in our internal Flink version 
> and here are some key points:
> 1. Where to compress/decompress?
> Compress on the upstream side and decompress on the downstream side.
> 2. Which threads do the compression/decompression?
> Task threads do the compression/decompression.
> 3. Data compression granularity.
> Per buffer.
> 4. How to handle the case where the data size becomes even bigger after compression?
> Give up compression in this case and introduce an extra flag to identify whether 
> the data was compressed; that is, the output may be a mixture of compressed 
> and uncompressed data.
>  
> We'd like to introduce blocking shuffle data compression to Flink if there 
> is interest.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)