Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi all,

Thanks for all the feedback and suggestions so far. The discussion has been going on for some time. If there are no further comments, we will start voting today.

Best,
Yuxin
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi,

Thanks for joining the discussion and sharing your ideas.

@ron
> can the Hybrid Shuffle replace the RSS in the future?

Hybrid shuffle and RSS offer distinct solutions to the shuffle challenge. To optimize performance, we store shuffle data in different tiers of memory and disk, enabling greater flexibility and ease of use. Specifically, we cache intermediate data in memory to reduce disk I/O overhead. In contrast, RSS is a standalone service that can operate across multiple servers within a cluster, parallelizing shuffle operations to enhance performance. However, this introduces additional deployment and maintenance costs. Each approach has its own benefits and drawbacks, and users should be able to select the method that best suits their needs. So I don't think hybrid shuffle can replace RSS in the future.

@ConradJam
> Should we define a data acceleration layer like Alluxio in remote storage?

I'm not entirely clear on the detailed plan you've proposed, but I understand that you want Alluxio to serve as a cache layer for the remote storage tier. It's designed to provide low-latency data access to applications through a distributed caching layer. However, introducing Alluxio adds extra dependencies and deployment/maintenance costs for users. Our design approach is to use remote storage only as a supplement to local storage, since local storage is generally sufficient. Given the limited usage scenarios, introducing such costs for this optimization may not be worthwhile. Additionally, for users, added dependencies imply increased complexity.

Best,
Yuxin
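The tier selection described above (memory first for low I/O overhead, then local disk, with remote storage only as a supplement) can be sketched roughly as follows. This is an illustrative Python sketch under stated assumptions, not Flink's actual implementation; all class names, method names, and capacity numbers here are hypothetical.

```python
# Illustrative sketch of tiered shuffle storage: try memory first, then
# local disk, then fall back to remote storage. All names and capacities
# are hypothetical; Flink's real implementation differs.

class Tier:
    def __init__(self, name, capacity):
        self.name = name
        self.capacity = capacity  # max number of segments this tier accepts
        self.segments = {}

    def has_space(self):
        return len(self.segments) < self.capacity

    def write(self, segment_id, data):
        self.segments[segment_id] = data

class TieredStorage:
    """Tries tiers in priority order; remote is treated as unbounded."""
    def __init__(self):
        self.tiers = [Tier("memory", 2), Tier("disk", 3), Tier("remote", 10**9)]

    def write_segment(self, segment_id, data):
        for tier in self.tiers:
            if tier.has_space():
                tier.write(segment_id, data)
                return tier.name
        raise RuntimeError("no tier accepted the segment")

storage = TieredStorage()
placements = [storage.write_segment(i, b"data") for i in range(6)]
print(placements)  # early segments stay in memory, later ones spill downward
```

The point of the sketch is only the priority order: each tier is tried independently, so no tier needs to know about the others, matching the decoupling goal discussed later in the thread.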
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Thanks for starting this discussion.

Here I am a bit confused about the definition of the memory layer; does it refer to local memory? Should we define a data acceleration layer like Alluxio [1] in remote storage?

Let me cite a scenario: if I use Fluid [2] to mount an AlluxioRuntime [3] on K8s, it looks like a local disk (but it is actually remote memory storage). Have we specified this behavior or optimized it for this scenario?

[1] What is Alluxio: https://docs.alluxio.io/os/user/stable/en/Overview.html
[2] Fluid: https://fluid-cloudnative.github.io/
[3] Fluid Alluxio Runtime: https://fluid-cloudnative.github.io/samples/tieredstore_config.html#prerequisites

--
Best

ConradJam
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi, Yuxin,

Thanks for creating this FLIP. Adding remote storage capability to Flink's Hybrid Shuffle is a significant improvement that addresses the limitation of local disk storage, and it can also improve the stability of Flink batch jobs.

I just have one question: can Hybrid Shuffle replace the RSS in the future? Since Hybrid Shuffle will have remote storage ability, maybe we don't need to maintain a standalone RSS, which would simplify our operations work.
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi, Yun

Thanks for your suggestions.

> I think you could describe it explicitly in the original FLIP's goals or design principles.

I have updated the FLIP and given a more detailed description of the tier decoupling design.

Best,
Yuxin
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi Yuxin,

Thanks for your explanations. I think the core idea is that you prefer a simple, decoupled design for the first version of hybrid shuffle with remote storage. Following this idea, I can accept your current explanations, and I think you could describe it explicitly in the original FLIP's goals or design principles.

Best
Yun Tang
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi, Yun,

Thanks for sharing the ideas.

> 1. We should trigger to kick the shuffle data to remote storage once either condition is reached

I believe that configuring two options in this manner is a pragmatic approach that can fulfill a wider range of usage scenarios. However, if we introduce two options, it may become difficult to remove them in the future once users have started relying on them. On the other hand, if we introduce a single option, we can easily add further options later, based on your recommendations, if required. Thus, we recommend adopting a one-option solution in the first version.

> 2. Perhaps we could switch to kick shuffle data to remote storage once no space left exception is met.

Thanks for your valuable feedback. While the suggestion is a viable way to handle the "no space left" exception, we are concerned that implementing it would create interdependence between the disk tier and the remote storage tier, which would contradict our goal of keeping the tiers independent in the new architecture. Moreover, we believe it is better to prevent the exception in the first place by reserving adequate disk space, because other processes on the same machine may also be impacted when the exception occurs. If the exception does still arise, we can explore other potential solutions through detailed design and discussion, such as the one you proposed, or optimizing the reserved space with a global counter in the TM. Although the current implementation only partially addresses the exception issue, we expect to improve it in subsequent versions, given the complexity of this FLIP. We would appreciate hearing your thoughts on this matter.

Best,
Yuxin
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi Yuxin

Thanks for your reply. I am not saying that we should use an absolute reserved value to replace the current plan of the reserved fraction. We should trigger kicking the shuffle data to remote storage once either condition is reached. Maybe we could get some ideas from the configuration of the tier-based state backend [1].

The concern about concurrent writing is due to the local disk being shared by all the instances running on that node; we cannot ensure other components would not flush data during shuffle writing. Perhaps we could switch to kicking shuffle data to remote storage once a "no space left" exception is met.

[1] https://www.alibabacloud.com/help/en/realtime-compute-for-apache-flink/latest/geministatebackend-configurations#section-u0y-on0-owo

Best
Yun Tang

From: Yuxin Tan
Sent: Monday, March 13, 2023 15:06
To: dev@flink.apache.org
Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

Hi,

Thanks for the feedback and questions from Zhu Zhu, Xia Sun and Yun Tang.

@Zhu Zhu
> 1. I'm a bit concerned about the cost of it.

The Read Client's request to check the existence of a segment in each storage tier is a Netty message, similar to the credit update messages. The Netty message cost for these operations is relatively low, and the number of messages is also small compared to credit updates, which are sent every few buffers. Moreover, since this request involves only memory operations, the message cost is very low relative to the whole data-reading process.

And we will further optimize the message cost later, particularly for segments that remain in the same tier without switching tiers. That is, for consecutive segments in the same tier, we can continue to send the next segment without waiting for the downstream to ask whether the segment exists in this tier.

@Xia Sun
> 1. how to keep the order of segments between different storage tiers

To indicate the sequential order of upstream segments, we rely on the segmentId downstream. Once segment n has been fully consumed by the downstream, the subsequent segment n + 1 will be requested in its natural order. As for the order of the buffers within each segment, they follow the default ordering mechanisms of Flink.

> 2. what will happen if the disk space is found to be full?

By reserving some space in advance, we try to avoid the disk space error as much as possible. The size of the reserved space can be configured through the option introduced in the public API.

> 3. For remote storage, does the switching between different tiers will bring overhead or waiting?

Our primary focus for the first version is to address the problem of job stability and implement the new architecture, because the change in this FLIP is already quite complicated. However, remote storage may lead to additional overhead for reading small files. Therefore, we recommend using remote storage as a supplement to local disks, with the latter serving as the primary storage in most cases, resulting in less impact on performance. After introducing the new architecture, we will also continue optimizing the remote storage performance. At that time, the change will be relatively cohesive, focusing only on the remote storage tier, without affecting other modules.

@Yun Tang
> 1. Normal shuffle which writes data to disk could also benefit from this.

I agree with the idea. Our core design is independent of the shuffle mode and is not strongly tied to hybrid shuffle; we are currently only applying this architecture to hybrid shuffle. If needed, it can be adapted with slight changes for other shuffle modes in the future, making it easier for them to benefit from this feature. However, for the first version, we are focusing on hybrid shuffle because of the scope of impact of the change and the potential of hybrid shuffle.

> 2. will it affect current design of pluggable remote shuffle service?

It will not affect the current design of the pluggable shuffle service. Our design focuses on improving Flink's internal shuffle.

> 3. the condition of min-reserve-space-fraction to kick local data to remote storage might not be a good idea in all cases

In fact, we believe that using a fraction, as opposed to a fixed value, covers more general use cases, especially considering that no default value can suffice for all scenarios. For instance, a default value such as 5GB is too small for a 3TB SATA disk, and too large for a 20GB local disk (where 5GB accounts for 25% of the available disk space). Therefore, after some internal discussions, we decided to use a fraction as the reserved space value. Is there anything we can reference in the state-backend implementation, such as the fixed value or other solutions?

> 4. will we meet a concurrency problem when different subtasks within one process/node start to check the left disk space before deciding to write to local or remote?
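The segment-discovery protocol discussed in this thread (the Read Client asks each tier, from higher priority to lower, whether segment n exists, and only requests segment n + 1 once segment n is fully consumed) can be sketched as below. This is a hedged illustration of the idea only, with hypothetical names; the real mechanism is a Netty request/response, not local dictionary lookups.

```python
# Sketch of the reader-side protocol: for each segmentId in natural order,
# ask the tiers from highest to lowest priority whether they hold that
# segment, and read it from the first tier that answers yes.
# Hypothetical names; the real protocol uses Netty messages.

def read_all_segments(tiers, num_segments):
    """tiers: list of dicts mapping segment_id -> data, in priority order."""
    result = []
    for segment_id in range(num_segments):       # natural segmentId order
        for priority, tier in enumerate(tiers):  # "does segment n exist here?"
            if segment_id in tier:
                result.append((segment_id, priority, tier[segment_id]))
                break
        else:
            raise LookupError(f"segment {segment_id} not found in any tier")
    return result

memory = {0: "a", 1: "b"}
disk = {2: "c"}
remote = {3: "d"}
segments = read_all_segments([memory, disk, remote], 4)
print([tier for (_, tier, _) in segments])  # tier index each segment came from
```

Because the downstream drives ordering purely by segmentId, the tiers never need to coordinate with each other, which is the ordering argument made in the @Xia Sun answer above.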
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi Yuxin, Thanks for this proposal. From my understanding, this FLIP looks like a tiered based shuffle extension, which seems no need to bind with hybrid shuffle. Normal shuffle which writes data to disk could also benefit from this. Secondly, since this targets to be an extension on hybrid shuffle which introduces a lots of changes, will it affect current design of pluggable remote shuffle service, such as Apache Celeborn [1]. Thirdly, based on my previous experiences on implementing a tiered based state-backend, the condition of min-reserve-space-fraction to kick local data to remote storage might not be a good idea in all cases, we still need to consider the absolute reserved disk storage. Take a 20GB local data disk as example, it might be a bit too late to kick the local data when only 1GB (20GB*5%) space left. Last but not least, will we meet a concurrency problem when different subtasks within one process/node start to check the left disk space before deciding to write to local or remote? [1] https://celeborn.apache.org/ Best Yun Tang From: Xia Sun Sent: Sunday, March 12, 2023 17:16 To: dev@flink.apache.org Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage Hi Yuxin, Thanks for creating this FLIP! I'm a flink user, and in our internal scenario we use the colocation technology to run flink jobs and online service on the same machine together. We found that flink jobs are occasionally affected by other non-flink jobs (i.e. if the host disk space is full, that will result in 'No space left on device' error on flink jobs). This flip will really help us to benefit from hybrid shuffle without being worried about insufficient disk space problem. And I also have a few questions. 1. If the same subpartition spans multiple different tiers, how to keep the order of segments between different storage tiers (if necessary)? 2. In the process of writing to the local disk for a subpartition, what will happen if the disk space is found to be full? 
Will it report an error or automatically transfer to remote storage?
3. For remote storage, I noticed that it uses direct reading, which is different from the other two tiers. Will switching between tiers bring overhead or waiting? In addition, compared to Flink RSS, which optimizes data compression and small-file merging to improve throughput and relieve file system pressure, can the object storage system meet the performance requirements and concurrent-access challenges of large-scale batch jobs (parallelism > 1)?

Thanks,
Xia

Zhu Zhu wrote on Fri, Mar 10, 2023 at 16:44:
> Hi Yuxin, > > Thanks for creating this FLIP! > The idea of tiered storage looks good. Instead of choosing one from > multiple storages, it can help to balance between performance, cost and > stability. It also has the potential to adaptively select proper tiers > according to more runtime information, to achieve better performance > and ease of use. > > I have a question about the tier finding of data reading. In the FLIP > it proposes that the Read Client asks each storage tier whether a > given segment exists in it, from higher priority tiers to lower priority > ones. I'm a bit concerned about the cost of it, especially when data > are written to low priority tiers. Do you have any evaluation of it? > Is it possible to let the Reader Client know the location of the next > segment when it has finished reading one segment? Or maybe just let it > know whether the next segment is located in the same tier, if we can > have the assumption that tier changing would not be very frequent. > > Thanks, > Zhu > > Weihua Hu wrote on Fri, Mar 10, 2023 at 11:52: > > > > Thanks Yuxin for your explanation. > > > > That sounds reasonable. Looking forward to the new shuffle. > > > > > > Best, > > Weihua > > > > > > On Fri, Mar 10, 2023 at 11:48 AM Yuxin Tan > wrote: > > > > > Hi, Weihua, > > > Thanks for the questions and the ideas. > > > > > > > 1.
How many performance regressions would there be if we only > > > used remote storage? > > > > > > The new architecture can support to use remote storage only, but this > > > FLIP target is to improve job stability. And the change in the FLIP has > > > been significantly complex and the goal of the first version is to > update > > > Hybrid Shuffle to the new architecture and support remote storage as > > > a supplement. The performance of this version is not the first > priority, > > > so we haven’t tested the performance of using only remote storage. > > > If there are indeed regressions, we will keep optimizing the > performance > > > of the remote storages and improve it until only remote storage is > > > available in the production environment.
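Yun Tang's last two points above, the absolute disk reserve and the concurrency of the free-space check, can be illustrated together. The following is a minimal sketch, not the FLIP's actual design, and all names are hypothetical: the effective reserve is the larger of the fractional and absolute thresholds, and subtasks claim disk space through one atomic check-and-reserve step so that several of them cannot all pass the same free-space check at once.

```python
import threading

class DiskSpaceGate:
    """Hypothetical sketch: decide whether local disk may still be used.

    Combines a fractional reserve (Yun Tang's min-reserve-space-fraction
    concern) with an absolute byte reserve, and makes the check-and-claim
    atomic so concurrent subtasks on one node cannot race past it.
    """
    def __init__(self, capacity_bytes, min_reserve_fraction, min_reserve_bytes):
        self.capacity = capacity_bytes
        # The effective reserve is the larger of the two thresholds.
        self.reserve = max(min_reserve_fraction * capacity_bytes, min_reserve_bytes)
        self.used = 0
        self._lock = threading.Lock()

    def try_reserve(self, nbytes):
        """Atomically claim nbytes of local disk; False means spill to remote."""
        with self._lock:
            if self.capacity - self.used - nbytes < self.reserve:
                return False
            self.used += nbytes
            return True

# With a 20 GB disk and a 5% fraction, the fractional reserve alone is 1 GB;
# adding an (assumed) absolute 4 GB floor switches to remote storage earlier.
gate = DiskSpaceGate(20 * 1024**3, 0.05, 4 * 1024**3)
```

The point of the sketch is only that the trigger condition should be `max(fraction * capacity, absolute_bytes)` rather than the fraction alone, and that the check must be atomic per node.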
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi Yuxin,

Thanks for creating this FLIP! I'm a Flink user, and in our internal scenario we use colocation technology to run Flink jobs and online services together on the same machines. We found that Flink jobs are occasionally affected by other, non-Flink jobs (e.g., if the host disk fills up, Flink jobs fail with a 'No space left on device' error). This FLIP will really help us benefit from hybrid shuffle without worrying about insufficient disk space.

I also have a few questions.
1. If the same subpartition spans multiple different tiers, how do we keep the order of segments between the storage tiers (if necessary)?
2. While writing a subpartition to the local disk, what happens if the disk space is found to be full? Will it report an error or automatically transfer to remote storage?
3. For remote storage, I noticed that it uses direct reading, which is different from the other two tiers. Will switching between tiers bring overhead or waiting? In addition, compared to Flink RSS, which optimizes data compression and small-file merging to improve throughput and relieve file system pressure, can the object storage system meet the performance requirements and concurrent-access challenges of large-scale batch jobs (parallelism > 1)?

Thanks,
Xia

Zhu Zhu wrote on Fri, Mar 10, 2023 at 16:44:
> Hi Yuxin, > > Thanks for creating this FLIP! > The idea of tiered storage looks good. Instead of choosing one from > multiple storages, it can help to balance between performance, cost and > stability. It also has the potential to adaptively select proper tiers > according to more runtime information, to achieve better performance > and ease of use. > > I have a question about the tier finding of data reading. In the FLIP > it proposes that the Read Client asks each storage tier whether a > given segment exists in it, from higher priority tiers to lower priority > ones.
I'm a bit concerned about the cost of it, especially when data > are written to low priority tiers. Do you have any evaluation of it? > Is it possible to let the Reader Client know the location of the next > segment when it has finished reading one segment? Or maybe just let it > know whether the next segment is located in the same tier, if we can > have the assumption that tier changing would not be very frequent. > > Thanks, > Zhu > > Weihua Hu 于2023年3月10日周五 11:52写道: > > > > Thanks Yuxin for your explanation. > > > > That sounds reasonable. Looking forward to the new shuffle. > > > > > > Best, > > Weihua > > > > > > On Fri, Mar 10, 2023 at 11:48 AM Yuxin Tan > wrote: > > > > > Hi, Weihua, > > > Thanks for the questions and the ideas. > > > > > > > 1. How many performance regressions would there be if we only > > > used remote storage? > > > > > > The new architecture can support to use remote storage only, but this > > > FLIP target is to improve job stability. And the change in the FLIP has > > > been significantly complex and the goal of the first version is to > update > > > Hybrid Shuffle to the new architecture and support remote storage as > > > a supplement. The performance of this version is not the first > priority, > > > so we haven’t tested the performance of using only remote storage. > > > If there are indeed regressions, we will keep optimizing the > performance > > > of the remote storages and improve it until only remote storage is > > > available in the production environment. > > > > > > > 2. Shall we move the local data to remote storage if the producer is > > > finished for a long time? > > > > > > I agree that it is a good idea, which can release task manager > resources > > > more timely. But moving data from TM local disk to remote storage needs > > > more detailed discussion and design, and it is easier to implement it > based > > > on the new architecture. 
Considering the complexity, the target focus, > and > > > the iteration cycle of the FLIP, we decide that the details are not > > > included > > > in the first version. We will extend and implement them in the > subsequent > > > versions. > > > > > > Best, > > > Yuxin > > > > > > > > > Weihua Hu 于2023年3月9日周四 11:22写道: > > > > > > > Hi, Yuxin > > > > > > > > Thanks for driving this FLIP. > > > > > > > > The remote storage shuffle could improve the stability of Batch jobs. > > > > > > > > In our internal scenario, we use a hybrid cluster to run both > > > > Streaming(high priority) > > > > and Batch jobs(low priority). When there is not enough > resources(such as > > > > cpu usage > > > > reaches a threshold), the batch containers will be evicted. So this > will > > > > cause some re-run > > > > of batch tasks. > > > > > > > > It would be a great help if the remote storage could address this. > So I > > > > have a few questions. > > > > > > > > 1. How many performance regressions would there be if we only used > remote > > > > storage? > > > > > > > > 2. In current design, the shuffle data segment will
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi Yuxin,

Thanks for creating this FLIP! The idea of tiered storage looks good. Instead of choosing one of multiple storages, it can help to balance performance, cost and stability. It also has the potential to adaptively select proper tiers according to more runtime information, to achieve better performance and ease of use.

I have a question about the tier finding of data reading. In the FLIP it proposes that the Read Client asks each storage tier whether a given segment exists in it, from higher priority tiers to lower priority ones. I'm a bit concerned about the cost of this, especially when data are written to low-priority tiers. Do you have any evaluation of it? Is it possible to let the Reader Client know the location of the next segment when it has finished reading one segment? Or maybe just let it know whether the next segment is located in the same tier, if we can assume that tier changes would not be very frequent.

Thanks,
Zhu

Weihua Hu wrote on Fri, Mar 10, 2023 at 11:52:
> > Thanks Yuxin for your explanation. > > That sounds reasonable. Looking forward to the new shuffle. > > > Best, > Weihua > > > On Fri, Mar 10, 2023 at 11:48 AM Yuxin Tan wrote: > > > Hi, Weihua, > > Thanks for the questions and the ideas. > > > > > 1. How many performance regressions would there be if we only > > used remote storage? > > > > The new architecture can support to use remote storage only, but this > > FLIP target is to improve job stability. And the change in the FLIP has > > been significantly complex and the goal of the first version is to update > > Hybrid Shuffle to the new architecture and support remote storage as > > a supplement. The performance of this version is not the first priority, > > so we haven’t tested the performance of using only remote storage. > > If there are indeed regressions, we will keep optimizing the performance > > of the remote storages and improve it until only remote storage is > > available in the production environment.
> > > > > 2. Shall we move the local data to remote storage if the producer is > > finished for a long time? > > > > I agree that it is a good idea, which can release task manager resources > > more timely. But moving data from TM local disk to remote storage needs > > more detailed discussion and design, and it is easier to implement it based > > on the new architecture. Considering the complexity, the target focus, and > > the iteration cycle of the FLIP, we decide that the details are not > > included > > in the first version. We will extend and implement them in the subsequent > > versions. > > > > Best, > > Yuxin > > > > > > Weihua Hu 于2023年3月9日周四 11:22写道: > > > > > Hi, Yuxin > > > > > > Thanks for driving this FLIP. > > > > > > The remote storage shuffle could improve the stability of Batch jobs. > > > > > > In our internal scenario, we use a hybrid cluster to run both > > > Streaming(high priority) > > > and Batch jobs(low priority). When there is not enough resources(such as > > > cpu usage > > > reaches a threshold), the batch containers will be evicted. So this will > > > cause some re-run > > > of batch tasks. > > > > > > It would be a great help if the remote storage could address this. So I > > > have a few questions. > > > > > > 1. How many performance regressions would there be if we only used remote > > > storage? > > > > > > 2. In current design, the shuffle data segment will write to one kind of > > > storage tier. > > > Shall we move the local data to remote storage if the producer is > > finished > > > for a long time? > > > So we can release the idle task manager with no shuffle data on it. This > > > may help to reduce > > > the resource usage when producer parallelism is larger than consume. > > > > > > Best, > > > Weihua > > > > > > > > > On Thu, Mar 9, 2023 at 10:38 AM Yuxin Tan > > wrote: > > > > > > > Hi, Junrui, > > > > Thanks for the suggestions and ideas. 
> > > > > > > > > If they are fixed, I suggest that FLIP could provide clearer > > > > explanations. > > > > I have updated the FLIP and described the segment size more clearly. > > > > > > > > > can we provide configuration options for users to manually adjust the > > > > sizes? > > > > The segment size can be configured if necessary. But considering that > > if > > > we > > > > exposed these parameters prematurely, it may be difficult to modify the > > > > implementation later because the user has already used the configs. We > > > > can make these internal configs or fixed values when implementing the > > > first > > > > version, I think we can use either of these two ways, because they are > > > > internal and do not affect the public APIs. > > > > > > > > Best, > > > > Yuxin > > > > > > > > > > > > Junrui Lee 于2023年3月8日周三 00:24写道: > > > > > > > > > Hi Yuxin, > > > > > > > > > > This FLIP looks quite reasonable. Flink can solve the problem of > > Batch > > > > > shuffle by > > > > > combining local and remote storage, and can use fixed local disks for > > > >
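The per-segment tier probing Zhu Zhu asks about above can be sketched in a few lines. This is an illustrative model only (the `Tier` interface and names are hypothetical, not the FLIP's actual classes): segment ids within a subpartition increase consecutively, so the reader restores segment order simply by probing tiers from highest to lowest priority for each next id. Caching the tier that answered last would be the cheap optimization Zhu suggests for the case where tier changes are rare.

```python
class Tier:
    """Hypothetical stand-in for one storage tier (memory, disk, remote)."""
    def __init__(self, name, segments):
        self.name = name
        self._segments = segments  # segment_id -> payload bytes

    def has_segment(self, segment_id):
        return segment_id in self._segments

    def read_segment(self, segment_id):
        return self._segments[segment_id]


def read_subpartition(tiers, num_segments):
    """Probe tiers in priority order for each consecutive segment id.

    Because ids are consecutive, segment order is preserved even when a
    subpartition spans several tiers (Xia Sun's ordering question)."""
    data = []
    for segment_id in range(num_segments):
        tier = next(t for t in tiers if t.has_segment(segment_id))
        data.append((tier.name, tier.read_segment(segment_id)))
    return data


tiers = [  # highest priority first
    Tier("memory", {0: b"s0"}),
    Tier("disk", {1: b"s1"}),
    Tier("remote", {2: b"s2"}),
]
```

In the worst case every probe falls through to the lowest-priority tier, which is exactly the cost Zhu is concerned about; letting the producer record each segment's tier would turn the probe loop into a direct lookup.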
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Thanks Yuxin for your explanation. That sounds reasonable. Looking forward to the new shuffle. Best, Weihua On Fri, Mar 10, 2023 at 11:48 AM Yuxin Tan wrote: > Hi, Weihua, > Thanks for the questions and the ideas. > > > 1. How many performance regressions would there be if we only > used remote storage? > > The new architecture can support to use remote storage only, but this > FLIP target is to improve job stability. And the change in the FLIP has > been significantly complex and the goal of the first version is to update > Hybrid Shuffle to the new architecture and support remote storage as > a supplement. The performance of this version is not the first priority, > so we haven’t tested the performance of using only remote storage. > If there are indeed regressions, we will keep optimizing the performance > of the remote storages and improve it until only remote storage is > available in the production environment. > > > 2. Shall we move the local data to remote storage if the producer is > finished for a long time? > > I agree that it is a good idea, which can release task manager resources > more timely. But moving data from TM local disk to remote storage needs > more detailed discussion and design, and it is easier to implement it based > on the new architecture. Considering the complexity, the target focus, and > the iteration cycle of the FLIP, we decide that the details are not > included > in the first version. We will extend and implement them in the subsequent > versions. > > Best, > Yuxin > > > Weihua Hu 于2023年3月9日周四 11:22写道: > > > Hi, Yuxin > > > > Thanks for driving this FLIP. > > > > The remote storage shuffle could improve the stability of Batch jobs. > > > > In our internal scenario, we use a hybrid cluster to run both > > Streaming(high priority) > > and Batch jobs(low priority). When there is not enough resources(such as > > cpu usage > > reaches a threshold), the batch containers will be evicted. 
So this will > > cause some re-run > > of batch tasks. > > > > It would be a great help if the remote storage could address this. So I > > have a few questions. > > > > 1. How many performance regressions would there be if we only used remote > > storage? > > > > 2. In current design, the shuffle data segment will write to one kind of > > storage tier. > > Shall we move the local data to remote storage if the producer is > finished > > for a long time? > > So we can release the idle task manager with no shuffle data on it. This > > may help to reduce > > the resource usage when producer parallelism is larger than consume. > > > > Best, > > Weihua > > > > > > On Thu, Mar 9, 2023 at 10:38 AM Yuxin Tan > wrote: > > > > > Hi, Junrui, > > > Thanks for the suggestions and ideas. > > > > > > > If they are fixed, I suggest that FLIP could provide clearer > > > explanations. > > > I have updated the FLIP and described the segment size more clearly. > > > > > > > can we provide configuration options for users to manually adjust the > > > sizes? > > > The segment size can be configured if necessary. But considering that > if > > we > > > exposed these parameters prematurely, it may be difficult to modify the > > > implementation later because the user has already used the configs. We > > > can make these internal configs or fixed values when implementing the > > first > > > version, I think we can use either of these two ways, because they are > > > internal and do not affect the public APIs. > > > > > > Best, > > > Yuxin > > > > > > > > > Junrui Lee 于2023年3月8日周三 00:24写道: > > > > > > > Hi Yuxin, > > > > > > > > This FLIP looks quite reasonable. Flink can solve the problem of > Batch > > > > shuffle by > > > > combining local and remote storage, and can use fixed local disks for > > > > better performance > > > > in most scenarios, while using remote storage as a supplement when > > local > > > > disks are not > > > > sufficient, avoiding wasteful costs and poor job stability. 
> Moreover, > > > the > > > > solution also > > > > considers the issue of dynamic switching, which can automatically > > switch > > > to > > > > remote > > > > storage when the local disk is full, saving costs, and automatically > > > switch > > > > back when > > > > there is available space on the local disk. > > > > > > > > As Wencong Liu stated, an appropriate segment size is essential, as > it > > > can > > > > significantly > > > > affect shuffle performance. I also agree that the first version > should > > > > focus mainly on the > > > > design and implementation. However, I have a small question about > > FLIP. I > > > > did not see > > > > any information regarding the segment size of memory, local disk, and > > > > remote storage > > > > in this FLIP. Are these three values fixed at present? If they are > > > fixed, I > > > > suggest that FLIP > > > > could provide clearer explanations. Moreover, although a dynamic > > segment > > > > size > > > > mechanism is not necessary at the moment, can we provide >
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi Weihua,

Thanks for the questions and the ideas.

> 1. How many performance regressions would there be if we only used remote storage?

The new architecture can support using remote storage only, but this FLIP's target is to improve job stability. The change in the FLIP is already quite complex, and the goal of the first version is to update Hybrid Shuffle to the new architecture and support remote storage as a supplement. The performance of this version is not the first priority, so we haven't tested the performance of using only remote storage. If there are indeed regressions, we will keep optimizing the performance of the remote storage tier until remote-storage-only setups are viable in production environments.

> 2. Shall we move the local data to remote storage if the producer is finished for a long time?

I agree that it is a good idea, as it can release TaskManager resources in a more timely way. But moving data from the TM's local disk to remote storage needs more detailed discussion and design, and it is easier to implement on top of the new architecture. Considering the complexity, the target focus, and the iteration cycle of the FLIP, we decided not to include the details in the first version; we will extend and implement them in subsequent versions.

Best,
Yuxin

Weihua Hu wrote on Thu, Mar 9, 2023 at 11:22:
> Hi, Yuxin > > Thanks for driving this FLIP. > > The remote storage shuffle could improve the stability of Batch jobs. > > In our internal scenario, we use a hybrid cluster to run both > Streaming(high priority) > and Batch jobs(low priority). When there is not enough resources(such as > cpu usage > reaches a threshold), the batch containers will be evicted. So this will > cause some re-run > of batch tasks. > > It would be a great help if the remote storage could address this. So I > have a few questions. > > 1. How many performance regressions would there be if we only used remote > storage? > > 2.
In current design, the shuffle data segment will write to one kind of > storage tier. > Shall we move the local data to remote storage if the producer is finished > for a long time? > So we can release the idle task manager with no shuffle data on it. This > may help to reduce > the resource usage when producer parallelism is larger than consume. > > Best, > Weihua > > > On Thu, Mar 9, 2023 at 10:38 AM Yuxin Tan wrote: > > > Hi, Junrui, > > Thanks for the suggestions and ideas. > > > > > If they are fixed, I suggest that FLIP could provide clearer > > explanations. > > I have updated the FLIP and described the segment size more clearly. > > > > > can we provide configuration options for users to manually adjust the > > sizes? > > The segment size can be configured if necessary. But considering that if > we > > exposed these parameters prematurely, it may be difficult to modify the > > implementation later because the user has already used the configs. We > > can make these internal configs or fixed values when implementing the > first > > version, I think we can use either of these two ways, because they are > > internal and do not affect the public APIs. > > > > Best, > > Yuxin > > > > > > Junrui Lee 于2023年3月8日周三 00:24写道: > > > > > Hi Yuxin, > > > > > > This FLIP looks quite reasonable. Flink can solve the problem of Batch > > > shuffle by > > > combining local and remote storage, and can use fixed local disks for > > > better performance > > > in most scenarios, while using remote storage as a supplement when > local > > > disks are not > > > sufficient, avoiding wasteful costs and poor job stability. Moreover, > > the > > > solution also > > > considers the issue of dynamic switching, which can automatically > switch > > to > > > remote > > > storage when the local disk is full, saving costs, and automatically > > switch > > > back when > > > there is available space on the local disk. 
> > > > > > As Wencong Liu stated, an appropriate segment size is essential, as it > > can > > > significantly > > > affect shuffle performance. I also agree that the first version should > > > focus mainly on the > > > design and implementation. However, I have a small question about > FLIP. I > > > did not see > > > any information regarding the segment size of memory, local disk, and > > > remote storage > > > in this FLIP. Are these three values fixed at present? If they are > > fixed, I > > > suggest that FLIP > > > could provide clearer explanations. Moreover, although a dynamic > segment > > > size > > > mechanism is not necessary at the moment, can we provide configuration > > > options for users > > > to manually adjust these sizes? I think it might be useful. > > > > > > Best, > > > Junrui. > > > > > > Yuxin Tan 于2023年3月7日周二 20:14写道: > > > > > > > Thanks for joining the discussion. > > > > > > > > @weijie guo > > > > > 1. How to optimize the broadcast result partition? > > > > For the partitions with multi-consumers, e.g., broadcast result > > > partition, > > > > partition reuse, > >
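The write-path behavior the exchange above describes, each segment landing in exactly one tier with automatic fallback, can be condensed into one small decision function. This is a sketch under stated assumptions: the tier names, the boolean memory check, and the `try_reserve` gate interface are all illustrative, not the FLIP's actual API.

```python
def choose_tier_for_segment(segment_bytes, memory_has_room, disk_gate):
    """Illustrative per-segment tier choice: prefer memory, then local disk,
    then remote storage. `disk_gate` is any object whose try_reserve(nbytes)
    atomically claims local disk space and returns False when the disk
    reserve would be violated (hypothetical interface)."""
    if memory_has_room:
        return "memory"
    if disk_gate.try_reserve(segment_bytes):
        return "disk"
    # Automatic fallback instead of a 'No space left on device' failure.
    return "remote"


class FullDisk:
    """Stub gate modeling a local disk with no usable space left."""
    def try_reserve(self, nbytes):
        return False
```

Because the decision is made per segment, the writer switches back to memory or disk as soon as space frees up, which matches the dynamic-switching behavior praised in Junrui's quoted message.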
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi Yuxin,

Thanks for driving this FLIP.

The remote storage shuffle could improve the stability of batch jobs.

In our internal scenario, we use a hybrid cluster to run both streaming (high priority) and batch jobs (low priority). When there are not enough resources (e.g., CPU usage reaches a threshold), the batch containers are evicted, which causes some batch tasks to re-run.

It would be a great help if remote storage could address this. So I have a few questions.

1. How many performance regressions would there be if we only used remote storage?

2. In the current design, a shuffle data segment is written to one kind of storage tier. Shall we move the local data to remote storage if the producer has been finished for a long time? Then we could release idle TaskManagers with no shuffle data on them, which may help reduce resource usage when the producer parallelism is larger than the consumer's.

Best,
Weihua

On Thu, Mar 9, 2023 at 10:38 AM Yuxin Tan wrote:
> Hi, Junrui, > Thanks for the suggestions and ideas. > > > If they are fixed, I suggest that FLIP could provide clearer > explanations. > I have updated the FLIP and described the segment size more clearly. > > > can we provide configuration options for users to manually adjust the > sizes? > The segment size can be configured if necessary. But considering that if we > exposed these parameters prematurely, it may be difficult to modify the > implementation later because the user has already used the configs. We > can make these internal configs or fixed values when implementing the first > version, I think we can use either of these two ways, because they are > internal and do not affect the public APIs. > > Best, > Yuxin > > > Junrui Lee wrote on Wed, Mar 8, 2023 at 00:24: > > Hi Yuxin, > > This FLIP looks quite reasonable.
Flink can solve the problem of Batch > > shuffle by > > combining local and remote storage, and can use fixed local disks for > > better performance > > in most scenarios, while using remote storage as a supplement when local > > disks are not > > sufficient, avoiding wasteful costs and poor job stability. Moreover, > the > > solution also > > considers the issue of dynamic switching, which can automatically switch > to > > remote > > storage when the local disk is full, saving costs, and automatically > switch > > back when > > there is available space on the local disk. > > > > As Wencong Liu stated, an appropriate segment size is essential, as it > can > > significantly > > affect shuffle performance. I also agree that the first version should > > focus mainly on the > > design and implementation. However, I have a small question about FLIP. I > > did not see > > any information regarding the segment size of memory, local disk, and > > remote storage > > in this FLIP. Are these three values fixed at present? If they are > fixed, I > > suggest that FLIP > > could provide clearer explanations. Moreover, although a dynamic segment > > size > > mechanism is not necessary at the moment, can we provide configuration > > options for users > > to manually adjust these sizes? I think it might be useful. > > > > Best, > > Junrui. > > > > Yuxin Tan 于2023年3月7日周二 20:14写道: > > > > > Thanks for joining the discussion. > > > > > > @weijie guo > > > > 1. How to optimize the broadcast result partition? > > > For the partitions with multi-consumers, e.g., broadcast result > > partition, > > > partition reuse, > > > speculative, etc, the processing logic is the same as the original > Hybrid > > > Shuffle, that is, > > > using the full spilling strategy. It indeed may reduce the opportunity > to > > > consume from > > > memory, but the PoC shows that it has no effect on the performance > > > basically. > > > > > > > 2. 
Can the new proposal completely avoid this problem of inaccurate > > > backlog > > > calculation? > > > Yes, this can avoid the problem completely. About the read buffers, > the N > > > is to reserve > > > one exclusive buffer per channel, which is to avoid the deadlock > because > > > the buffers > > > are acquired by some channels and other channels can not request any > > > buffers. But > > > the buffers except for the N can be floating (competing to request the > > > buffers) by all > > > channels. > > > > > > @Wencong Liu > > > > Deciding the Segment size dynamically will be helpful. > > > I agree that it may be better if the segment size is dynamically > decided, > > > but for simplifying > > > the implementation of the first version, we want to make this a fixed > > value > > > for each tier. > > > In the future, this can be a good improvement if necessary. In the > first > > > version, we will mainly > > > focus on the more important features, such as the tiered storage > > > architecture, dynamic > > > switching tiers, supporting remote storage, memory management, etc. > > > > > > Best, > > > Yuxin > > > > > > > > > Wencong Liu 于2023年3月7日周二 16:48写道: > > > > > > > Hello Yuxin, > > > > > > > > > > > > Thanks for your
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi Junrui,

Thanks for the suggestions and ideas.

> If they are fixed, I suggest that FLIP could provide clearer explanations.

I have updated the FLIP and described the segment size more clearly.

> can we provide configuration options for users to manually adjust the sizes?

The segment size can be configured if necessary. However, if we exposed these parameters prematurely, it might be difficult to change the implementation later because users would already rely on the configs. We can make these internal configs or fixed values when implementing the first version; either way works, because they are internal and do not affect the public APIs.

Best,
Yuxin

Junrui Lee wrote on Wed, Mar 8, 2023 at 00:24:
> Hi Yuxin, > > This FLIP looks quite reasonable. Flink can solve the problem of Batch > shuffle by > combining local and remote storage, and can use fixed local disks for > better performance > in most scenarios, while using remote storage as a supplement when local > disks are not > sufficient, avoiding wasteful costs and poor job stability. Moreover, the > solution also > considers the issue of dynamic switching, which can automatically switch to > remote > storage when the local disk is full, saving costs, and automatically switch > back when > there is available space on the local disk. > > As Wencong Liu stated, an appropriate segment size is essential, as it can > significantly > affect shuffle performance. I also agree that the first version should > focus mainly on the > design and implementation. However, I have a small question about FLIP. I > did not see > any information regarding the segment size of memory, local disk, and > remote storage > in this FLIP. Are these three values fixed at present? If they are fixed, I > suggest that FLIP > could provide clearer explanations.
Moreover, although a dynamic segment > size > mechanism is not necessary at the moment, can we provide configuration > options for users > to manually adjust these sizes? I think it might be useful. > > Best, > Junrui. > > Yuxin Tan 于2023年3月7日周二 20:14写道: > > > Thanks for joining the discussion. > > > > @weijie guo > > > 1. How to optimize the broadcast result partition? > > For the partitions with multi-consumers, e.g., broadcast result > partition, > > partition reuse, > > speculative, etc, the processing logic is the same as the original Hybrid > > Shuffle, that is, > > using the full spilling strategy. It indeed may reduce the opportunity to > > consume from > > memory, but the PoC shows that it has no effect on the performance > > basically. > > > > > 2. Can the new proposal completely avoid this problem of inaccurate > > backlog > > calculation? > > Yes, this can avoid the problem completely. About the read buffers, the N > > is to reserve > > one exclusive buffer per channel, which is to avoid the deadlock because > > the buffers > > are acquired by some channels and other channels can not request any > > buffers. But > > the buffers except for the N can be floating (competing to request the > > buffers) by all > > channels. > > > > @Wencong Liu > > > Deciding the Segment size dynamically will be helpful. > > I agree that it may be better if the segment size is dynamically decided, > > but for simplifying > > the implementation of the first version, we want to make this a fixed > value > > for each tier. > > In the future, this can be a good improvement if necessary. In the first > > version, we will mainly > > focus on the more important features, such as the tiered storage > > architecture, dynamic > > switching tiers, supporting remote storage, memory management, etc. > > > > Best, > > Yuxin > > > > > > Wencong Liu 于2023年3月7日周二 16:48写道: > > > > > Hello Yuxin, > > > > > > > > > Thanks for your proposal! 
Adding remote storage capability to > Flink's > > > Hybrid Shuffle is a significant improvement that addresses the issue of > > > local disk storage limitations. This enhancement not only ensures > > > uninterrupted Shuffle, but also enables Flink to handle larger > workloads > > > and more complex data processing tasks. With the ability to seamlessly > > > shift between local and remote storage, Flink's Hybrid Shuffle will be > > more > > > versatile and scalable, making it an ideal choice for organizations > > looking > > > to build distributed data processing applications with ease. > > > Besides, I have a small question about the size of Segment in > different > > > storages. According to the FLIP, the size of Segment may be fixed for > > each > > > Storage Tier, but I think the fixed size may affect the shuffle > > > performance. For example, a smaller segment size will improve the > > utilization > > > rate of the Memory Storage Tier, but it may bring extra cost to the Disk > Storage > > > Tier or Remote Storage Tier. Deciding the size of Segment dynamically > will > > be > > > helpful. > > > > > > Best, > > > > > > Wencong Liu
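The fixed per-tier segment sizes debated above could simply live as internal constants in the first version, which is what Yuxin proposes. The byte values below are purely illustrative assumptions, not numbers from the FLIP; the sketch only shows the shape of the idea: each tier gets its own fixed segment size, kept internal rather than exposed as public configuration.

```python
# Illustrative (assumed) internal per-tier segment sizes, NOT the FLIP's real
# values: smaller segments raise memory-tier utilization, while larger ones
# amortize disk seeks and remote-storage request overhead (Wencong's trade-off).
SEGMENT_SIZE_BYTES = {
    "memory": 32 * 1024,
    "disk": 1 * 1024 * 1024,
    "remote": 4 * 1024 * 1024,
}


def segments_for(tier, data_bytes):
    """How many fixed-size segments a write of data_bytes occupies in a tier."""
    size = SEGMENT_SIZE_BYTES[tier]
    return -(-data_bytes // size)  # ceiling division
```

Keeping the mapping internal preserves the freedom to change these values, or make them dynamic per Wencong's suggestion, without breaking any public API.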
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi Yuxin,

This FLIP looks quite reasonable. Flink can solve the problem of Batch shuffle by combining local and remote storage: it can use fixed local disks for better performance in most scenarios, while using remote storage as a supplement when local disks are not sufficient, avoiding wasted cost and poor job stability. Moreover, the solution also considers dynamic switching, which can automatically switch to remote storage when the local disk is full, saving costs, and automatically switch back when there is available space on the local disk.

As Wencong Liu stated, an appropriate segment size is essential, as it can significantly affect shuffle performance. I also agree that the first version should focus mainly on the design and implementation.

However, I have a small question about the FLIP. I did not see any information regarding the segment sizes of memory, local disk, and remote storage in this FLIP. Are these three values fixed at present? If they are fixed, I suggest that the FLIP provide clearer explanations. Moreover, although a dynamic segment size mechanism is not necessary at the moment, can we provide configuration options for users to manually adjust these sizes? I think it might be useful.

Best,
Junrui.

Yuxin Tan 于2023年3月7日周二 20:14写道:

> Thanks for joining the discussion.
>
> @weijie guo
> > 1. How to optimize the broadcast result partition?
> For the partitions with multiple consumers, e.g., broadcast result partition, partition reuse, speculative execution, etc., the processing logic is the same as in the original Hybrid Shuffle, that is, using the full spilling strategy. It may indeed reduce the opportunity to consume from memory, but the PoC shows that it has essentially no effect on performance.
>
> > 2. Can the new proposal completely avoid this problem of inaccurate backlog calculation?
> Yes, this can avoid the problem completely. About the read buffers, the N is to reserve one exclusive buffer per channel, which avoids the deadlock that occurs when buffers are held by some channels and other channels cannot request any. All buffers beyond the N exclusive ones can be floating (requested competitively) by all channels.
>
> @Wencong Liu
> > Deciding the Segment size dynamically will be helpful.
> I agree that it may be better if the segment size is decided dynamically, but to simplify the implementation of the first version, we want to make this a fixed value for each tier. In the future, this can be a good improvement if necessary. In the first version, we will mainly focus on the more important features, such as the tiered storage architecture, dynamically switching tiers, supporting remote storage, memory management, etc.
>
> Best,
> Yuxin
>
> Wencong Liu 于2023年3月7日周二 16:48写道:
>
> > Hello Yuxin,
> >
> > Thanks for your proposal! Adding remote storage capability to Flink's Hybrid Shuffle is a significant improvement that addresses the issue of local disk storage limitations. This enhancement not only ensures uninterrupted shuffle, but also enables Flink to handle larger workloads and more complex data processing tasks. With the ability to seamlessly shift between local and remote storage, Flink's Hybrid Shuffle will be more versatile and scalable, making it an ideal choice for organizations looking to build distributed data processing applications with ease.
> >
> > Besides, I have a small question about the size of the Segment in different storages. According to the FLIP, the size of the Segment may be fixed for each Storage Tier, but I think a fixed size may affect shuffle performance. For example, a smaller segment size will improve the utilization of the Memory Storage Tier, but it may bring extra cost to the Disk Storage Tier or Remote Storage Tier. Deciding the size of the Segment dynamically will be helpful.
> >
> > Best,
> >
> > Wencong Liu
> >
> > At 2023-03-06 13:51:21, "Yuxin Tan" wrote:
> > > Hi everyone,
> > >
> > > I would like to start a discussion on FLIP-301: Hybrid Shuffle supports Remote Storage[1].
> > >
> > > In the cloud-native environment, it is difficult to determine the appropriate disk space for Batch shuffle, which will affect job stability.
> > >
> > > This FLIP is to support Remote Storage for Hybrid Shuffle to improve Batch job stability in the cloud-native environment.
> > >
> > > The goals of this FLIP are as follows.
> > > 1. By default, use the local memory and disk to ensure high shuffle performance if the local storage space is sufficient.
> > > 2. When the local storage space is insufficient, use remote storage as a supplement to avoid large-scale Batch job failure.
> > >
> > > Looking forward to hearing from you.
> > >
> > > [1]
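Junrui's suggestion of user-tunable per-tier segment sizes could look roughly like the sketch below. All option names, tiers, and default values here are hypothetical (the FLIP does not define them); the snippet only illustrates how fixed per-tier defaults might be combined with manual overrides.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-tier segment-size options. None of these
// keys or defaults exist in Flink; they only illustrate the suggestion
// of letting users manually adjust the (otherwise fixed) sizes.
public class TierSegmentConfig {
    // Assumed fixed defaults, one per storage tier.
    static final Map<String, Long> DEFAULTS = new HashMap<>();
    static {
        DEFAULTS.put("memory", 32L * 1024);          // 32 KiB
        DEFAULTS.put("disk", 8L * 1024 * 1024);      // 8 MiB
        DEFAULTS.put("remote", 32L * 1024 * 1024);   // 32 MiB
    }

    private final Map<String, Long> overrides = new HashMap<>();

    // A user-facing knob to override a tier's segment size.
    public void setSegmentSize(String tier, long bytes) {
        if (bytes <= 0) {
            throw new IllegalArgumentException("segment size must be positive");
        }
        overrides.put(tier, bytes);
    }

    // Falls back to the fixed per-tier default when not overridden.
    public long segmentSize(String tier) {
        return overrides.getOrDefault(tier, DEFAULTS.get(tier));
    }

    public static void main(String[] args) {
        TierSegmentConfig conf = new TierSegmentConfig();
        conf.setSegmentSize("memory", 64L * 1024);
        System.out.println(conf.segmentSize("memory")); // 65536
        System.out.println(conf.segmentSize("disk"));   // 8388608
    }
}
```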
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Thanks for joining the discussion.

@weijie guo
> 1. How to optimize the broadcast result partition?
For the partitions with multiple consumers, e.g., broadcast result partition, partition reuse, speculative execution, etc., the processing logic is the same as in the original Hybrid Shuffle, that is, using the full spilling strategy. It may indeed reduce the opportunity to consume from memory, but the PoC shows that it has essentially no effect on performance.

> 2. Can the new proposal completely avoid this problem of inaccurate backlog calculation?
Yes, this can avoid the problem completely. About the read buffers, the N is to reserve one exclusive buffer per channel, which avoids the deadlock that occurs when buffers are held by some channels and other channels cannot request any. All buffers beyond the N exclusive ones can be floating (requested competitively) by all channels.

@Wencong Liu
> Deciding the Segment size dynamically will be helpful.
I agree that it may be better if the segment size is decided dynamically, but to simplify the implementation of the first version, we want to make this a fixed value for each tier. In the future, this can be a good improvement if necessary. In the first version, we will mainly focus on the more important features, such as the tiered storage architecture, dynamically switching tiers, supporting remote storage, memory management, etc.

Best,
Yuxin

Wencong Liu 于2023年3月7日周二 16:48写道:

> Hello Yuxin,
>
> Thanks for your proposal! Adding remote storage capability to Flink's Hybrid Shuffle is a significant improvement that addresses the issue of local disk storage limitations. This enhancement not only ensures uninterrupted shuffle, but also enables Flink to handle larger workloads and more complex data processing tasks. With the ability to seamlessly shift between local and remote storage, Flink's Hybrid Shuffle will be more versatile and scalable, making it an ideal choice for organizations looking to build distributed data processing applications with ease.
>
> Besides, I have a small question about the size of the Segment in different storages. According to the FLIP, the size of the Segment may be fixed for each Storage Tier, but I think a fixed size may affect shuffle performance. For example, a smaller segment size will improve the utilization of the Memory Storage Tier, but it may bring extra cost to the Disk Storage Tier or Remote Storage Tier. Deciding the size of the Segment dynamically will be helpful.
>
> Best,
>
> Wencong Liu
>
> At 2023-03-06 13:51:21, "Yuxin Tan" wrote:
> > Hi everyone,
> >
> > I would like to start a discussion on FLIP-301: Hybrid Shuffle supports Remote Storage[1].
> >
> > In the cloud-native environment, it is difficult to determine the appropriate disk space for Batch shuffle, which will affect job stability.
> >
> > This FLIP is to support Remote Storage for Hybrid Shuffle to improve Batch job stability in the cloud-native environment.
> >
> > The goals of this FLIP are as follows.
> > 1. By default, use the local memory and disk to ensure high shuffle performance if the local storage space is sufficient.
> > 2. When the local storage space is insufficient, use remote storage as a supplement to avoid large-scale Batch job failure.
> >
> > Looking forward to hearing from you.
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-301%3A+Hybrid+Shuffle+supports+Remote+Storage
> >
> > Best,
> > Yuxin
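Yuxin's description of the reader-side buffer model can be made concrete with a small sketch: each of the N channels keeps one exclusive buffer so it can always make progress, while every remaining buffer floats and is requested competitively. The class and method names below are illustrative assumptions, not Flink's actual API.

```java
// Illustrative model of the reader-side buffer budget described above:
// one exclusive buffer reserved per channel (N total) to rule out the
// deadlock where some channels hold all buffers and others get none,
// with every remaining buffer floating among all channels.
public class ReadBufferBudget {
    private final int numChannels;   // N: one exclusive buffer each
    private int floatingAvailable;   // shared, competitively requested

    public ReadBufferBudget(int totalBuffers, int numChannels) {
        if (totalBuffers < numChannels) {
            throw new IllegalArgumentException(
                "need at least one exclusive buffer per channel");
        }
        this.numChannels = numChannels;
        this.floatingAvailable = totalBuffers - numChannels;
    }

    // A channel can always fall back to its exclusive buffer, so it
    // makes progress even when every floating buffer is taken.
    public int guaranteedBuffersPerChannel() {
        return 1;
    }

    // Floating buffers are handed out first-come, first-served.
    public boolean tryAcquireFloating() {
        if (floatingAvailable == 0) {
            return false;
        }
        floatingAvailable--;
        return true;
    }

    public void releaseFloating() {
        floatingAvailable++;
    }

    public int floatingAvailable() {
        return floatingAvailable;
    }
}
```

Because each channel holds one guaranteed buffer, a channel that loses every race for floating buffers can still proceed; this is exactly why the N exclusive buffers cannot all be floating, which answers Weijie's second question.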
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hello Yuxin,

Thanks for your proposal! Adding remote storage capability to Flink's Hybrid Shuffle is a significant improvement that addresses the issue of local disk storage limitations. This enhancement not only ensures uninterrupted shuffle, but also enables Flink to handle larger workloads and more complex data processing tasks. With the ability to seamlessly shift between local and remote storage, Flink's Hybrid Shuffle will be more versatile and scalable, making it an ideal choice for organizations looking to build distributed data processing applications with ease.

Besides, I have a small question about the size of the Segment in different storages. According to the FLIP, the size of the Segment may be fixed for each Storage Tier, but I think a fixed size may affect shuffle performance. For example, a smaller segment size will improve the utilization of the Memory Storage Tier, but it may bring extra cost to the Disk Storage Tier or Remote Storage Tier. Deciding the size of the Segment dynamically will be helpful.

Best,

Wencong Liu

At 2023-03-06 13:51:21, "Yuxin Tan" wrote:
> Hi everyone,
>
> I would like to start a discussion on FLIP-301: Hybrid Shuffle supports Remote Storage[1].
>
> In the cloud-native environment, it is difficult to determine the appropriate disk space for Batch shuffle, which will affect job stability.
>
> This FLIP is to support Remote Storage for Hybrid Shuffle to improve Batch job stability in the cloud-native environment.
>
> The goals of this FLIP are as follows.
> 1. By default, use the local memory and disk to ensure high shuffle performance if the local storage space is sufficient.
> 2. When the local storage space is insufficient, use remote storage as a supplement to avoid large-scale Batch job failure.
>
> Looking forward to hearing from you.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-301%3A+Hybrid+Shuffle+supports+Remote+Storage
>
> Best,
> Yuxin
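The tradeoff Wencong describes can be put in rough numbers. The sketch below is back-of-envelope arithmetic with purely illustrative sizes: smaller segments waste less space in the memory tier (less internal fragmentation), but mean many more per-segment operations against the disk or remote tiers.

```java
// Back-of-envelope sketch of the segment-size tradeoff above.
// All sizes are illustrative, not values from the FLIP.
public class SegmentTradeoff {
    // Segments needed to hold a partition of the given size (ceiling).
    public static long segmentCount(long partitionBytes, long segmentBytes) {
        return (partitionBytes + segmentBytes - 1) / segmentBytes;
    }

    // Space wasted when the last segment is only partially filled.
    public static long internalFragmentation(long partitionBytes, long segmentBytes) {
        long rem = partitionBytes % segmentBytes;
        return rem == 0 ? 0 : segmentBytes - rem;
    }

    public static void main(String[] args) {
        long partition = 100L * 1024 * 1024; // 100 MiB of shuffle data
        // 32 KiB segments: negligible fragmentation, but 3200 segments,
        // i.e. many per-segment requests for the disk/remote tiers.
        System.out.println(segmentCount(partition, 32L * 1024));       // 3200
        // 8 MiB segments: only 13 segments, but up to one nearly empty
        // 8 MiB segment of wasted space in the memory tier.
        System.out.println(segmentCount(partition, 8L * 1024 * 1024)); // 13
    }
}
```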
Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi Yuxin,

As one of the main contributors to hybrid shuffle, I am very happy to see this improvement. IMO, hybrid shuffle should gradually replace the default batch shuffle implementation, because it is theoretically a superset of blocking shuffle (i.e., SortMergeShuffle / BoundedBlockingShuffle). Today, with the popularity of K8S, more and more people suffer from insufficient disk space when running batch jobs in cloud environments. Therefore, supporting elastic remote storage is a crucial part of this process.

This FLIP looks good to me on the whole, but I still have some questions:

1. How to optimize the broadcast result partition? Is it the same as the original hybrid shuffle, which uses the full spilling strategy? IIUC, this FLIP proposes to move the spilling operation from the memory layer to the disk layer; does it also lose the ability to consume data from memory in this case? This problem extends to any multi-consumer situation, such as partition reuse and speculative execution.

2. Regarding the number of reader-side buffers, the original hybrid shuffle did not calculate an accurate backlog for performance reasons, so it was necessary to reserve two exclusive buffers for each channel to avoid performance deterioration. Can the new proposal completely avoid this problem? At the same time, does N in the formula mean that one exclusive buffer is reserved for each channel? If so, why can't all of them be floating?

Best regards,
Weijie

Yuxin Tan 于2023年3月6日周一 13:51写道:

> Hi everyone,
>
> I would like to start a discussion on FLIP-301: Hybrid Shuffle supports Remote Storage[1].
>
> In the cloud-native environment, it is difficult to determine the appropriate disk space for Batch shuffle, which will affect job stability.
>
> This FLIP is to support Remote Storage for Hybrid Shuffle to improve Batch job stability in the cloud-native environment.
>
> The goals of this FLIP are as follows.
> 1. By default, use the local memory and disk to ensure high shuffle performance if the local storage space is sufficient.
> 2. When the local storage space is insufficient, use remote storage as a supplement to avoid large-scale Batch job failure.
>
> Looking forward to hearing from you.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-301%3A+Hybrid+Shuffle+supports+Remote+Storage
>
> Best,
> Yuxin
[DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
Hi everyone,

I would like to start a discussion on FLIP-301: Hybrid Shuffle supports Remote Storage[1].

In the cloud-native environment, it is difficult to determine the appropriate disk space for Batch shuffle, which will affect job stability.

This FLIP is to support Remote Storage for Hybrid Shuffle to improve the Batch job stability in the cloud-native environment.

The goals of this FLIP are as follows.
1. By default, use the local memory and disk to ensure high shuffle performance if the local storage space is sufficient.
2. When the local storage space is insufficient, use remote storage as a supplement to avoid large-scale Batch job failure.

Looking forward to hearing from you.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-301%3A+Hybrid+Shuffle+supports+Remote+Storage

Best,
Yuxin
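The two goals above amount to a simple fallback chain: prefer local memory, then local disk, and use remote storage only when local space is insufficient, switching back once local space frees up. The sketch below is an illustrative model of that policy under assumed capacity tracking; it is not the FLIP's actual tier interface.

```java
// Illustrative fallback chain for the FLIP's two goals: prefer local
// memory, then local disk, and fall back to remote storage only when
// local space is insufficient. The capacity model and names are
// assumptions for illustration, not the FLIP's actual design.
public class TierChooser {
    public enum Tier { MEMORY, DISK, REMOTE }

    private long memoryFree;
    private long diskFree;

    public TierChooser(long memoryFree, long diskFree) {
        this.memoryFree = memoryFree;
        this.diskFree = diskFree;
    }

    // Pick the first local tier with room for the segment; remote
    // storage is treated as unbounded, so every segment can be placed.
    public Tier choose(long segmentBytes) {
        if (memoryFree >= segmentBytes) {
            memoryFree -= segmentBytes;
            return Tier.MEMORY;
        }
        if (diskFree >= segmentBytes) {
            diskFree -= segmentBytes;
            return Tier.DISK;
        }
        return Tier.REMOTE;
    }

    // When local space is freed (e.g. data has been consumed), later
    // segments automatically switch back to the faster local tiers.
    public void freeMemory(long bytes) { memoryFree += bytes; }
    public void freeDisk(long bytes) { diskFree += bytes; }
}
```

This also shows the dynamic-switching behavior Junrui highlighted: once local space is released, subsequent segments land on local tiers again without any job-level intervention.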