[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991107#comment-16991107 ]

zhijiang commented on FLINK-14845:
----------------------------------

[~pnowojski] Yes, exactly. We will verify the effect via benchmarks for the blocking partition with compression as a follow-up. I created the respective tickets: FLINK-15070 and FLINK-15069.

> Introduce data compression to blocking shuffle.
> -----------------------------------------------
>
>          Key: FLINK-14845
>          URL: https://issues.apache.org/jira/browse/FLINK-14845
>      Project: Flink
>   Issue Type: Sub-task
>   Components: Runtime / Network
>     Reporter: Yingjie Cao
>     Assignee: Yingjie Cao
>     Priority: Major
>       Labels: pull-request-available
>      Fix For: 1.10.0
>
>   Time Spent: 10m
>   Remaining Estimate: 0h
>
> Currently, the blocking shuffle writer writes raw output data to disk without compression. For IO-bound scenarios, this can be optimized by compressing the output data. It is better to introduce a compression mechanism and offer users a config option to decide whether to compress the shuffle data. Actually, we have implemented compression in our internal Flink version, and here are some key points:
> 1. Where to compress/decompress? Compress at the upstream and decompress at the downstream.
> 2. Which thread does the compression/decompression? Task threads do the compression/decompression.
> 3. Data compression granularity: per buffer.
> 4. How to handle the case where the data size becomes even bigger after compression? Give up compression in this case and introduce an extra flag to identify whether the data was compressed; that is, the output may be a mixture of compressed and uncompressed data.
>
> We'd like to introduce blocking shuffle data compression to Flink if there is interest.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
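Point 4 of the description (give up compression when the output would grow, and mark each buffer with a flag) can be sketched as below. This is a hypothetical illustration, not Flink's actual implementation: the real code uses LZ4 via org.lz4:lz4-java, while this sketch uses java.util.zip.Deflater only to stay dependency-free, and the class and constant names are made up.

```java
import java.util.zip.Deflater;

// Hypothetical sketch of per-buffer compression with a fall-back flag
// (point 4 above). Deflater stands in for LZ4 to keep this self-contained.
public class BufferCompressionSketch {

    public static final byte UNCOMPRESSED = 0;
    public static final byte COMPRESSED = 1;

    // Returns the buffer content prefixed with a one-byte flag. If
    // compression does not shrink the buffer, the original bytes are kept,
    // so the stream may be a mixture of compressed and uncompressed buffers.
    public static byte[] compressOrGiveUp(byte[] buffer) {
        Deflater deflater = new Deflater();
        deflater.setInput(buffer);
        deflater.finish();
        // Only accept results strictly smaller than the input.
        byte[] scratch = new byte[buffer.length];
        int written = deflater.deflate(scratch);
        boolean shrank = deflater.finished() && written < buffer.length;
        deflater.end();

        byte[] src = shrank ? scratch : buffer;
        int len = shrank ? written : buffer.length;
        byte[] out = new byte[len + 1];
        out[0] = shrank ? COMPRESSED : UNCOMPRESSED;
        System.arraycopy(src, 0, out, 1, len);
        return out;
    }
}
```

The reader only ever inspects the one-byte flag to decide whether to decompress, which is exactly what makes the mixed output safe.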
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16990786#comment-16990786 ]

Piotr Nowojski commented on FLINK-14845:
----------------------------------------

Thanks [~kevin.cyj] and [~zjwang] for your efforts! Can we follow this up with some network benchmarks using the compression? It would be especially interesting to see the performance improvement for bounded partitions when using compression.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16990697#comment-16990697 ]

Yingjie Cao commented on FLINK-14845:
-------------------------------------

Fixed via 66d4d7da2d8b717f420509d9785fad0880562f10 on master.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16985873#comment-16985873 ]

Yingjie Cao commented on FLINK-14845:
-------------------------------------

Based on the above discussions, I have written an implementation and opened a PR for this issue: [https://github.com/apache/flink/pull/10375]. Any feedback is appreciated.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16984097#comment-16984097 ]

Yingjie Cao commented on FLINK-14845:
-------------------------------------

> So I think it's fine to not make it pluggable in the first version. However probably this should be changed and moved to plugin sooner or later.

Totally agree.

> In that case we would need to document this expected behaviour (previously returned buffers should be recycled immediately before asking for new ones) somewhere in the {{InputGate#getNext}} and {{InputChannel#getNextBuffer}}.

That is necessary, and I will do it.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16983552#comment-16983552 ]

Piotr Nowojski commented on FLINK-14845:
----------------------------------------

Regarding the org.lz4:lz4-java dependency: this one is very lightweight, and I do not see any transitive dependencies, which is good. So the only potential issue might arise if a user's code depends on the same library in a different version. This should be resolved by the user-first class loader, so I think it's fine to not make it pluggable in the first version. However, this should probably be changed and moved to a plugin sooner or later.

{quote}We have to consider how to handle the condition when there are no free buffers in the buffer pool, and there may be a deadlock problem if we choose to wait in a blocking way.{quote}

I guess you are right [~kevin.cyj]. We could safely accumulate and not recycle multiple buffers only if a reader were polling the buffers from the {{InputGate}} in a non-blocking way and did not depend on more buffers arriving before recycling them. It's hard for me to imagine a use case with such a contract. In that case we would need to document this expected behaviour (previously returned buffers should be recycled immediately before asking for new ones) somewhere in {{InputGate#getNext}} and {{InputChannel#getNextBuffer}}.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16983338#comment-16983338 ]

Yingjie Cao commented on FLINK-14845:
-------------------------------------

> Would it be more complicated to request a buffer from the network pool?

We have to consider how to handle the condition when there are no free buffers in the buffer pool, and there may be a deadlock problem if we choose to wait in a blocking way. In my opinion, we should guarantee that the upper layer of the network stack never holds more than one buffer (the minimum number of exclusive buffers per channel) to avoid deadlock. Consider an extreme situation: the upper layer of the network stack always requests buffers but never recycles them. That is doomed to exhaust the network buffers and result in deadlock. How, then, does the upper layer know how many buffers it may request? I think restricting it to only one buffer is reasonable; if the upper layer wants to cache data for some reason, it should copy the data to a new place (maybe a buffer from the MemoryManager or another unpooled buffer) and recycle the network buffer back to the network stack.

> Also, could you list the dependencies that you would like to add to the runtime for the default compression (if any)?

Currently, only one dependency (org.lz4:lz4-java) is added. Do you think it is acceptable if we don't make it pluggable in this version? If so, we only need to add one config option to enable/disable the compression, since there is only one compression algorithm at the moment. We can make it pluggable in the future. I am asking mainly because of the limited time: if we make it pluggable now, we would need to decide on the plugin interface (usable by both table and runtime), which may take some time. Currently, I use the interface copied from the table module directly in my POC implementation.

> I was mainly thinking if you want to expose to the user some compression tweaking parameters (affecting compression speed/ratio). I would prefer to avoid that, at least in the first version.

I totally agree; we should avoid adding too many config options in the first version. We may add some in the future if there is demand from users. BTW, do you have any other concerns? [~pnowojski]
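The "hold at most one pooled buffer" rule argued for above can be illustrated with a toy sketch (the pool and method names here are hypothetical, not Flink's actual buffer-pool API): the consumer copies anything it wants to keep into unpooled memory and recycles the pooled buffer before requesting the next one, so even a pool of size one never runs dry.

```java
import java.util.ArrayDeque;
import java.util.Arrays;

// Hypothetical illustration of the one-buffer rule discussed above;
// this is NOT Flink's actual buffer-pool API.
public class OneBufferRule {

    // A toy fixed-size pool: requesting from an empty pool returns null
    // (a blocking wait at that point is what could deadlock).
    static class Pool {
        private final ArrayDeque<byte[]> free = new ArrayDeque<>();
        Pool(int buffers, int size) {
            for (int i = 0; i < buffers; i++) {
                free.add(new byte[size]);
            }
        }
        byte[] request() { return free.poll(); }
        void recycle(byte[] b) { free.add(b); }
    }

    // The consumer keeps data by copying it into an unpooled array, then
    // recycles the pooled buffer immediately before the next request.
    public static int consume(Pool pool, int rounds) {
        int consumed = 0;
        for (int i = 0; i < rounds; i++) {
            byte[] pooled = pool.request();
            if (pooled == null) {
                break; // would deadlock here if we blocked instead
            }
            byte[] kept = Arrays.copyOf(pooled, pooled.length); // unpooled copy
            pool.recycle(pooled); // recycle before asking for the next one
            consumed += kept.length > 0 ? 1 : 0;
        }
        return consumed;
    }
}
```

A consumer that hoarded pooled buffers instead of copying would stall after the first round on a single-buffer pool; the copy-then-recycle discipline is what makes the one-buffer restriction safe.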
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16982446#comment-16982446 ]

Piotr Nowojski commented on FLINK-14845:
----------------------------------------

For the output, writing the compressed bytes back in place of the uncompressed ones would be a neat trick, as long as it doesn't complicate the code too much.

{quote}For InputGate, currently, the previous Buffer is guaranteed to be recycled before getting next buffer, so we can decompress the compressed Buffer to the reusable intermediate buffer in decompressor{quote}

I'm not sure it's a good idea to rely on such behaviour. I don't think there was an explicit contract that only one single buffer can be polled from an InputGate/InputChannel at a time, or is there? Would it be more complicated to request a buffer from the network pool?

{quote}support LZ4 compression algorithm and we can use that by default.{quote}

+1 for LZ4 as the default.

{quote}I think we should make it pluggable. The users can implement a factory for compressor and decompressor instance creation{quote}

That might be a follow-up story. If we want to make it pluggable, keep in mind that it should use the plugin class loader ({{PluginUtils}}, {{PluginManager}}). Also, could you list the dependencies that you would like to add to the runtime for the default compression (if any)? We might need to pluginize the default compressor as well.

{quote}At least two config options should be added, one is for compression enable/disable and another is for compression codec (factory).{quote}

I was mainly thinking of whether you want to expose some compression tweaking parameters to the user (affecting compression speed/ratio). I would prefer to avoid that, at least in the first version.

{quote}I think users of Flink can benefit from this feature, especially the batch users, so it better to make it available for 1.10{quote}

This might be tight, but we can try to do it.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16982271#comment-16982271 ]

Yingjie Cao commented on FLINK-14845:
-------------------------------------

> Where are you going to compress data into?

Each compressor instance contains a slice of an unpooled intermediate buffer, and at least two compression methods will be implemented. One compresses the given input Buffer into the intermediate buffer, wraps that buffer as a Buffer instance, and returns it to the caller; since the intermediate buffer is reused, the caller must guarantee that the returned Buffer is ready to be recycled before the next buffer is compressed. The other method compresses the given input buffer into the intermediate buffer, then copies the compressed data back into the input Buffer and returns it. The decompressor works similarly: it can either decompress the given Buffer into a reusable intermediate buffer and return that, or copy the decompressed data back into the input Buffer. We could implement a new util method which takes two input Buffers and compresses/decompresses data from one to the other if needed.

Compressor and decompressor instances are at the InputGate and ResultPartition level and shared between subpartitions, so the memory overhead would be 32k/64k per InputGate or ResultPartition. Currently, the BlockingSubpartition writes a Buffer out and recycles it directly, so we can compress the Buffer into the reusable intermediate buffer, which avoids one copy. For PipelinedSubpartition, the Buffer is not recycled directly, so we can copy the compressed data back into the sliced Buffer at the original position and slice a new Buffer. (Note: if the data size grows after compression, the compression is given up and the data is kept uncompressed.)

For InputGate, currently, the previous Buffer is guaranteed to be recycled before getting the next buffer, so we can decompress the compressed Buffer into the reusable intermediate buffer in the decompressor.

> What compression algorithm would you like to use?

A general-purpose compression library which depends on an Apache-compatible third-party library has already been implemented in the table module. I think that compression library can be reused by both runtime and table. We only need to copy the compression util from table to runtime and make some minor changes accordingly.

> Will the compression be configurable?

Currently, the compression util in table only supports the LZ4 compression algorithm, and we can use that by default. I think we should make it pluggable: users can implement a factory for compressor and decompressor instance creation. At least two config options should be added, one for enabling/disabling compression and another for the compression codec (factory).

> What's the timeline of this feature?

I think users of Flink can benefit from this feature, especially batch users, so it would be better to make it available for 1.10 if time allows (not a problem for me). Besides, there is another consideration: we want users to be able to reproduce the TPC-DS benchmark results using version 1.10.
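The two compressor variants described above (compress into a reusable intermediate buffer and return a view of it, vs. compress and copy the result back into the caller's buffer, giving up when the data would grow) can be sketched roughly as follows. This is a hypothetical sketch, not Flink's actual compressor: Deflater stands in for LZ4 to keep the example dependency-free, and all names are made up.

```java
import java.nio.ByteBuffer;
import java.util.zip.Deflater;

// Rough sketch of a compressor with a reusable intermediate buffer, as
// described above. Deflater stands in for LZ4; all names are hypothetical.
public class ScratchCompressor {

    private final byte[] intermediate; // reused across calls

    public ScratchCompressor(int maxBufferSize) {
        this.intermediate = new byte[maxBufferSize];
    }

    // Variant 1: compress into the shared intermediate buffer and return a
    // view of it. The caller must be done with the returned view before the
    // next call, because the backing array is reused.
    public ByteBuffer compressToIntermediate(byte[] input) {
        int written = deflate(input);
        return ByteBuffer.wrap(intermediate, 0, written);
    }

    // Variant 2: compress into the intermediate buffer, then copy the
    // result back into the caller's buffer. If the result would not shrink,
    // give up and keep the buffer uncompressed (returns the original length).
    public int compressInPlace(byte[] buffer) {
        int written = deflate(buffer);
        if (written == 0 || written >= buffer.length) {
            return buffer.length; // give up; data kept uncompressed
        }
        System.arraycopy(intermediate, 0, buffer, 0, written);
        return written;
    }

    private int deflate(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        int written = deflater.deflate(intermediate);
        boolean done = deflater.finished();
        deflater.end();
        return done ? written : 0; // 0 signals "did not fit"
    }
}
```

Because a single instance is shared per InputGate/ResultPartition, the intermediate array bounds the extra memory to one buffer's worth, which matches the 32k/64k overhead figure quoted above.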
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16980729#comment-16980729 ]

Piotr Nowojski commented on FLINK-14845:
----------------------------------------

Maybe before going further I have a couple of questions.

Where are you going to compress data into? Some unpooled memory? Would you keep a reference count on the buffer holding the uncompressed bytes as long as we are handling the compressed memory? If not, compression could double the amount of buffered in-flight data.

What compression algorithm would you like to use? Some library? Will we need to add a new dependency? Will its license be compatible with Apache?

Will the compression be configurable? Can the user select multiple compression algorithms and compression parameters? Do we want to let users provide their own compression, in other words, make the compressor pluggable?
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979871#comment-16979871 ]

Yingjie Cao commented on FLINK-14845:
-------------------------------------

[~pnowojski] I totally agree with you. I will work on the issue and update here if there is any problem or progress.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979249#comment-16979249 ]

Piotr Nowojski commented on FLINK-14845:
----------------------------------------

[~kevin.cyj], I think I would prefer the 3rd option, but you are right that the potential implementation cost of this option might not be worth it for now. I guess the difference between 2. and 4. in terms of implementation is very small? We could start with 4. and re-evaluate 3. if we ever get such a request from a user.

Side note: [~AHeise] made me aware that we might find this compression useful also for spilling the buffers for unaligned checkpoints.

Also, just for the record, I was thinking about another solution: compressing/decompressing the data in Netty handlers. For streaming that would solve all of the problems with network flushes and small records/buffers (Netty would be compressing multiplexed channels), but again that wouldn't be very helpful for the {{BoundedBlockingSubpartition}}. For now we can stick with the common compression code path shared between streaming/batch, and if this ever becomes an issue, we can revisit.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979002#comment-16979002 ]

Yingjie Cao commented on FLINK-14845:
-------------------------------------

[~pnowojski], thanks for proposing these options.
1. According to [~lzljs3620320]'s comment, there seems to be much work to do if we add compression at the table/sql layer.
2. Currently, my POC implementation adopts this option, though as you have pointed out, this could make the system less complete.
3. As far as I know, most compression algorithms act on a window of data, for example 32k or 64k. If we want to implement a continuous compression algorithm which takes in records and gives out compressed records immediately (not producing the compressed record immediately may affect latency and invalidate the flush mechanism), then we must consider how to update the meta (the 'dict' for decompression) and let the downstream know how to decompress. As you have pointed out, that is complicated.
4. If we let the compression happen inside BufferConsumer#build, then for Pipelined mode, Netty threads do the compression, and for Blocking mode, the task thread does. If we don't mind which thread does the compression (not a problem for me), it's a good choice and can work for both Pipelined and Blocking modes.

From my point of view, I prefer the fourth one. What do you think? [~pnowojski]
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978343#comment-16978343 ] Jingsong Lee commented on FLINK-14845: -- > doesn't Blink work on batches of records? Couldn't the (columnar?) > compression be performed there? Either via some mapping operator or on record > serialisation/deserialisation layer? 
Blink currently works on BaseRow, i.e. per record. In the table/sql layer we cannot select the sub result partition, so the row->ColumnarBatch conversion and compression have to happen after the sub-partition is selected. We would therefore have to provide a special batch serialization and related compression interface to support columnar batches and their compression in the table/sql layer.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978325#comment-16978325 ] Piotr Nowojski commented on FLINK-14845: Re, [~lzljs3620320]: doesn't Blink work on batches of records? Couldn't the (columnar?) compression be performed there? Either via some mapping operator or on the record serialisation/deserialisation layer? [~kevin.cyj], thanks for pointing this out. I missed that {{ResultSubpartitionView#getNextBuffer}} happens too late for {{BoundedBlockingSubpartition}} - after the data has already been written to disk. In that case I can see three options:
# let the compression happen somewhere in {{BoundedBlockingSubpartition#writeAndCloseBufferConsumer}}
# use some streaming/continuous compression algorithm, which would allow the compressed stream of bytes to be chopped off (and decompressed) at any point in time
# let the compression happen inside {{BufferConsumer#build}}?
1. Would work just for {{BoundedBlockingSubpartition}}
2. Maybe a bit more complicated, but works for both pipelined and bounded sub-partitions and also solves the problem of frequent flushing with few records handled in between?
3. ?
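Option 2 above (a streaming/continuous compressor whose output can be chopped off and decompressed at any point) can be sketched with Java's built-in Deflater using SYNC_FLUSH, which aligns the stream on a byte boundary after every record. The class and method names here are hypothetical illustrations, not Flink APIs:

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Toy continuous compressor; a sketch of option 2, not a Flink API. */
public class StreamingCompressorSketch {

    private final Deflater deflater = new Deflater();

    /** Appends one record; SYNC_FLUSH makes every flushed prefix of the stream independently decodable. */
    public byte[] addRecord(byte[] record) {
        deflater.setInput(record);
        // Worst-case deflate expansion for small inputs is bounded; pad generously.
        byte[] out = new byte[record.length + 64];
        int n = deflater.deflate(out, 0, out.length, Deflater.SYNC_FLUSH);
        return Arrays.copyOf(out, n);
    }

    /** Feeds one flushed chunk to a long-lived Inflater and returns the decoded record. */
    public static byte[] inflateChunk(Inflater inflater, byte[] chunk) {
        inflater.setInput(chunk);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        try {
            int n;
            while ((n = inflater.inflate(buf)) > 0) {
                out.write(buf, 0, n);
            }
        } catch (DataFormatException e) {
            throw new RuntimeException(e);
        }
        return out.toByteArray();
    }
}
```

The downside Piotr mentions remains visible even in this sketch: the decompressor must be long-lived and stateful, since each chunk depends on the dictionary built from earlier chunks.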
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978297#comment-16978297 ] Chesnay Schepler commented on FLINK-14845: -- [~kevin.cyj] Cluster partitions may also be read by another cluster; as such the codec information must be contained somewhere in the ResultPartitionDeploymentDescriptor. Alternatively, to simplify things for the time being, we could also say that cluster partitions must not be compressed.
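The idea of carrying the codec in the descriptor can be sketched as below; this class is a hypothetical, simplified stand-in, not Flink's actual ResultPartitionDeploymentDescriptor:

```java
import java.util.Optional;

/** Hypothetical descriptor sketch: the codec travels with the partition metadata. */
public class PartitionDescriptorSketch {

    private final String partitionId;
    private final Optional<String> compressionCodec; // empty => partition is uncompressed

    public PartitionDescriptorSketch(String partitionId, String compressionCodec) {
        this.partitionId = partitionId;
        this.compressionCodec = Optional.ofNullable(compressionCodec);
    }

    public String partitionId() {
        return partitionId;
    }

    /** A consumer, possibly in another cluster, picks its decompressor from the descriptor rather than from cluster config. */
    public String decompressorName() {
        return compressionCodec.orElse("none");
    }
}
```

Because the codec rides along with each partition's deployment info, a consuming job never has to assume a cluster-wide setting.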
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978288#comment-16978288 ] Yingjie Cao commented on FLINK-14845: - [~chesnay] One choice may be to make the compression codec cluster-wide and not configurable per job. Another choice is to store the information in the meta and take it with the deployment info when deploying a task. What do you think?
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978284#comment-16978284 ] Yingjie Cao commented on FLINK-14845: - [~pnowojski] You are right, taking both the Blocking and Pipelined scenarios into consideration would make the system more complete. Do you think only compressing the whole buffer using the task thread and leaving the flushed sliced buffer uncompressed is acceptable for Pipelined mode? Currently, the blocking subpartition does not interact with the netty thread and the task thread directly writes Buffers out. If we choose to compress with the netty thread, we have to consider the interaction between the netty thread and the task thread.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978268#comment-16978268 ] Chesnay Schepler commented on FLINK-14845: -- How would this work with cluster partitions? (FLIP-67; partitions being kept around after a job finishes, potentially being read by other jobs) How would the other jobs know what compression was used?
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978257#comment-16978257 ] Jingsong Lee commented on FLINK-14845: -- [~pnowojski] puts forward a good thought for the future. I have tested this: columnar compression can achieve 10x compression, but lz4 can only achieve about 2x. However, it also brings more conversion overhead, and it requires great changes to the network layer. Maybe it is another story, since it needs to do compression on structured records while normal compression just works on binary data. I'm very glad to see this being considered.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978243#comment-16978243 ] Piotr Nowojski commented on FLINK-14845: +1 for the feature. Could be useful also for {{PipelinedSubpartition}} in some rare scenarios of network-bound clusters. Compressing the data in the Task thread with the presence of an {{OutputFlusher}} would be very difficult to do, maybe impossible without adding extra synchronisation. That's because currently a buffer ({{BufferConsumer}}) can be handed to netty for consumption while more records are appended from the task thread. To deal with that, we would have to compress data only per record, or add extra synchronisation between the Netty and Task threads. I think maybe a better idea would be to do the compression on the boundary of {{ResultSubpartition}} and netty (expand {{CreditBasedSequenceNumberingViewReader}}? create a new alternative {{NetworkSequenceViewReader}}?), and perform the compression in the Netty thread. I don't think this should be an issue, as it would be a non-blocking operation that's relatively fast (same order of magnitude as copying the memory). To play devil's advocate here: wouldn't Blink benefit more from a more specific, columnar compression/decompression compared to a generic {{Buffer}}/{{MemorySegment}} based one? Compressing each column independently should give much better compression ratios.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978241#comment-16978241 ] Yingjie Cao commented on FLINK-14845: - Doing compression and decompression with task threads may be a better choice; after all, the netty threads are shared between jobs and should be dedicated to message communication. If we don't touch the Pipelined part and only introduce compression to BoundedBlockingSubpartition, can we ignore the flushing handover logic for the moment? After all, no flushing happens for the blocking subpartition. By the way, if we introduce compression to Pipelined mode later, I prefer to only compress the whole buffer (the task thread can do the work) and leave the flushed sliced buffer uncompressed; after all, compressing too small data may not help at all. [~sewen] What do you think?
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978203#comment-16978203 ] Stephan Ewen commented on FLINK-14845: -- For combining compression and low latency, we need to see which thread should do the compression, how to not make it interfere with the flushing handover logic (which does minimal concurrency), and how to avoid compression for local channels.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978184#comment-16978184 ] Yingjie Cao commented on FLINK-14845: - [~lzljs3620320] For small data, for example dozens of bytes, the case should be common; for a 32K buffer, I guess it should be rare. I added the flag to be on the safe side. Besides, I have also considered whether we can reuse the compression code for pipelined mode: the low-latency flushing mechanism may lead to sending buffers much smaller than 32K, which may make the case in point 8 more common.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978175#comment-16978175 ] Jingsong Lee commented on FLINK-14845: -- Thanks [~kevin.cyj]. A little confused about point 8: are there many cases where the compressed size is larger than the original data? If not, can we consider removing the compression flag from the buffer? My first thought is that this should be a global flag instead of a per-buffer one. (But per buffer may be harder to get wrong.)
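The per-buffer flag under discussion can be as small as one byte (or a single bit in an existing header field). A hypothetical framing sketch, not Flink's actual NettyMessage/BufferResponse layout:

```java
import java.nio.ByteBuffer;

/** Hypothetical wire framing: [flag byte][length int][payload bytes]. */
public class FlagFramingSketch {

    /** Frames a payload together with its per-buffer compression flag. */
    public static ByteBuffer encode(boolean isCompressed, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + payload.length);
        buf.put((byte) (isCompressed ? 1 : 0));
        buf.putInt(payload.length);
        buf.put(payload);
        buf.flip();
        return buf;
    }

    public static boolean isCompressed(ByteBuffer frame) {
        return frame.get(0) == 1;
    }

    public static byte[] payload(ByteBuffer frame) {
        byte[] out = new byte[frame.getInt(1)];
        ByteBuffer view = frame.duplicate(); // absolute reads leave the original untouched
        view.position(5);
        view.get(out);
        return out;
    }
}
```

Per-buffer flagging costs a byte per buffer but lets the receiver handle a mixture of compressed and uncompressed buffers without any out-of-band state, which is why it is harder to get wrong than a global flag.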
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978153#comment-16978153 ] Yingjie Cao commented on FLINK-14845: - To verify how much improvement we could gain from compression, I wrote a POC implementation which does not introduce much abstraction and modifies the existing InputGate and ResultPartition directly (the final result shows that the change is modest). The main changes are summarized as follows.
# Copied the block compression utils from flink-table (should be shared by table and runtime).
# Implemented a BufferCompressor and a BufferDecompressor util which take in a Buffer and return a Buffer, based on the compression utils from 1.
# Added a flag to Buffer to identify if it is compressed.
# Each ResultPartition contains a BufferCompressor and each SingleInputGate contains a BufferDecompressor.
# In BoundedBlockingSubpartition, when writing a buffer out, the buffer will be compressed (call BufferCompressor#compress) if compression is enabled. When written to disk, the compression flag is also written out.
# In SingleInputGate, if a received Buffer is compressed (identified by the flag introduced in 3), it will be decompressed (call BufferDecompressor#decompress).
# Added a compression flag to BufferResponse to let the Buffer receiver know if the Buffer is compressed. (This additional flag could be avoided by adding a new message type like CompressedBufferResponse which directly extends BufferResponse and implements an isCompressed method that always returns true.)
# There is a bad case in which the data size becomes even bigger after compression; the BufferCompressor handles that by returning the original uncompressed buffer, which means that the receiver will receive a mixture of compressed and uncompressed Buffers.
[~sewen] [~ykt836] [~lzljs3620320] What do you think?
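The fallback behaviour in point 8, compressing a buffer but keeping the raw bytes whenever compression grows the data, and tagging each buffer with a flag, can be sketched as follows. The class names and the use of java.util.zip here are illustrative assumptions, not the POC's actual code:

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical sketch of the compress-or-fallback scheme with a one-byte flag. */
public class BufferCompressorSketch {

    static final byte UNCOMPRESSED = 0;
    static final byte COMPRESSED = 1;

    /** Compresses one buffer, falling back to the raw bytes when compression does not shrink them. */
    public static byte[] compressOrFallback(byte[] data) {
        Deflater deflater = new Deflater();
        deflater.setInput(data);
        deflater.finish();
        // Accept the compressed form only if it is strictly smaller than the input; if the
        // output does not even fit into an input-sized buffer, compression grew the data.
        byte[] out = new byte[data.length];
        int written = deflater.deflate(out);
        boolean shrank = deflater.finished() && written < data.length;
        deflater.end();
        byte[] payload = shrank ? Arrays.copyOf(out, written) : data;
        byte[] framed = new byte[payload.length + 1];
        framed[0] = shrank ? COMPRESSED : UNCOMPRESSED;
        System.arraycopy(payload, 0, framed, 1, payload.length);
        return framed;
    }

    /** Reads the flag and decompresses only when necessary, so mixed streams are handled transparently. */
    public static byte[] decompress(byte[] framed) {
        byte[] payload = Arrays.copyOfRange(framed, 1, framed.length);
        if (framed[0] == UNCOMPRESSED) {
            return payload;
        }
        Inflater inflater = new Inflater();
        inflater.setInput(payload);
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        byte[] chunk = new byte[1024];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(chunk);
                if (n == 0 && inflater.needsInput()) {
                    break; // truncated input; stop rather than spin
                }
                result.write(chunk, 0, n);
            }
        } catch (DataFormatException e) {
            throw new RuntimeException(e);
        } finally {
            inflater.end();
        }
        return result.toByteArray();
    }
}
```

Because the receiver only inspects the flag byte, a stream mixing compressed and uncompressed buffers decodes uniformly, which is exactly the property point 8 relies on.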
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978019#comment-16978019 ] Jingsong Lee commented on FLINK-14845: -- [~sewen] Sorry for the misunderstanding. What I wanted to say is forward instead of pipeline. As [~ykt836] said, we currently set `StreamGraph.setBlockingConnectionsBetweenChains` to true, which sets all result partition types to `BLOCKING`. We also set `StreamGraph.setAllVerticesInSameSlotSharingGroupByDefault` to false, because operators have separate memory usage. But your reminder may point to a way: we can let the source and its forward join run co-located in the same slot. In this special case, we can set the pipelined shuffle type and put them into one slot sharing group.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16977982#comment-16977982 ] Kurt Young commented on FLINK-14845: [~sewen] Currently we set all shuffle types to `BLOCKING` in the blink planner to avoid schedule deadlock. And even if we set some of the shuffle types to `PIPELINE`, e.g. the forward side of a broadcast hash join, we still need to set the co-location group for the operator to make the input and the hash join scheduled into one slot. We haven't tried co-location groups before and have no idea how they perform. Might be interesting to give it a shot in the near future.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16977693#comment-16977693 ] Stephan Ewen commented on FLINK-14845: -- [~lzljs3620320] A quick question for clarification: in the example you described, the large table could (and should) be connected to the join task by a pipelined channel that does not spill (in memory, connecting the source and the join, which run co-located in the same slot). I guess that should also be possible for the Blink query engine in 1.10, with FLIP-53?
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16977441#comment-16977441 ] Stephan Ewen commented on FLINK-14845: -- Kurt and I had a quick chat about this. On the receiver side, having a "decompressing input gate" seems pretty straightforward. On the sender side, it is slightly trickier: we have to check how this interacts with the "task to netty handover" used for low-latency sender-side flushing. That code path is there even when no low-latency flushing happens.
[jira] [Commented] (FLINK-14845) Introduce data compression to blocking shuffle.
[ https://issues.apache.org/jira/browse/FLINK-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16977096#comment-16977096 ] Jingsong Lee commented on FLINK-14845: -- Big +1 for this feature. There are a lot of shuffles in current batch scenarios, which means a lot of spilling IO. For example, in the simplest join scenario, joining a large table with a small table can filter out a lot of data. Since the small table's data volume is very small, we can use a broadcast join; but because there is currently no chaining for TwoInputOperators, the large table still has to go through the pipeline, causing data to spill to disk. As long as there is no TwoInputOperatorChain, shuffle is the performance killer of the whole job, and disk IO is the performance bottleneck in many scenarios.