Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-19 Thread Yuxin Tan
Hi all,

Thanks for all the feedback and suggestions so far.

The discussion has been going on for some time. If there are no
further comments, we will start voting today.

Best,
Yuxin


Yuxin Tan wrote on Fri, Mar 17, 2023 at 12:52:

> Hi,
> Thanks for joining the discussion and giving the ideas.
>
> @ron
> > can the Hybrid Shuffle replace the RSS in the future?
>
> The hybrid shuffle and RSS offer distinct solutions to address
> the shuffle operation challenge. To optimize performance, we
> store shuffle data in different tiers of memory and disk, enabling
> greater flexibility and ease of use. Specifically, we cache
> intermediate data in memory to reduce disk I/O overhead.
> In contrast, RSS is a standalone service that can operate across
> multiple servers within a cluster, parallelizing shuffle operations
> to enhance performance. However, this introduces additional
> deployment and maintenance costs. Each approach has its own
> benefits and drawbacks, and users should be able to select the
> method that best suits their needs. So I think we cannot replace
> RSS in the future.
>
> @ConradJam
> > Should we define a data acceleration layer like Alluxio in remote
> storage?
>
> I'm not entirely clear on the detailed plan you've proposed, but I
> understand that you want to use Alluxio to serve as a cache layer for
> the remote storage tier. It's designed to provide low-latency data
> access to applications through a distributed caching layer. However,
> implementing Alluxio introduces additional dependencies and
> deployment/maintenance costs for users. While our design approach
> is to supplement local storage with remote storage, as local storage
> is generally sufficient. Given the limited usage scenarios, introducing
> such costs for optimization may not be worthwhile or meaningful.
> Additionally, for users, added dependencies imply increased complexity.
>
> Best,
> Yuxin
>
>
> ConradJam wrote on Fri, Mar 17, 2023 at 11:11:
>
>> Thanks for starting this discussion.
>>
>>
>> Here I am a bit confused about the memory layer definition. This refers to
>> local memory. Should we define a data acceleration layer like Alluxio [1]
>> in remote storage?
>>
>>
>> Let me cite a scenario: If I use Fluid [2] to mount an AlluxioRuntime [3]
>> on K8S, it looks like a local disk (but it is actually a remote memory
>> storage), Have we specified this behavior or optimized it for this
>> scenario?
>>
>>
>> [1]  What is alluxio :
>> https://docs.alluxio.io/os/user/stable/en/Overview.html
>>
>> [2]  Fluid: https://fluid-cloudnative.github.io/
>>
>> [3]  Fluid Alluxio Runtime:
>>
>> https://fluid-cloudnative.github.io/samples/tieredstore_config.html#prerequisites
>>
>> liu ron wrote on Fri, Mar 17, 2023 at 10:39:
>>
>> > Hi, Yuxin,
>> >
>> > Thanks for creating this FLIP. Adding remote storage capability to
>> Flink's
>> > Hybrid Shuffle is a significant improvement that addresses the issue of
>> > local disk storage limitations, this also can improve the stability of
>> > Flink Batch Job.
>> > I just have one question: can the Hybrid Shuffle replace the RSS in the
>> > future? Due to the Hybrid Shuffle having remote storage ability, I think
>> > maybe we don't need to maintain a standalone RSS, it will simplify our
>> > operation work.
>> >
>>
>>
>> --
>> Best
>>
>> ConradJam
>>
>


Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-16 Thread Yuxin Tan
Hi,
Thanks for joining the discussion and giving the ideas.

@ron
> can the Hybrid Shuffle replace the RSS in the future?

The hybrid shuffle and RSS offer distinct solutions to address
the shuffle operation challenge. To optimize performance, we
store shuffle data in different tiers of memory and disk, enabling
greater flexibility and ease of use. Specifically, we cache
intermediate data in memory to reduce disk I/O overhead.
In contrast, RSS is a standalone service that can operate across
multiple servers within a cluster, parallelizing shuffle operations
to enhance performance. However, this introduces additional
deployment and maintenance costs. Each approach has its own
benefits and drawbacks, and users should be able to select the
method that best suits their needs. So I don't think Hybrid Shuffle can
replace RSS in the future.
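
As an editorial sketch of the tiering described above (not code from the FLIP; all names and capacities here are illustrative assumptions), a tier-priority write path might look like:

```python
# Hypothetical sketch: shuffle data goes to the highest-priority tier with
# free capacity -- memory first, then local disk, then remote storage.
class Tier:
    def __init__(self, name, capacity):
        self.name = name
        self.capacity = capacity
        self.used = 0

    def try_write(self, size):
        """Accept the write only if it fits in this tier's remaining capacity."""
        if self.used + size > self.capacity:
            return False
        self.used += size
        return True

def write_segment(tiers, size):
    """Write to the first tier (in priority order) that has room."""
    for tier in tiers:
        if tier.try_write(size):
            return tier.name
    raise IOError("no tier could accept the segment")

tiers = [Tier("memory", 100), Tier("disk", 1000), Tier("remote", 10**12)]
print(write_segment(tiers, 80))  # -> memory
print(write_segment(tiers, 80))  # -> disk (memory is now nearly full)
```

The point of the sketch is only that callers see one write path while data lands in memory, disk, or remote storage depending on capacity.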

@ConradJam
> Should we define a data acceleration layer like Alluxio in remote storage?

I'm not entirely clear on the detailed plan you've proposed, but I
understand that you want to use Alluxio as a cache layer for the
remote storage tier. It's designed to provide low-latency data
access to applications through a distributed caching layer. However,
introducing Alluxio adds extra dependencies and
deployment/maintenance costs for users, while our design approach
is to supplement local storage with remote storage, as local storage
is generally sufficient. Given the limited usage scenarios, introducing
such costs for optimization may not be worthwhile or meaningful.
Additionally, for users, added dependencies imply increased complexity.

Best,
Yuxin


ConradJam wrote on Fri, Mar 17, 2023 at 11:11:

> Thanks for starting this discussion.
>
>
> Here I am a bit confused about the memory layer definition. This refers to
> local memory. Should we define a data acceleration layer like Alluxio [1]
> in remote storage?
>
>
> Let me cite a scenario: If I use Fluid [2] to mount an AlluxioRuntime [3]
> on K8S, it looks like a local disk (but it is actually a remote memory
> storage), Have we specified this behavior or optimized it for this
> scenario?
>
>
> [1]  What is alluxio :
> https://docs.alluxio.io/os/user/stable/en/Overview.html
>
> [2]  Fluid: https://fluid-cloudnative.github.io/
>
> [3]  Fluid Alluxio Runtime:
>
> https://fluid-cloudnative.github.io/samples/tieredstore_config.html#prerequisites
>
> liu ron wrote on Fri, Mar 17, 2023 at 10:39:
>
> > Hi, Yuxin,
> >
> > Thanks for creating this FLIP. Adding remote storage capability to
> Flink's
> > Hybrid Shuffle is a significant improvement that addresses the issue of
> > local disk storage limitations, this also can improve the stability of
> > Flink Batch Job.
> > I just have one question: can the Hybrid Shuffle replace the RSS in the
> > future? Due to the Hybrid Shuffle having remote storage ability, I think
> > maybe we don't need to maintain a standalone RSS, it will simplify our
> > operation work.
> >
>
>
> --
> Best
>
> ConradJam
>


Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-16 Thread ConradJam
Thanks for starting this discussion.


Here I am a bit confused about the memory layer definition. Does this refer to
local memory? Should we define a data acceleration layer like Alluxio [1]
in remote storage?


Let me cite a scenario: if I use Fluid [2] to mount an AlluxioRuntime [3]
on K8s, it looks like a local disk (but it is actually remote memory
storage). Have we specified this behavior or optimized it for this scenario?


[1]  What is alluxio :
https://docs.alluxio.io/os/user/stable/en/Overview.html

[2]  Fluid: https://fluid-cloudnative.github.io/

[3]  Fluid Alluxio Runtime:
https://fluid-cloudnative.github.io/samples/tieredstore_config.html#prerequisites

liu ron wrote on Fri, Mar 17, 2023 at 10:39:

> Hi, Yuxin,
>
> Thanks for creating this FLIP. Adding remote storage capability to Flink's
> Hybrid Shuffle is a significant improvement that addresses the issue of
> local disk storage limitations, this also can improve the stability of
> Flink Batch Job.
> I just have one question: can the Hybrid Shuffle replace the RSS in the
> future? Due to the Hybrid Shuffle having remote storage ability, I think
> maybe we don't need to maintain a standalone RSS, it will simplify our
> operation work.
>


-- 
Best

ConradJam


Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-16 Thread liu ron
Hi, Yuxin,

Thanks for creating this FLIP. Adding remote storage capability to Flink's
Hybrid Shuffle is a significant improvement that addresses the issue of
local disk storage limitations; it can also improve the stability of
Flink batch jobs.
I just have one question: can Hybrid Shuffle replace the RSS in the
future? Since Hybrid Shuffle has remote storage ability, I think maybe
we don't need to maintain a standalone RSS, which would simplify our
operation work.


Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-15 Thread Yuxin Tan
Hi, Yun

Thanks for your suggestions.

> I think you could describe it explicitly in the original FLIP's goals or
design principles.

I have updated the FLIP and given a more detailed description of the
tier decoupling design.

Best,
Yuxin


Yun Tang wrote on Wed, Mar 15, 2023 at 20:46:

> Hi Yuxin,
>
> Thanks for your explanations.
> I think the kernel idea is that you prefer the simple and decoupling
> design for the 1st version of hybrid shuffle with remote storage. If
> following this idea, perhaps I could accept your current explanations and I
> think you could describe it explicitly in the original FLIP's goals or
> design principles.
>
>
> Best
> Yun Tang
> 
> From: Yuxin Tan 
> Sent: Wednesday, March 15, 2023 12:41
> To: dev@flink.apache.org 
> Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
>
> Hi, Yun,
>
> Thanks for sharing the ideas.
>
> > 1. We should trigger to kick the shuffle data to remote storage
> once either condition is reached
>
> I believe that configuring two options in this manner is a pragmatic
> approach that can fulfill a wider range of usage scenarios. However,
> if we present two options, it may become difficult to remove them
> in the future once users have started relying on them. On the other
> hand, if we introduce a single option, we can easily incorporate
> additional options based on your recommendations if required.
> Thus, we recommend adopting a one-option solution in the first
> version to address the issue.
>
> > 2. Perhaps we could switch to kick shuffle data to remote storage
> once no space left exception is met.
>
> Thanks for your valuable feedback. While the suggestion is a viable
> solution to address the no space left exception issue, we are
> concerned that implementing it could create interdependence between
> the disk tier and remote storage tier, which would contradict our goal
> of achieving independence between tiers in the new architecture.
> Moreover, we believe that it is better to prevent encountering the
> exception in the first place by reserving adequate disk space. This is
> because other processes on the same machine may also be impacted
> when the exception occurs. If the exception does still arise, we can
> explore other potential solutions through detailed design and
> discussions, such as the one you proposed, optimizing the reserved
> space with a global counter of TM, etc. Although the current implementation
> only partially addresses the exception issue, we expect to improve it
> in subsequent versions due to the complexity of this FLIP. We would
> appreciate hearing your thoughts on this matter.
>
> Best,
> Yuxin
>
>
> Yun Tang wrote on Tue, Mar 14, 2023 at 14:48:
>
> > Hi Yuxin
> >
> > Thanks for your reply.
> > I am not saying that we should use an absolute reserved value to replace
> > the current plan of the reserved fraction. We should trigger to kick the
> > shuffle data to remote storage once either condition is reached.
> > Maybe we could get some idea from the configuration of tiered based state
> > backend [1].
> >
> > The concern about concurrent writing is due to the local disk being
> shared
> > by all the instances running on that node, we cannot ensure other
> > components would not flush data during shuffle writing. Perhaps we could
> > switch to kick shuffle data to remote storage once no space left
> exception
> > is met.
> >
> >
> > [1]
> >
> https://www.alibabacloud.com/help/en/realtime-compute-for-apache-flink/latest/geministatebackend-configurations#section-u0y-on0-owo
> >
> >
> > Best
> > Yun Tang
> >
> > 
> > From: Yuxin Tan 
> > Sent: Monday, March 13, 2023 15:06
> > To: dev@flink.apache.org 
> > Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
> >
> > Hi,
> > Thanks for the feedback and questions from Zhu Zhu, Xia Sun and Yun Tang.
> >
> > @Zhu Zhu
> > > 1. I'm a bit concerned about the cost of it.
> >
> > The Read Client's request to check the existence of a segment in each
> > storage tier is the netty message, which is similar to the credit
> updating
> > messages. It is observed that the netty message cost for these operations
> > is relatively low and the number of messages is also relatively low
> > compared to credit updates which are sent every few buffers. Moreover,
> > since this request involves only memory operations, the message cost
> > is significantly low during the total reading data process.
> >
> > And we will further optimize the message 

Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-15 Thread Yun Tang
Hi Yuxin,

Thanks for your explanations.
I think the core idea is that you prefer a simple, decoupled design for
the first version of hybrid shuffle with remote storage. Following this idea,
perhaps I can accept your current explanations, and I think you could describe
it explicitly in the original FLIP's goals or design principles.


Best
Yun Tang

From: Yuxin Tan 
Sent: Wednesday, March 15, 2023 12:41
To: dev@flink.apache.org 
Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

Hi, Yun,

Thanks for sharing the ideas.

> 1. We should trigger to kick the shuffle data to remote storage
once either condition is reached

I believe that configuring two options in this manner is a pragmatic
approach that can fulfill a wider range of usage scenarios. However,
if we present two options, it may become difficult to remove them
in the future once users have started relying on them. On the other
hand, if we introduce a single option, we can easily incorporate
additional options based on your recommendations if required.
Thus, we recommend adopting a one-option solution in the first
version to address the issue.

> 2. Perhaps we could switch to kick shuffle data to remote storage
once no space left exception is met.

Thanks for your valuable feedback. While the suggestion is a viable
solution to address the no space left exception issue, we are
concerned that implementing it could create interdependence between
the disk tier and remote storage tier, which would contradict our goal
of achieving independence between tiers in the new architecture.
Moreover, we believe that it is better to prevent encountering the
exception in the first place by reserving adequate disk space. This is
because other processes on the same machine may also be impacted
when the exception occurs. If the exception does still arise, we can
explore other potential solutions through detailed design and
discussions, such as the one you proposed, optimizing the reserved
space with a global counter in the TaskManager, etc. Although the current implementation
only partially addresses the exception issue, we expect to improve it
in subsequent versions due to the complexity of this FLIP. We would
appreciate hearing your thoughts on this matter.

Best,
Yuxin


Yun Tang wrote on Tue, Mar 14, 2023 at 14:48:

> Hi Yuxin
>
> Thanks for your reply.
> I am not saying that we should use an absolute reserved value to replace
> the current plan of the reserved fraction. We should trigger to kick the
> shuffle data to remote storage once either condition is reached.
> Maybe we could get some idea from the configuration of tiered based state
> backend [1].
>
> The concern about concurrent writing is due to the local disk being shared
> by all the instances running on that node, we cannot ensure other
> components would not flush data during shuffle writing. Perhaps we could
> switch to kick shuffle data to remote storage once no space left exception
> is met.
>
>
> [1]
> https://www.alibabacloud.com/help/en/realtime-compute-for-apache-flink/latest/geministatebackend-configurations#section-u0y-on0-owo
>
>
> Best
> Yun Tang
>
> 
> From: Yuxin Tan 
> Sent: Monday, March 13, 2023 15:06
> To: dev@flink.apache.org 
> Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
>
> Hi,
> Thanks for the feedback and questions from Zhu Zhu, Xia Sun and Yun Tang.
>
> @Zhu Zhu
> > 1. I'm a bit concerned about the cost of it.
>
> The Read Client's request to check the existence of a segment in each
> storage tier is the netty message, which is similar to the credit updating
> messages. It is observed that the netty message cost for these operations
> is relatively low and the number of messages is also relatively low
> compared to credit updates which are sent every few buffers. Moreover,
> since this request involves only memory operations, the message cost
> is significantly low during the total reading data process.
>
> And we will further optimize the message cost later, particularly for
> segments that remain in the same tier without switching tiers. That
> is, for consecutive segments in the same tier, we can continue to send
> the next segment without waiting for the downstream to ask whether the
> segment exists in this tier.
>
>
> @Xia Sun
> > 1. how to keep the order of segments between different storage tiers
>
> To indicate the sequential order of upstream segments, we rely on segmentId
> downstream. Once segment n has been fully consumed by the downstream,
> the subsequent segment n + 1 will be asked in its natural order. As for the
> order of the buffers within each segment, they follow the default ordering
> mechanisms of Flink.
>
> > 2. what will happen if the disk space is found to 

Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-14 Thread Yuxin Tan
Hi, Yun,

Thanks for sharing the ideas.

> 1. We should trigger to kick the shuffle data to remote storage
once either condition is reached

I believe that configuring two options in this manner is a pragmatic
approach that can fulfill a wider range of usage scenarios. However,
if we present two options, it may become difficult to remove them
in the future once users have started relying on them. On the other
hand, if we introduce a single option, we can easily incorporate
additional options based on your recommendations if required.
Thus, we recommend adopting a one-option solution in the first
version to address the issue.

> 2. Perhaps we could switch to kick shuffle data to remote storage
once no space left exception is met.

Thanks for your valuable feedback. While the suggestion is a viable
solution to address the no space left exception issue, we are
concerned that implementing it could create interdependence between
the disk tier and remote storage tier, which would contradict our goal
of achieving independence between tiers in the new architecture.
Moreover, we believe that it is better to prevent encountering the
exception in the first place by reserving adequate disk space. This is
because other processes on the same machine may also be impacted
when the exception occurs. If the exception does still arise, we can
explore other potential solutions through detailed design and
discussions, such as the one you proposed, optimizing the reserved
space with a global counter in the TaskManager, etc. Although the current implementation
only partially addresses the exception issue, we expect to improve it
in subsequent versions due to the complexity of this FLIP. We would
appreciate hearing your thoughts on this matter.

Best,
Yuxin


Yun Tang wrote on Tue, Mar 14, 2023 at 14:48:

> Hi Yuxin
>
> Thanks for your reply.
> I am not saying that we should use an absolute reserved value to replace
> the current plan of the reserved fraction. We should trigger to kick the
> shuffle data to remote storage once either condition is reached.
> Maybe we could get some idea from the configuration of tiered based state
> backend [1].
>
> The concern about concurrent writing is due to the local disk being shared
> by all the instances running on that node, we cannot ensure other
> components would not flush data during shuffle writing. Perhaps we could
> switch to kick shuffle data to remote storage once no space left exception
> is met.
>
>
> [1]
> https://www.alibabacloud.com/help/en/realtime-compute-for-apache-flink/latest/geministatebackend-configurations#section-u0y-on0-owo
>
>
> Best
> Yun Tang
>
> 
> From: Yuxin Tan 
> Sent: Monday, March 13, 2023 15:06
> To: dev@flink.apache.org 
> Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
>
> Hi,
> Thanks for the feedback and questions from Zhu Zhu, Xia Sun and Yun Tang.
>
> @Zhu Zhu
> > 1. I'm a bit concerned about the cost of it.
>
> The Read Client's request to check the existence of a segment in each
> storage tier is the netty message, which is similar to the credit updating
> messages. It is observed that the netty message cost for these operations
> is relatively low and the number of messages is also relatively low
> compared to credit updates which are sent every few buffers. Moreover,
> since this request involves only memory operations, the message cost
> is significantly low during the total reading data process.
>
> And we will further optimize the message cost later, particularly for
> segments that remain in the same tier without switching tiers. That
> is, for consecutive segments in the same tier, we can continue to send
> the next segment without waiting for the downstream to ask whether the
> segment exists in this tier.
>
>
> @Xia Sun
> > 1. how to keep the order of segments between different storage tiers
>
> To indicate the sequential order of upstream segments, we rely on segmentId
> downstream. Once segment n has been fully consumed by the downstream,
> the subsequent segment n + 1 will be asked in its natural order. As for the
> order of the buffers within each segment, they follow the default ordering
> mechanisms of Flink.
>
> > 2. what will happen if the disk space is found to be full?
>
> By introducing a way of reserving some space in advance, we will try to
> avoid this situation of reporting the disk space error as much as possible.
> The size of reserved space can be configured through the option introduced
> in the public API.
>
> > 3. For remote storage, does the switching between different tiers
> will bring overhead or waiting?
>
> Our primary focus for the first version is to address the problem of job
> stability
> and implement a new architecture because the change of t

Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-14 Thread Yun Tang
Hi Yuxin

Thanks for your reply.
I am not saying that we should use an absolute reserved value to replace the
current plan of the reserved fraction. We should trigger kicking the shuffle
data to remote storage once either condition is reached.
Maybe we could get some ideas from the configuration of the tier-based state
backend [1].
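
[Editor's note] The "either condition" trigger suggested here could be sketched as follows; the thresholds are illustrative assumptions, not proposed defaults:

```python
# Spill shuffle data to remote storage when free space falls below EITHER a
# fraction of the total disk OR an absolute floor -- whichever trips first.
def should_spill_to_remote(total_bytes, free_bytes,
                           min_fraction=0.05, min_absolute=5 * 1024**3):
    return free_bytes < total_bytes * min_fraction or free_bytes < min_absolute

GIB = 1024 ** 3
# 20 GiB disk with 1 GiB free: the 5% fractional threshold is only 1 GiB, so
# the fraction alone would not trigger, but the 5 GiB absolute floor does.
print(should_spill_to_remote(20 * GIB, 1 * GIB))      # -> True
# 3 TiB disk with 200 GiB free: neither condition trips.
print(should_spill_to_remote(3072 * GIB, 200 * GIB))  # -> False
```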

The concern about concurrent writing is due to the local disk being shared by
all the instances running on that node; we cannot ensure other components
would not flush data during shuffle writing. Perhaps we could switch to
kicking shuffle data to remote storage once a "no space left" exception is met.


[1] 
https://www.alibabacloud.com/help/en/realtime-compute-for-apache-flink/latest/geministatebackend-configurations#section-u0y-on0-owo


Best
Yun Tang


From: Yuxin Tan 
Sent: Monday, March 13, 2023 15:06
To: dev@flink.apache.org 
Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

Hi,
Thanks for the feedback and questions from Zhu Zhu, Xia Sun and Yun Tang.

@Zhu Zhu
> 1. I'm a bit concerned about the cost of it.

The Read Client's request to check the existence of a segment in each
storage tier is a Netty message, similar to the credit-update messages.
We observed that the Netty message cost for these operations is relatively
low, and the number of messages is also small compared to credit updates,
which are sent every few buffers. Moreover, since this request involves
only memory operations, its cost is very low relative to the total
data-reading process.

And we will further optimize the message cost later, particularly for
segments that remain in the same tier without switching tiers. That
is, for consecutive segments in the same tier, we can continue to send
the next segment without waiting for the downstream to ask whether the
segment exists in this tier.
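
[Editor's note] To make the lookup concrete, here is a minimal sketch, purely illustrative -- the real protocol is a Netty request/response, and none of these names are Flink APIs -- of asking tiers in priority order whether a segment exists:

```python
# Each tier tracks which segment ids it holds; the Read Client probes tiers
# from highest to lowest priority until one reports the segment.
tiers = [
    {"name": "memory", "segments": {0, 1}},
    {"name": "disk",   "segments": {2}},
    {"name": "remote", "segments": {3, 4}},
]

def find_tier(tiers, segment_id):
    for tier in tiers:                      # priority order
        if segment_id in tier["segments"]:  # in Flink, a Netty message; here, a set lookup
            return tier["name"]
    return None                             # segment not produced yet

print(find_tier(tiers, 2))  # -> disk
print(find_tier(tiers, 4))  # -> remote
```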


@Xia Sun
> 1. how to keep the order of segments between different storage tiers

To maintain the sequential order of upstream segments, the downstream
relies on the segmentId. Once segment n has been fully consumed by the
downstream, the subsequent segment n + 1 will be requested in its natural
order. As for the order of the buffers within each segment, they follow
Flink's default ordering mechanisms.
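
[Editor's note] This ordering rule can be sketched as follows (illustrative Python, not Flink code): because the downstream requests segments strictly by increasing segmentId, cross-tier ordering follows from the request order rather than from any per-tier guarantee.

```python
def consume_in_order(locate, total_segments):
    """Request segment 0, 1, 2, ..., each only after the previous is consumed.

    `locate` maps a segmentId to the tier holding it; the tier may differ
    per segment without affecting the overall order.
    """
    consumed = []
    for segment_id in range(total_segments):
        consumed.append((segment_id, locate(segment_id)))
    return consumed

# Toy placement where consecutive segments landed in different tiers.
placement = {0: "memory", 1: "memory", 2: "disk", 3: "remote"}
print(consume_in_order(placement.get, 4))
# -> [(0, 'memory'), (1, 'memory'), (2, 'disk'), (3, 'remote')]
```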

> 2. what will happen if the disk space is found to be full?

By introducing a way of reserving some space in advance, we will try to
avoid this situation of reporting the disk space error as much as possible.
The size of reserved space can be configured through the option introduced
in the public API.
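
[Editor's note] As a rough illustration of this reserve-ahead check (the fraction and function names here are assumptions for the sketch, not the FLIP's actual configuration keys):

```python
# Before writing locally, verify the write still leaves the configured
# reserved fraction of the disk free; otherwise the segment should go to
# the remote storage tier instead.
def can_write_locally(total_bytes, free_bytes, write_bytes, reserved_fraction=0.05):
    reserved = total_bytes * reserved_fraction
    return free_bytes - write_bytes >= reserved

GIB = 1024 ** 3
# 100 GiB disk, 6 GiB free: a 2 GiB write would leave 4 GiB, below the
# 5 GiB (5%) reserve, so this write should be redirected to remote storage.
print(can_write_locally(100 * GIB, 6 * GIB, 2 * GIB))   # -> False
print(can_write_locally(100 * GIB, 20 * GIB, 2 * GIB))  # -> True
```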

> 3. For remote storage, will the switching between different tiers
bring overhead or waiting?

Our primary focus for the first version is to address the problem of job
stability and implement a new architecture, because the change in this FLIP
is already relatively complicated. However, remote storage may lead to
additional overhead for reading small files. Therefore, we recommend using
remote storage as a supplement to local disks, with the latter serving as
the primary storage in most cases, resulting in less impact on performance.
After introducing the new architecture, we will also continue optimizing the
remote storage performance. At that time, that change will be relatively
cohesive, focusing only on the remote storage tier, without affecting other
modules.


@Yun Tang

> 1. Normal shuffle which writes data to disk could also benefit from this.

I agree with the idea. Our core implementation design is independent
of the shuffle mode and is not strongly tied to hybrid shuffle. Currently,
we are only applying this architecture to hybrid shuffle. If needed, it
can be adapted to other shuffle modes with slight changes in the future,
making it easier for them to benefit from this feature. However, for the
first version, we are focusing on hybrid shuffle because of the scope of
impact of the change and the potential of hybrid shuffle.

> 2. will it affect current design of pluggable remote shuffle service?

It will not affect the current design of the pluggable shuffle service. Our
design focuses on improving the Flink internal shuffle.

> 3. the condition of min-reserve-space-fraction to kick local data to
remote storage might not be a good idea in all cases

In fact, we believe that using fractions, as opposed to fixed values, covers
more general use cases, especially considering that no default value can
suffice for all scenarios. For instance, a default value such as 5GB is too
small for a 3TB SATA disk, and too large for a 20GB local disk (where 5GB
accounts for 25% of the available disk space). Therefore, after conducting
some internal discussions, we decided to opt for using fractions as the
reserved space value.
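
[Editor's note] The arithmetic behind this choice, using the figures from the discussion (illustrative only):

```python
# A fixed reserve behaves very differently across disk sizes, while a
# fraction scales with the disk.
def fixed_reserve_share(total_gb, fixed_gb=5):
    """What share of the disk a fixed reserve occupies, in percent."""
    return fixed_gb / total_gb * 100

def fraction_reserve_gb(total_gb, fraction=0.05):
    """How many GB a 5% fractional reserve yields."""
    return total_gb * fraction

print(fixed_reserve_share(3000))  # ~0.17% of a 3TB disk: likely too small
print(fixed_reserve_share(20))    # 25.0% of a 20GB disk: far too large
print(fraction_reserve_gb(3000))  # 150.0 GB reserved on the 3TB disk
print(fraction_reserve_gb(20))    # 1.0 GB reserved on the 20GB disk
```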

Is there an approach in the state-backend implementation that we can
reference, e.g., the fixed value or other solutions?

> 4. will we meet a concurrency problem when different subtasks within one
pro

Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-13 Thread Yuxin Tan
tension on hybrid shuffle which
> introduces a lot of changes, will it affect current design of pluggable
> remote shuffle service, such as Apache Celeborn [1].
>
> Thirdly, based on my previous experience implementing a tier-based
> state-backend, the condition of min-reserve-space-fraction to kick local
> data to remote storage might not be a good idea in all cases, we still need
> to consider the absolute reserved disk storage. Take a 20GB local data disk
> as example, it might be a bit too late to kick the local data when only 1GB
> (20GB*5%) space left.
>
> Last but not least, will we meet a concurrency problem when different
> subtasks within one process/node start to check the left disk space before
> deciding to write to local or remote?
>
>
> [1] https://celeborn.apache.org/
>
> Best
> Yun Tang
>
> ________
> From: Xia Sun 
> Sent: Sunday, March 12, 2023 17:16
> To: dev@flink.apache.org 
> Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
>
> Hi Yuxin,
>
> Thanks for creating this FLIP!
> I'm a flink user, and in our internal scenario we use the colocation
> technology to run flink jobs and online service on the same machine
> together. We found that flink jobs are occasionally affected by other
> non-flink jobs (i.e. if the host disk space is full, that will result in
> 'No space left on device' error on flink jobs). This flip will really help
> us to benefit from hybrid shuffle without being worried about insufficient
> disk space problem.
>
> And I also have a few questions.
> 1. If the same subpartition spans multiple different tiers, how to keep the
> order of segments between different storage tiers (if necessary)?
> 2. In the process of writing to the local disk for a subpartition, what
> will happen if the disk space is found to be full? Will it report an error
> or automatically transfer to remote storage?
> 3. For remote storage, I noticed that it uses direct reading, which is
> different from the other two, does the switching between different tiers
> will bring overhead or waiting? In addition, compared to flink rss, which
> optimizes data compression and small file merging to improve throughput and
> relieve file system pressure, does the object storage system can meet the
> performance requirements and concurrent access challenges of large-scale
> batch jobs(parallelism > 1)?
>
> Thanks,
> Xia
>
> Zhu Zhu wrote on Fri, Mar 10, 2023 at 16:44:
>
> > Hi Yuxin,
> >
> > Thanks for creating this FLIP!
> > The idea of tiered storage looks good. Instead of choosing one from
> > multiple storages, it can help to balance between performance, cost and
> > stability. It also has the potential to adaptively select proper tiers
> > according to more runtime information, to achieve better performance
> > and ease of use.
> >
> > I have a question about the tier finding of data reading. In the FLIP
> > it proposes that the Read Client asks each storage tier whether a
> > given segment exists in it, from higher priority tiers to lower priority
> > ones. I'm a bit concerned about the cost of it, especially when data
> > are written to low priority tiers. Do you have any evaluation of it?
> > Is it possible to let the Reader Client know the location of the next
> > segment when it has finished reading one segment? Or maybe just let it
> > know whether the next segment is located in the same tier, if we can
> > have the assumption that tier changing would not be very frequent.
> >
> > Thanks,
> > Zhu
> >
> > Weihua Hu wrote on Fri, Mar 10, 2023 at 11:52:
> > >
> > > Thanks Yuxin for your explanation.
> > >
> > > That sounds reasonable. Looking forward to the new shuffle.
> > >
> > >
> > > Best,
> > > Weihua
> > >
> > >
> > > On Fri, Mar 10, 2023 at 11:48 AM Yuxin Tan 
> > wrote:
> > >
> > > > Hi, Weihua,
> > > > Thanks for the questions and the ideas.
> > > >
> > > > > 1. How many performance regressions would there be if we only
> > > > used remote storage?
> > > >
> > > > The new architecture can support to use remote storage only, but this
> > > > FLIP target is to improve job stability. And the change in the FLIP
> has
> > > > been significantly complex and the goal of the first version is to
> > update
> > > > Hybrid Shuffle to the new architecture and support remote storage as
> > > > a supplement. The performance of this version is not the first
> > priority,
> > > > so we haven’t tested the performance of using only remote storage.

Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-12 Thread Yun Tang
Hi Yuxin,

Thanks for this proposal.

From my understanding, this FLIP looks like a tier-based shuffle extension, 
which does not seem to need to be bound to hybrid shuffle. A normal shuffle that 
writes data to disk could also benefit from this.

Secondly, since this targets being an extension of hybrid shuffle and 
introduces a lot of changes, will it affect the current design of the pluggable 
remote shuffle service, such as Apache Celeborn [1]?

Thirdly, based on my previous experience implementing a tier-based 
state backend, the min-reserve-space-fraction condition for kicking local data 
to remote storage might not be a good idea in all cases; we still need to 
consider the absolute reserved disk space. Taking a 20GB local data disk as an 
example, it might be a bit too late to kick the local data when only 1GB 
(20GB * 5%) of space is left.
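The combined check being suggested here — a fraction plus an absolute floor — can be sketched as follows. This is an illustration only: the class, field names, and defaults are hypothetical, not the FLIP's or Flink's actual configuration.

```java
// Sketch of the suggestion above: combine the fractional reserve with an
// absolute floor, so a small disk is not drained to its last gigabyte.
// All names and defaults here are illustrative, not Flink's actual config.
public final class DiskReserveCheck {
    private final double minReserveFraction; // e.g. 0.05 (5%)
    private final long minReserveBytes;      // absolute floor, e.g. 5 GiB

    public DiskReserveCheck(double minReserveFraction, long minReserveBytes) {
        this.minReserveFraction = minReserveFraction;
        this.minReserveBytes = minReserveBytes;
    }

    /** True if new shuffle data should be kicked to remote storage. */
    public boolean shouldKickToRemote(long totalDiskBytes, long usableDiskBytes) {
        // Reserve whichever is larger: the fraction of capacity or the floor.
        long reserved = Math.max(
                (long) (totalDiskBytes * minReserveFraction), minReserveBytes);
        return usableDiskBytes < reserved;
    }
}
```

With a 5 GiB floor, the 20GB-disk example above kicks to remote well before the last 1 GiB.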

Last but not least, will we meet a concurrency problem when different subtasks 
within one process/node check the remaining disk space concurrently before 
deciding whether to write locally or remotely?
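One way to avoid that race would be for subtasks on the same TaskManager to reserve space through a shared atomic budget instead of each re-reading the free-space figure. A minimal sketch under assumed names (nothing below is from the FLIP):

```java
// Hypothetical per-TaskManager disk budget shared by all local subtasks.
// Reservations are atomic, so two subtasks cannot both claim the same
// remaining bytes. Names are illustrative only.
import java.util.concurrent.atomic.AtomicLong;

final class LocalDiskBudget {
    private final AtomicLong remainingBytes;

    LocalDiskBudget(long budgetBytes) {
        this.remainingBytes = new AtomicLong(budgetBytes);
    }

    /** Atomically claim space for a segment; false means spill to remote. */
    boolean tryReserve(long bytes) {
        long prev;
        do {
            prev = remainingBytes.get();
            if (prev < bytes) {
                return false; // not enough local budget, go remote
            }
        } while (!remainingBytes.compareAndSet(prev, prev - bytes));
        return true;
    }

    void release(long bytes) {
        remainingBytes.addAndGet(bytes);
    }
}
```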


[1] https://celeborn.apache.org/

Best
Yun Tang


From: Xia Sun 
Sent: Sunday, March 12, 2023 17:16
To: dev@flink.apache.org 
Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

Hi Yuxin,

Thanks for creating this FLIP!
I'm a Flink user, and in our internal scenario we use colocation
technology to run Flink jobs and online services on the same machines.
We found that Flink jobs are occasionally affected by other non-Flink
jobs (e.g., if the host disk fills up, Flink jobs fail with a 'No space
left on device' error). This FLIP will really help us to benefit from
hybrid shuffle without worrying about insufficient disk space.

And I also have a few questions.
1. If the same subpartition spans multiple different tiers, how do we keep
the order of segments between different storage tiers (if necessary)?
2. While writing a subpartition to the local disk, what will happen if the
disk space is found to be full? Will it report an error or automatically
switch to remote storage?
3. For remote storage, I noticed that it uses direct reading, which is
different from the other two tiers; will switching between tiers bring
overhead or waiting? In addition, compared to Flink RSS, which optimizes
data compression and small-file merging to improve throughput and relieve
file system pressure, can the object storage system meet the performance
requirements and concurrent-access challenges of large-scale batch jobs
(parallelism > 1)?

Thanks,
Xia

Zhu Zhu wrote on Fri, Mar 10, 2023 at 16:44:

> Hi Yuxin,
>
> Thanks for creating this FLIP!
> The idea of tiered storage looks good. Instead of choosing one from
> multiple storages, it can help to balance between performance, cost and
> stability. It also has the potential to adaptively select proper tiers
> according to more runtime information, to achieve better performance
> and ease of use.
>
> I have a question about the tier finding of data reading. In the FLIP
> it proposes that the Read Client asks each storage tier whether a
> given segment exists in it, from higher priority tiers to lower priority
> ones. I'm a bit concerned about the cost of it, especially when data
> are written to low priority tiers. Do you have any evaluation of it?
> Is it possible to let the Reader Client know the location of the next
> segment when it has finished reading one segment? Or maybe just let it
> know whether the next segment is located in the same tier, if we can
> have the assumption that tier changing would not be very frequent.
>
> Thanks,
> Zhu
>
> Weihua Hu  于2023年3月10日周五 11:52写道:
> >
> > Thanks Yuxin for your explanation.
> >
> > That sounds reasonable. Looking forward to the new shuffle.
> >
> >
> > Best,
> > Weihua
> >
> >
> > On Fri, Mar 10, 2023 at 11:48 AM Yuxin Tan 
> wrote:
> >
> > > Hi, Weihua,
> > > Thanks for the questions and the ideas.
> > >
> > > > 1. How many performance regressions would there be if we only
> > > used remote storage?
> > >
> > > The new architecture can support to use remote storage only, but this
> > > FLIP target is to improve job stability. And the change in the FLIP has
> > > been significantly complex and the goal of the first version is to
> update
> > > Hybrid Shuffle to the new architecture and support remote storage as
> > > a supplement. The performance of this version is not the first
> priority,
> > > so we haven’t tested the performance of using only remote storage.
> > > If there are indeed regressions, we will keep optimizing the
> performance
> > > of the remote storages and improve it until only remote storage is
> > > available in the production environment.

Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-12 Thread Xia Sun
Hi Yuxin,

Thanks for creating this FLIP!
I'm a Flink user, and in our internal scenario we use colocation
technology to run Flink jobs and online services on the same machines.
We found that Flink jobs are occasionally affected by other non-Flink
jobs (e.g., if the host disk fills up, Flink jobs fail with a 'No space
left on device' error). This FLIP will really help us to benefit from
hybrid shuffle without worrying about insufficient disk space.

And I also have a few questions.
1. If the same subpartition spans multiple different tiers, how do we keep
the order of segments between different storage tiers (if necessary)?
2. While writing a subpartition to the local disk, what will happen if the
disk space is found to be full? Will it report an error or automatically
switch to remote storage?
3. For remote storage, I noticed that it uses direct reading, which is
different from the other two tiers; will switching between tiers bring
overhead or waiting? In addition, compared to Flink RSS, which optimizes
data compression and small-file merging to improve throughput and relieve
file system pressure, can the object storage system meet the performance
requirements and concurrent-access challenges of large-scale batch jobs
(parallelism > 1)?

Thanks,
Xia
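One plausible shape of the automatic fallback asked about in question 2: each tier may decline a segment (e.g. local disk full) and the writer falls through to the next tier. The interfaces and names below are assumptions for illustration, not the FLIP's actual API.

```java
// Hypothetical write path: tiers are tried in priority order and a full
// local disk simply declines, pushing the segment to the next tier.
import java.util.List;

interface TierWriter {
    /** Returns false if this tier cannot accept the segment (e.g. disk full). */
    boolean tryWrite(long segmentId, byte[] data);
}

final class TieredWriteClient {
    private final List<TierWriter> tiersByPriority; // e.g. memory, disk, remote

    TieredWriteClient(List<TierWriter> tiersByPriority) {
        this.tiersByPriority = tiersByPriority;
    }

    void write(long segmentId, byte[] data) {
        for (TierWriter tier : tiersByPriority) {
            if (tier.tryWrite(segmentId, data)) {
                return; // accepted by this tier
            }
        }
        throw new IllegalStateException("No tier accepted segment " + segmentId);
    }
}
```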

Zhu Zhu wrote on Fri, Mar 10, 2023 at 16:44:

> Hi Yuxin,
>
> Thanks for creating this FLIP!
> The idea of tiered storage looks good. Instead of choosing one from
> multiple storages, it can help to balance between performance, cost and
> stability. It also has the potential to adaptively select proper tiers
> according to more runtime information, to achieve better performance
> and ease of use.
>
> I have a question about the tier finding of data reading. In the FLIP
> it proposes that the Read Client asks each storage tier whether a
> given segment exists in it, from higher priority tiers to lower priority
> ones. I'm a bit concerned about the cost of it, especially when data
> are written to low priority tiers. Do you have any evaluation of it?
> Is it possible to let the Reader Client know the location of the next
> segment when it has finished reading one segment? Or maybe just let it
> know whether the next segment is located in the same tier, if we can
> have the assumption that tier changing would not be very frequent.
>
> Thanks,
> Zhu
>
> Weihua Hu  于2023年3月10日周五 11:52写道:
> >
> > Thanks Yuxin for your explanation.
> >
> > That sounds reasonable. Looking forward to the new shuffle.
> >
> >
> > Best,
> > Weihua
> >
> >
> > On Fri, Mar 10, 2023 at 11:48 AM Yuxin Tan 
> wrote:
> >
> > > Hi, Weihua,
> > > Thanks for the questions and the ideas.
> > >
> > > > 1. How many performance regressions would there be if we only
> > > used remote storage?
> > >
> > > The new architecture can support to use remote storage only, but this
> > > FLIP target is to improve job stability. And the change in the FLIP has
> > > been significantly complex and the goal of the first version is to
> update
> > > Hybrid Shuffle to the new architecture and support remote storage as
> > > a supplement. The performance of this version is not the first
> priority,
> > > so we haven’t tested the performance of using only remote storage.
> > > If there are indeed regressions, we will keep optimizing the
> performance
> > > of the remote storages and improve it until only remote storage is
> > > available in the production environment.
> > >
> > > > 2. Shall we move the local data to remote storage if the producer is
> > > finished for a long time?
> > >
> > > I agree that it is a good idea, which can release task manager
> resources
> > > more timely. But moving data from TM local disk to remote storage needs
> > > more detailed discussion and design, and it is easier to implement it
> based
> > > on the new architecture. Considering the complexity, the target focus,
> and
> > > the iteration cycle of the FLIP, we decide that the details are not
> > > included
> > > in the first version. We will extend and implement them in the
> subsequent
> > > versions.
> > >
> > > Best,
> > > Yuxin
> > >
> > >
> > > Weihua Hu  于2023年3月9日周四 11:22写道:
> > >
> > > > Hi, Yuxin
> > > >
> > > > Thanks for driving this FLIP.
> > > >
> > > > The remote storage shuffle could improve the stability of Batch jobs.
> > > >
> > > > In our internal scenario, we use a hybrid cluster to run both
> > > > Streaming(high priority)
> > > > and Batch jobs(low priority). When there is not enough
> resources(such as
> > > > cpu usage
> > > > reaches a threshold), the batch containers will be evicted. So this
> will
> > > > cause some re-run
> > > > of batch tasks.
> > > >
> > > > It would be a great help if the remote storage could address this.
> So I
> > > > have a few questions.
> > > >
> > > > 1. How many performance regressions would there be if we only used
> remote
> > > > storage?
> > > >
> > > > 2. In current design, the shuffle data segment will write to one kind of storage tier.

Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-10 Thread Zhu Zhu
Hi Yuxin,

Thanks for creating this FLIP!
The idea of tiered storage looks good. Instead of choosing one from
multiple storages, it can help to balance between performance, cost and
stability. It also has the potential to adaptively select proper tiers
according to more runtime information, to achieve better performance
and ease of use.

I have a question about how the tier for data reading is found. The FLIP
proposes that the Read Client asks each storage tier whether a
given segment exists in it, from higher-priority tiers to lower-priority
ones. I'm a bit concerned about the cost of this, especially when data
are written to low-priority tiers. Do you have any evaluation of it?
Is it possible to let the Read Client know the location of the next
segment when it has finished reading one segment? Or maybe just let it
know whether the next segment is located in the same tier, if we can
assume that tier changes would not be very frequent.
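For reference, the probe-every-tier lookup the FLIP describes can be sketched as follows. The `TierReader` interface and all names are assumptions for illustration, not the FLIP's actual API.

```java
// Hypothetical read-side lookup: probe tiers from highest to lowest
// priority until one reports the segment.
import java.util.List;
import java.util.Optional;

interface TierReader {
    boolean hasSegment(int subpartition, long segmentId);
}

final class ReadClient {
    private final List<TierReader> tiersByPriority; // highest priority first

    ReadClient(List<TierReader> tiersByPriority) {
        this.tiersByPriority = tiersByPriority;
    }

    /** Returns the first (highest-priority) tier that holds the segment. */
    Optional<TierReader> findTier(int subpartition, long segmentId) {
        for (TierReader tier : tiersByPriority) {
            if (tier.hasSegment(subpartition, segmentId)) {
                return Optional.of(tier);
            }
        }
        return Optional.empty();
    }
}
```

The suggestion above amounts to replacing this linear probe with the writer recording each next segment's tier (or a same-tier flag), so the reader rarely has to scan.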

Thanks,
Zhu

Weihua Hu wrote on Fri, Mar 10, 2023 at 11:52:
>
> Thanks Yuxin for your explanation.
>
> That sounds reasonable. Looking forward to the new shuffle.
>
>
> Best,
> Weihua
>
>
> On Fri, Mar 10, 2023 at 11:48 AM Yuxin Tan  wrote:
>
> > Hi, Weihua,
> > Thanks for the questions and the ideas.
> >
> > > 1. How many performance regressions would there be if we only
> > used remote storage?
> >
> > The new architecture can support to use remote storage only, but this
> > FLIP target is to improve job stability. And the change in the FLIP has
> > been significantly complex and the goal of the first version is to update
> > Hybrid Shuffle to the new architecture and support remote storage as
> > a supplement. The performance of this version is not the first priority,
> > so we haven’t tested the performance of using only remote storage.
> > If there are indeed regressions, we will keep optimizing the performance
> > of the remote storages and improve it until only remote storage is
> > available in the production environment.
> >
> > > 2. Shall we move the local data to remote storage if the producer is
> > finished for a long time?
> >
> > I agree that it is a good idea, which can release task manager resources
> > more timely. But moving data from TM local disk to remote storage needs
> > more detailed discussion and design, and it is easier to implement it based
> > on the new architecture. Considering the complexity, the target focus, and
> > the iteration cycle of the FLIP, we decide that the details are not
> > included
> > in the first version. We will extend and implement them in the subsequent
> > versions.
> >
> > Best,
> > Yuxin
> >
> >
> > Weihua Hu  于2023年3月9日周四 11:22写道:
> >
> > > Hi, Yuxin
> > >
> > > Thanks for driving this FLIP.
> > >
> > > The remote storage shuffle could improve the stability of Batch jobs.
> > >
> > > In our internal scenario, we use a hybrid cluster to run both
> > > Streaming(high priority)
> > > and Batch jobs(low priority). When there is not enough resources(such as
> > > cpu usage
> > > reaches a threshold), the batch containers will be evicted. So this will
> > > cause some re-run
> > > of batch tasks.
> > >
> > > It would be a great help if the remote storage could address this. So I
> > > have a few questions.
> > >
> > > 1. How many performance regressions would there be if we only used remote
> > > storage?
> > >
> > > 2. In current design, the shuffle data segment will write to one kind of
> > > storage tier.
> > > Shall we move the local data to remote storage if the producer is
> > finished
> > > for a long time?
> > > So we can release the idle task manager with no shuffle data on it. This
> > > may help to reduce
> > > the resource usage when producer parallelism is larger than consume.
> > >
> > > Best,
> > > Weihua
> > >
> > >
> > > On Thu, Mar 9, 2023 at 10:38 AM Yuxin Tan 
> > wrote:
> > >
> > > > Hi, Junrui,
> > > > Thanks for the suggestions and ideas.
> > > >
> > > > > If they are fixed, I suggest that FLIP could provide clearer
> > > > explanations.
> > > > I have updated the FLIP and described the segment size more clearly.
> > > >
> > > > > can we provide configuration options for users to manually adjust the
> > > > sizes?
> > > > The segment size can be configured if necessary. But considering that
> > if
> > > we
> > > > exposed these parameters prematurely, it may be difficult to modify the
> > > > implementation later because the user has already used the configs. We
> > > > can make these internal configs or fixed values when implementing the
> > > first
> > > > version, I think we can use either of these two ways, because they are
> > > > internal and do not affect the public APIs.
> > > >
> > > > Best,
> > > > Yuxin
> > > >
> > > >
> > > > Junrui Lee  于2023年3月8日周三 00:24写道:
> > > >
> > > > > Hi Yuxin,
> > > > >
> > > > > This FLIP looks quite reasonable. Flink can solve the problem of
> > Batch
> > > > > shuffle by
> > > > > combining local and remote storage, and can use fixed local disks for
> > > > > better performance in most scenarios, while using remote storage as a supplement when local disks are not sufficient.

Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-09 Thread Weihua Hu
Thanks Yuxin for your explanation.

That sounds reasonable. Looking forward to the new shuffle.


Best,
Weihua


On Fri, Mar 10, 2023 at 11:48 AM Yuxin Tan  wrote:

> Hi, Weihua,
> Thanks for the questions and the ideas.
>
> > 1. How many performance regressions would there be if we only
> used remote storage?
>
> The new architecture can support to use remote storage only, but this
> FLIP target is to improve job stability. And the change in the FLIP has
> been significantly complex and the goal of the first version is to update
> Hybrid Shuffle to the new architecture and support remote storage as
> a supplement. The performance of this version is not the first priority,
> so we haven’t tested the performance of using only remote storage.
> If there are indeed regressions, we will keep optimizing the performance
> of the remote storages and improve it until only remote storage is
> available in the production environment.
>
> > 2. Shall we move the local data to remote storage if the producer is
> finished for a long time?
>
> I agree that it is a good idea, which can release task manager resources
> more timely. But moving data from TM local disk to remote storage needs
> more detailed discussion and design, and it is easier to implement it based
> on the new architecture. Considering the complexity, the target focus, and
> the iteration cycle of the FLIP, we decide that the details are not
> included
> in the first version. We will extend and implement them in the subsequent
> versions.
>
> Best,
> Yuxin
>
>
> Weihua Hu  于2023年3月9日周四 11:22写道:
>
> > Hi, Yuxin
> >
> > Thanks for driving this FLIP.
> >
> > The remote storage shuffle could improve the stability of Batch jobs.
> >
> > In our internal scenario, we use a hybrid cluster to run both
> > Streaming(high priority)
> > and Batch jobs(low priority). When there is not enough resources(such as
> > cpu usage
> > reaches a threshold), the batch containers will be evicted. So this will
> > cause some re-run
> > of batch tasks.
> >
> > It would be a great help if the remote storage could address this. So I
> > have a few questions.
> >
> > 1. How many performance regressions would there be if we only used remote
> > storage?
> >
> > 2. In current design, the shuffle data segment will write to one kind of
> > storage tier.
> > Shall we move the local data to remote storage if the producer is
> finished
> > for a long time?
> > So we can release the idle task manager with no shuffle data on it. This
> > may help to reduce
> > the resource usage when producer parallelism is larger than consume.
> >
> > Best,
> > Weihua
> >
> >
> > On Thu, Mar 9, 2023 at 10:38 AM Yuxin Tan 
> wrote:
> >
> > > Hi, Junrui,
> > > Thanks for the suggestions and ideas.
> > >
> > > > If they are fixed, I suggest that FLIP could provide clearer
> > > explanations.
> > > I have updated the FLIP and described the segment size more clearly.
> > >
> > > > can we provide configuration options for users to manually adjust the
> > > sizes?
> > > The segment size can be configured if necessary. But considering that
> if
> > we
> > > exposed these parameters prematurely, it may be difficult to modify the
> > > implementation later because the user has already used the configs. We
> > > can make these internal configs or fixed values when implementing the
> > first
> > > version, I think we can use either of these two ways, because they are
> > > internal and do not affect the public APIs.
> > >
> > > Best,
> > > Yuxin
> > >
> > >
> > > Junrui Lee  于2023年3月8日周三 00:24写道:
> > >
> > > > Hi Yuxin,
> > > >
> > > > This FLIP looks quite reasonable. Flink can solve the problem of
> Batch
> > > > shuffle by
> > > > combining local and remote storage, and can use fixed local disks for
> > > > better performance
> > > >  in most scenarios, while using remote storage as a supplement when
> > local
> > > > disks are not
> > > >  sufficient, avoiding wasteful costs and poor job stability.
> Moreover,
> > > the
> > > > solution also
> > > > considers the issue of dynamic switching, which can automatically
> > switch
> > > to
> > > > remote
> > > > storage when the local disk is full, saving costs, and automatically
> > > switch
> > > > back when
> > > > there is available space on the local disk.
> > > >
> > > > As Wencong Liu stated, an appropriate segment size is essential, as
> it
> > > can
> > > > significantly
> > > > affect shuffle performance. I also agree that the first version
> should
> > > > focus mainly on the
> > > > design and implementation. However, I have a small question about
> > FLIP. I
> > > > did not see
> > > > any information regarding the segment size of memory, local disk, and
> > > > remote storage
> > > > in this FLIP. Are these three values fixed at present? If they are
> > > fixed, I
> > > > suggest that FLIP
> > > > could provide clearer explanations. Moreover, although a dynamic
> > segment
> > > > size
> > > > mechanism is not necessary at the moment, can we provide
> 

Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-09 Thread Yuxin Tan
Hi, Weihua,
Thanks for the questions and the ideas.

> 1. How many performance regressions would there be if we only
used remote storage?

The new architecture can support using remote storage only, but this
FLIP's target is to improve job stability. The change in the FLIP is
already significantly complex, and the goal of the first version is to
update Hybrid Shuffle to the new architecture and support remote storage
as a supplement. The performance of this version is not the first
priority, so we haven't tested the performance of using only remote
storage. If there are indeed regressions, we will keep optimizing the
performance of the remote storage tier until using only remote storage
is viable in production environments.

> 2. Shall we move the local data to remote storage if the producer is
finished for a long time?

I agree that it is a good idea, which could release task manager resources
in a more timely manner. But moving data from the TM local disk to remote
storage needs more detailed discussion and design, and it is easier to
implement based on the new architecture. Considering the complexity, the
target focus, and the iteration cycle of the FLIP, we decided not to
include the details in the first version. We will extend and implement
them in subsequent versions.

Best,
Yuxin


Weihua Hu wrote on Thu, Mar 9, 2023 at 11:22:

> Hi, Yuxin
>
> Thanks for driving this FLIP.
>
> The remote storage shuffle could improve the stability of Batch jobs.
>
> In our internal scenario, we use a hybrid cluster to run both
> Streaming(high priority)
> and Batch jobs(low priority). When there is not enough resources(such as
> cpu usage
> reaches a threshold), the batch containers will be evicted. So this will
> cause some re-run
> of batch tasks.
>
> It would be a great help if the remote storage could address this. So I
> have a few questions.
>
> 1. How many performance regressions would there be if we only used remote
> storage?
>
> 2. In current design, the shuffle data segment will write to one kind of
> storage tier.
> Shall we move the local data to remote storage if the producer is finished
> for a long time?
> So we can release the idle task manager with no shuffle data on it. This
> may help to reduce
> the resource usage when producer parallelism is larger than consume.
>
> Best,
> Weihua
>
>
> On Thu, Mar 9, 2023 at 10:38 AM Yuxin Tan  wrote:
>
> > Hi, Junrui,
> > Thanks for the suggestions and ideas.
> >
> > > If they are fixed, I suggest that FLIP could provide clearer
> > explanations.
> > I have updated the FLIP and described the segment size more clearly.
> >
> > > can we provide configuration options for users to manually adjust the
> > sizes?
> > The segment size can be configured if necessary. But considering that if
> we
> > exposed these parameters prematurely, it may be difficult to modify the
> > implementation later because the user has already used the configs. We
> > can make these internal configs or fixed values when implementing the
> first
> > version, I think we can use either of these two ways, because they are
> > internal and do not affect the public APIs.
> >
> > Best,
> > Yuxin
> >
> >
> > Junrui Lee  于2023年3月8日周三 00:24写道:
> >
> > > Hi Yuxin,
> > >
> > > This FLIP looks quite reasonable. Flink can solve the problem of Batch
> > > shuffle by
> > > combining local and remote storage, and can use fixed local disks for
> > > better performance
> > >  in most scenarios, while using remote storage as a supplement when
> local
> > > disks are not
> > >  sufficient, avoiding wasteful costs and poor job stability. Moreover,
> > the
> > > solution also
> > > considers the issue of dynamic switching, which can automatically
> switch
> > to
> > > remote
> > > storage when the local disk is full, saving costs, and automatically
> > switch
> > > back when
> > > there is available space on the local disk.
> > >
> > > As Wencong Liu stated, an appropriate segment size is essential, as it
> > can
> > > significantly
> > > affect shuffle performance. I also agree that the first version should
> > > focus mainly on the
> > > design and implementation. However, I have a small question about
> FLIP. I
> > > did not see
> > > any information regarding the segment size of memory, local disk, and
> > > remote storage
> > > in this FLIP. Are these three values fixed at present? If they are
> > fixed, I
> > > suggest that FLIP
> > > could provide clearer explanations. Moreover, although a dynamic
> segment
> > > size
> > > mechanism is not necessary at the moment, can we provide configuration
> > > options for users
> > >  to manually adjust these sizes? I think it might be useful.
> > >
> > > Best,
> > > Junrui.
> > >
> > > Yuxin Tan  于2023年3月7日周二 20:14写道:
> > >
> > > > Thanks for joining the discussion.
> > > >
> > > > @weijie guo
> > > > > 1. How to optimize the broadcast result partition?
> > > > For the partitions with multi-consumers, e.g., broadcast result
> > > partition,
> > > > partition reuse,
> > > speculative, etc, the processing logic is the same as the original Hybrid Shuffle, that is, using the full spilling strategy.

Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-08 Thread Weihua Hu
Hi, Yuxin

Thanks for driving this FLIP.

The remote storage shuffle could improve the stability of batch jobs.

In our internal scenario, we use a hybrid cluster to run both
streaming (high priority) and batch jobs (low priority). When there are
not enough resources (e.g., CPU usage reaches a threshold), the batch
containers are evicted, which causes some re-runs of batch tasks.

It would be a great help if the remote storage could address this. So I
have a few questions.

1. How many performance regressions would there be if we only used remote
storage?

2. In the current design, a shuffle data segment is written to one kind of
storage tier. Shall we move the local data to remote storage if the
producer has been finished for a long time? Then we could release idle
task managers with no shuffle data on them, which may help to reduce
resource usage when the producer parallelism is larger than the consumer
parallelism.

Best,
Weihua


On Thu, Mar 9, 2023 at 10:38 AM Yuxin Tan  wrote:

> Hi, Junrui,
> Thanks for the suggestions and ideas.
>
> > If they are fixed, I suggest that FLIP could provide clearer
> explanations.
> I have updated the FLIP and described the segment size more clearly.
>
> > can we provide configuration options for users to manually adjust the
> sizes?
> The segment size can be configured if necessary. But considering that if we
> exposed these parameters prematurely, it may be difficult to modify the
> implementation later because the user has already used the configs. We
> can make these internal configs or fixed values when implementing the first
> version, I think we can use either of these two ways, because they are
> internal and do not affect the public APIs.
>
> Best,
> Yuxin
>
>
Junrui Lee wrote on Wed, Mar 8, 2023 at 00:24:
>
> > Hi Yuxin,
> >
> > This FLIP looks quite reasonable. Flink can solve the problem of Batch
> > shuffle by
> > combining local and remote storage, and can use fixed local disks for
> > better performance
> >  in most scenarios, while using remote storage as a supplement when local
> > disks are not
> >  sufficient, avoiding wasteful costs and poor job stability. Moreover,
> the
> > solution also
> > considers the issue of dynamic switching, which can automatically switch
> to
> > remote
> > storage when the local disk is full, saving costs, and automatically
> switch
> > back when
> > there is available space on the local disk.
> >
> > As Wencong Liu stated, an appropriate segment size is essential, as it
> can
> > significantly
> > affect shuffle performance. I also agree that the first version should
> > focus mainly on the
> > design and implementation. However, I have a small question about FLIP. I
> > did not see
> > any information regarding the segment size of memory, local disk, and
> > remote storage
> > in this FLIP. Are these three values fixed at present? If they are
> fixed, I
> > suggest that FLIP
> > could provide clearer explanations. Moreover, although a dynamic segment
> > size
> > mechanism is not necessary at the moment, can we provide configuration
> > options for users
> >  to manually adjust these sizes? I think it might be useful.
> >
> > Best,
> > Junrui.
> >
> > Yuxin Tan  于2023年3月7日周二 20:14写道:
> >
> > > Thanks for joining the discussion.
> > >
> > > @weijie guo
> > > > 1. How to optimize the broadcast result partition?
> > > For the partitions with multi-consumers, e.g., broadcast result
> > partition,
> > > partition reuse,
> > > speculative, etc, the processing logic is the same as the original
> Hybrid
> > > Shuffle, that is,
> > > using the full spilling strategy. It indeed may reduce the opportunity
> to
> > > consume from
> > > memory, but the PoC shows that it has no effect on the performance
> > > basically.
> > >
> > > > 2. Can the new proposal completely avoid this problem of inaccurate
> > > backlog
> > > calculation?
> > > Yes, this can avoid the problem completely. About the read buffers,
> the N
> > > is to reserve
> > > one exclusive buffer per channel, which is to avoid the deadlock
> because
> > > the buffers
> > > are acquired by some channels and other channels can not request any
> > > buffers. But
> > > the buffers except for the N can be floating (competing to request the
> > > buffers) by all
> > > channels.
> > >
> > > @Wencong Liu
> > > > Deciding the Segment size dynamically will be helpful.
> > > I agree that it may be better if the segment size is dynamically
> decided,
> > > but for simplifying
> > > the implementation of the first version, we want to make this a fixed
> > value
> > > for each tier.
> > > In the future, this can be a good improvement if necessary. In the
> first
> > > version, we will mainly
> > > focus on the more important features, such as the tiered storage
> > > architecture, dynamic
> > > switching tiers, supporting remote storage, memory management, etc.
> > >
> > > Best,
> > > Yuxin
> > >
> > >
> > > Wencong Liu  于2023年3月7日周二 16:48写道:
> > >
> > > > Hello Yuxin,
> > > >
> > > >
> > > > Thanks for your 

Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-08 Thread Yuxin Tan
Hi, Junrui,
Thanks for the suggestions and ideas.

> If they are fixed, I suggest that FLIP could provide clearer explanations.
I have updated the FLIP and described the segment size more clearly.

> can we provide configuration options for users to manually adjust the
sizes?
The segment size can be configured if necessary. But if we exposed these
parameters prematurely, it might be difficult to modify the
implementation later because users would already rely on the configs. We
can use internal configs or fixed values when implementing the first
version; I think either of these two ways works, because they are
internal and do not affect the public APIs.
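The internal-config approach described here can be sketched as follows. The keys and defaults are invented for illustration; they are not Flink's actual options.

```java
// Sketch of the "internal configs" idea: segment sizes are read from
// configuration, but the keys are deliberately undocumented so they can
// be renamed or removed before ever becoming public API. All keys and
// defaults below are hypothetical.
import java.util.Map;

final class SegmentSizes {
    // Internal, undocumented keys (hypothetical).
    static final String MEMORY_KEY = "taskmanager.hybrid-shuffle.memory-segment-size";
    static final String DISK_KEY = "taskmanager.hybrid-shuffle.disk-segment-size";
    static final String REMOTE_KEY = "taskmanager.hybrid-shuffle.remote-segment-size";

    /** Reads a size in bytes, falling back to the built-in default. */
    static long get(Map<String, String> conf, String key, long defaultBytes) {
        String value = conf.get(key);
        return value == null ? defaultBytes : Long.parseLong(value);
    }

    private SegmentSizes() {}
}
```

Because the keys never appear in public documentation, changing them later breaks no user configuration.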

Best,
Yuxin


Junrui Lee wrote on Wed, Mar 8, 2023 at 00:24:

> Hi Yuxin,
>
> This FLIP looks quite reasonable. Flink can solve the problem of Batch
> shuffle by
> combining local and remote storage, and can use fixed local disks for
> better performance
>  in most scenarios, while using remote storage as a supplement when local
> disks are not
>  sufficient, avoiding wasteful costs and poor job stability. Moreover, the
> solution also
> considers the issue of dynamic switching, which can automatically switch to
> remote
> storage when the local disk is full, saving costs, and automatically switch
> back when
> there is available space on the local disk.
>
> As Wencong Liu stated, an appropriate segment size is essential, as it can
> significantly
> affect shuffle performance. I also agree that the first version should
> focus mainly on the
> design and implementation. However, I have a small question about FLIP. I
> did not see
> any information regarding the segment size of memory, local disk, and
> remote storage
> in this FLIP. Are these three values fixed at present? If they are fixed, I
> suggest that FLIP
> could provide clearer explanations. Moreover, although a dynamic segment
> size
> mechanism is not necessary at the moment, can we provide configuration
> options for users
>  to manually adjust these sizes? I think it might be useful.
>
> Best,
> Junrui.
>
> Yuxin Tan wrote on Tue, Mar 7, 2023, at 20:14:
>
> > Thanks for joining the discussion.
> >
> > @weijie guo
> > > 1. How to optimize the broadcast result partition?
> > For the partitions with multi-consumers, e.g., broadcast result
> partition,
> > partition reuse,
> > speculative, etc, the processing logic is the same as the original Hybrid
> > Shuffle, that is,
> > using the full spilling strategy. It indeed may reduce the opportunity to
> > consume from
> > memory, but the PoC shows that it has no effect on the performance
> > basically.
> >
> > > 2. Can the new proposal completely avoid this problem of inaccurate
> > backlog
> > calculation?
> > Yes, this can avoid the problem completely. About the read buffers, the N
> > is to reserve
> > one exclusive buffer per channel, which is to avoid the deadlock because
> > the buffers
> > are acquired by some channels and other channels can not request any
> > buffers. But
> > the buffers except for the N can be floating (competing to request the
> > buffers) by all
> > channels.
> >
> > @Wencong Liu
> > > Deciding the Segment size dynamically will be helpful.
> > I agree that it may be better if the segment size is dynamically decided,
> > but for simplifying
> > the implementation of the first version, we want to make this a fixed
> value
> > for each tier.
> > In the future, this can be a good improvement if necessary. In the first
> > version, we will mainly
> > focus on the more important features, such as the tiered storage
> > architecture, dynamic
> > switching tiers, supporting remote storage, memory management, etc.
> >
> > Best,
> > Yuxin
> >
> >
> > Wencong Liu wrote on Tue, Mar 7, 2023, at 16:48:
> >
> > > Hello Yuxin,
> > >
> > >
> > > Thanks for your proposal! Adding remote storage capability to
> Flink's
> > > Hybrid Shuffle is a significant improvement that addresses the issue of
> > > local disk storage limitations. This enhancement not only ensures
> > > uninterrupted Shuffle, but also enables Flink to handle larger
> workloads
> > > and more complex data processing tasks. With the ability to seamlessly
> > > shift between local and remote storage, Flink's Hybrid Shuffle will be
> > more
> > > versatile and scalable, making it an ideal choice for organizations
> > looking
> > > to build distributed data processing applications with ease.
> > > Besides, I've a small question about the size of Segment in
> different
> > > storages. According to the FLIP, the size of Segment may be fixed for
> > each
> > > Storage Tier, but I think the fixed size may affect the shuffle
> > > performance. For example, smaller segment size will improve the
> > utilization
> > > rate of Memory Storage Tier, but it may bring extra cost to Disk
> Storage
> > > Tier or Remote Storage Tier. Deciding the size of Segment dynamically
> will
> > be
> > > helpful.
> > >
> > > Best,
> > >
> > >
> > > Wencong Liu
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >

Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-07 Thread Junrui Lee
Hi Yuxin,

This FLIP looks quite reasonable. Flink can solve the problem of Batch
shuffle by combining local and remote storage: it can use fixed local disks
for better performance in most scenarios, while using remote storage as a
supplement when local disks are not sufficient, avoiding both wasted costs
and poor job stability. Moreover, the solution also considers dynamic
switching: it can automatically switch to remote storage when the local
disk is full, saving costs, and automatically switch back when space
becomes available on the local disk again.

As Wencong Liu stated, an appropriate segment size is essential, as it can
significantly affect shuffle performance. I also agree that the first
version should focus mainly on the design and implementation. However, I
have a small question about the FLIP: I did not see any information
regarding the segment sizes of memory, local disk, and remote storage.
Are these three values fixed at present? If they are fixed, I suggest that
the FLIP provide clearer explanations. Moreover, although a dynamic
segment-size mechanism is not necessary at the moment, can we provide
configuration options for users to manually adjust these sizes? I think
it might be useful.

Best,
Junrui.

Yuxin Tan wrote on Tue, Mar 7, 2023, at 20:14:

> Thanks for joining the discussion.
>
> @weijie guo
> > 1. How to optimize the broadcast result partition?
> For the partitions with multi-consumers, e.g., broadcast result partition,
> partition reuse,
> speculative, etc, the processing logic is the same as the original Hybrid
> Shuffle, that is,
> using the full spilling strategy. It indeed may reduce the opportunity to
> consume from
> memory, but the PoC shows that it has no effect on the performance
> basically.
>
> > 2. Can the new proposal completely avoid this problem of inaccurate
> backlog
> calculation?
> Yes, this can avoid the problem completely. About the read buffers, the N
> is to reserve
> one exclusive buffer per channel, which is to avoid the deadlock because
> the buffers
> are acquired by some channels and other channels can not request any
> buffers. But
> the buffers except for the N can be floating (competing to request the
> buffers) by all
> channels.
>
> @Wencong Liu
> > Deciding the Segment size dynamically will be helpful.
> I agree that it may be better if the segment size is dynamically decided,
> but for simplifying
> the implementation of the first version, we want to make this a fixed value
> for each tier.
> In the future, this can be a good improvement if necessary. In the first
> version, we will mainly
> focus on the more important features, such as the tiered storage
> architecture, dynamic
> switching tiers, supporting remote storage, memory management, etc.
>
> Best,
> Yuxin
>
>
> Wencong Liu wrote on Tue, Mar 7, 2023, at 16:48:
>
> > Hello Yuxin,
> >
> >
> > Thanks for your proposal! Adding remote storage capability to Flink's
> > Hybrid Shuffle is a significant improvement that addresses the issue of
> > local disk storage limitations. This enhancement not only ensures
> > uninterrupted Shuffle, but also enables Flink to handle larger workloads
> > and more complex data processing tasks. With the ability to seamlessly
> > shift between local and remote storage, Flink's Hybrid Shuffle will be
> more
> > versatile and scalable, making it an ideal choice for organizations
> looking
> > to build distributed data processing applications with ease.
> > Besides, I've a small question about the size of Segment in different
> > storages. According to the FLIP, the size of Segment may be fixed for
> each
> > Storage Tier, but I think the fixed size may affect the shuffle
> > performance. For example, smaller segment size will improve the
> utilization
> > rate of Memory Storage Tier, but it may bring extra cost to Disk Storage
> > Tier or Remote Storage Tier. Deciding the size of Segment dynamically will
> be
> > helpful.
> >
> > Best,
> >
> >
> > Wencong Liu
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > At 2023-03-06 13:51:21, "Yuxin Tan"  wrote:
> > >Hi everyone,
> > >
> > >I would like to start a discussion on FLIP-301: Hybrid Shuffle supports
> > >Remote Storage[1].
> > >
> > >In the cloud-native environment, it is difficult to determine the
> > >appropriate
> > >disk space for Batch shuffle, which will affect job stability.
> > >
> > >This FLIP is to support Remote Storage for Hybrid Shuffle to improve the
> > >Batch job stability in the cloud-native environment.
> > >
> > >The goals of this FLIP are as follows.
> > >1. By default, use the local memory and disk to ensure high shuffle
> > >performance if the local storage space is sufficient.
> > >2. When the local storage space is insufficient, use remote storage as
> > >a supplement to avoid large-scale Batch job failure.
> > >
> > >Looking forward to hearing from you.
> > >
> > >[1]
> > >
> >
> 

Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-07 Thread Yuxin Tan
Thanks for joining the discussion.

@weijie guo
> 1. How to optimize the broadcast result partition?
For partitions with multiple consumers, e.g., broadcast result partition,
partition reuse, speculative execution, etc., the processing logic is the
same as in the original Hybrid Shuffle, that is, using the full spilling
strategy. This may indeed reduce the opportunity to consume from memory,
but the PoC shows that it has basically no effect on performance.

> 2. Can the new proposal completely avoid this problem of inaccurate
backlog
calculation?
Yes, this can avoid the problem completely. Regarding the read buffers,
the N is there to reserve one exclusive buffer per channel, which avoids
the deadlock where some channels hold all the buffers and other channels
cannot request any. The buffers beyond the N can be floating, i.e., all
channels compete to request them.
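The reserve-one-exclusive-buffer-plus-floating scheme discussed in this thread can be sketched as follows. This is an illustration only, using a hypothetical pool class and a semaphore to model the floating buffers; it is not the actual Flink implementation:

```java
import java.util.concurrent.Semaphore;

// Sketch of the reader-side buffer model: each of the N channels holds
// exactly one exclusive buffer (ruling out deadlock, since every channel
// can always make progress), while the remaining buffers float and all
// channels compete for them. Hypothetical class, for illustration only.
public class ReadBufferPool {
    private final Semaphore floating; // buffers beyond the N exclusive ones

    public ReadBufferPool(int totalBuffers, int numChannels) {
        if (totalBuffers < numChannels) {
            throw new IllegalArgumentException("need >= 1 exclusive buffer per channel");
        }
        // N = numChannels buffers are reserved one-per-channel; the rest float.
        this.floating = new Semaphore(totalBuffers - numChannels);
    }

    /** Non-blocking attempt to grab a floating buffer; channels compete here. */
    public boolean tryRequestFloating() {
        return floating.tryAcquire();
    }

    /** Returns a floating buffer to the pool once a channel is done with it. */
    public void recycleFloating() {
        floating.release();
    }

    public int availableFloating() {
        return floating.availablePermits();
    }
}
```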

@Wencong Liu
> Deciding the Segment size dynamically will be helpful.
I agree that it may be better if the segment size is dynamically decided,
but for simplifying
the implementation of the first version, we want to make this a fixed value
for each tier.
In the future, this can be a good improvement if necessary. In the first
version, we will mainly
focus on the more important features, such as the tiered storage
architecture, dynamic
switching tiers, supporting remote storage, memory management, etc.

Best,
Yuxin


Wencong Liu wrote on Tue, Mar 7, 2023, at 16:48:

> Hello Yuxin,
>
>
> Thanks for your proposal! Adding remote storage capability to Flink's
> Hybrid Shuffle is a significant improvement that addresses the issue of
> local disk storage limitations. This enhancement not only ensures
> uninterrupted Shuffle, but also enables Flink to handle larger workloads
> and more complex data processing tasks. With the ability to seamlessly
> shift between local and remote storage, Flink's Hybrid Shuffle will be more
> versatile and scalable, making it an ideal choice for organizations looking
> to build distributed data processing applications with ease.
> Besides, I've a small question about the size of Segment in different
> storages. According to the FLIP, the size of Segment may be fixed for each
> Storage Tier, but I think the fixed size may affect the shuffle
> performance. For example, smaller segment size will improve the utilization
> rate of Memory Storage Tier, but it may bring extra cost to Disk Storage
> Tier or Remote Storage Tier. Deciding the size of Segment dynamically will be
> helpful.
>
> Best,
>
>
> Wencong Liu
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2023-03-06 13:51:21, "Yuxin Tan"  wrote:
> >Hi everyone,
> >
> >I would like to start a discussion on FLIP-301: Hybrid Shuffle supports
> >Remote Storage[1].
> >
> >In the cloud-native environment, it is difficult to determine the
> >appropriate
> >disk space for Batch shuffle, which will affect job stability.
> >
> >This FLIP is to support Remote Storage for Hybrid Shuffle to improve the
> >Batch job stability in the cloud-native environment.
> >
> >The goals of this FLIP are as follows.
> >1. By default, use the local memory and disk to ensure high shuffle
> >performance if the local storage space is sufficient.
> >2. When the local storage space is insufficient, use remote storage as
> >a supplement to avoid large-scale Batch job failure.
> >
> >Looking forward to hearing from you.
> >
> >[1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-301%3A+Hybrid+Shuffle+supports+Remote+Storage
> >
> >Best,
> >Yuxin
>


Re:[DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-07 Thread Wencong Liu
Hello Yuxin,


Thanks for your proposal! Adding remote storage capability to Flink's 
Hybrid Shuffle is a significant improvement that addresses the issue of local 
disk storage limitations. This enhancement not only ensures uninterrupted 
Shuffle, but also enables Flink to handle larger workloads and more complex 
data processing tasks. With the ability to seamlessly shift between local and 
remote storage, Flink's Hybrid Shuffle will be more versatile and scalable, 
making it an ideal choice for organizations looking to build distributed data 
processing applications with ease.
Besides, I have a small question about the size of the Segment in different
storages. According to the FLIP, the Segment size may be fixed for each
Storage Tier, but I think a fixed size may affect shuffle performance.
For example, a smaller segment size will improve the utilization rate of the
Memory Storage Tier, but it may bring extra cost to the Disk Storage Tier or
Remote Storage Tier. Deciding the size of the Segment dynamically would be
helpful.
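The trade-off can be made concrete with back-of-the-envelope arithmetic (the numbers and class below are illustrative only, not from the FLIP): a producer with a given amount of output data needs ceil(data / segmentSize) segments, so smaller segments waste less space in the last, partially filled segment, but produce more segments and hence more I/O or remote requests per partition.

```java
// Illustrative arithmetic for the segment-size trade-off: smaller segments
// waste fewer bytes in the last, partially filled segment, but yield more
// segments (hence more disk writes or remote requests). Hypothetical class.
public class SegmentTradeoff {
    /** Number of segments needed for dataBytes of output (ceiling division). */
    static long numSegments(long dataBytes, long segmentBytes) {
        return (dataBytes + segmentBytes - 1) / segmentBytes;
    }

    /** Unused slack in the last segment; zero when data fills segments exactly. */
    static long wastedBytes(long dataBytes, long segmentBytes) {
        long remainder = dataBytes % segmentBytes;
        return remainder == 0 ? 0 : segmentBytes - remainder;
    }
}
```

For example, 10 MB of output with 1 MiB segments needs 10 segments and wastes about 474 KiB, while 32 KiB segments need 306 segments but waste only about 26 KiB, which illustrates Wencong's point about memory-tier utilization versus per-segment overhead.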
 
Best,


Wencong Liu



















At 2023-03-06 13:51:21, "Yuxin Tan"  wrote:
>Hi everyone,
>
>I would like to start a discussion on FLIP-301: Hybrid Shuffle supports
>Remote Storage[1].
>
>In the cloud-native environment, it is difficult to determine the
>appropriate
>disk space for Batch shuffle, which will affect job stability.
>
>This FLIP is to support Remote Storage for Hybrid Shuffle to improve the
>Batch job stability in the cloud-native environment.
>
>The goals of this FLIP are as follows.
>1. By default, use the local memory and disk to ensure high shuffle
>performance if the local storage space is sufficient.
>2. When the local storage space is insufficient, use remote storage as
>a supplement to avoid large-scale Batch job failure.
>
>Looking forward to hearing from you.
>
>[1]
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-301%3A+Hybrid+Shuffle+supports+Remote+Storage
>
>Best,
>Yuxin


Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-07 Thread weijie guo
Hi Yuxin,

As one of the main contributors of hybrid shuffle, I am very happy to see
this improvement.

IMO, hybrid shuffle should gradually replace the default batch shuffle
implementation, because it is theoretically a superset of blocking
shuffle (i.e., SortMergeShuffle / BoundedBlockingShuffle). Today, with the
popularity of K8s, more and more people suffer from insufficient disk
space when running batch jobs in cloud environments. Therefore, supporting
elastic remote storage is a crucial part of this process.

This FLIP is good for me on the whole, but I still have some problems:

1. How to optimize the broadcast result partition?
Is it the same as the original hybrid shuffle that uses the full spilling
strategy? IIUC, this FLIP proposes to remove the spilling operation from
the memory layer to the disk layer, does it also lose the ability to
consume data from memory in this case? This problem can be extended to any
multi-consumer situation, such as partition reuse and speculative execution.

2. With regard to reader-side memory management, the original hybrid
shuffle did not calculate an accurate backlog for performance reasons, so
it was necessary to reserve two exclusive buffers for each channel to
avoid performance deterioration. Can the new proposal completely avoid
this problem? Also, does N in the formula mean that one exclusive buffer
is reserved for each channel? If so, why can't all of them be floating?


Best regards,

Weijie


Yuxin Tan wrote on Mon, Mar 6, 2023, at 13:51:

> Hi everyone,
>
> I would like to start a discussion on FLIP-301: Hybrid Shuffle supports
> Remote Storage[1].
>
> In the cloud-native environment, it is difficult to determine the
> appropriate
> disk space for Batch shuffle, which will affect job stability.
>
> This FLIP is to support Remote Storage for Hybrid Shuffle to improve the
> Batch job stability in the cloud-native environment.
>
> The goals of this FLIP are as follows.
> 1. By default, use the local memory and disk to ensure high shuffle
> performance if the local storage space is sufficient.
> 2. When the local storage space is insufficient, use remote storage as
> a supplement to avoid large-scale Batch job failure.
>
> Looking forward to hearing from you.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-301%3A+Hybrid+Shuffle+supports+Remote+Storage
>
> Best,
> Yuxin
>


[DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

2023-03-05 Thread Yuxin Tan
Hi everyone,

I would like to start a discussion on FLIP-301: Hybrid Shuffle supports
Remote Storage[1].

In the cloud-native environment, it is difficult to determine the
appropriate disk space for Batch shuffle, which affects job stability.

This FLIP is to support Remote Storage for Hybrid Shuffle to improve the
Batch job stability in the cloud-native environment.

The goals of this FLIP are as follows.
1. By default, use the local memory and disk to ensure high shuffle
performance if the local storage space is sufficient.
2. When the local storage space is insufficient, use remote storage as
a supplement to avoid large-scale Batch job failure.
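The two goals above amount to a simple priority/fallback policy: prefer local memory, then local disk, and fall back to remote storage only when local space is insufficient. A minimal sketch of that policy follows; the interface and helper are hypothetical illustrations, not the FLIP's actual API:

```java
import java.util.List;
import java.util.Optional;

// Sketch of the storage-selection policy implied by the goals: walk the
// tiers in priority order (memory, local disk, remote) and pick the first
// one with enough free space. Hypothetical interfaces, not Flink's API.
public class TierSelector {
    interface StorageTier {
        String name();
        boolean hasCapacity(long bytes);
    }

    /** Returns the first tier, in priority order, that can accept the segment. */
    static Optional<StorageTier> choose(List<StorageTier> tiersInPriorityOrder, long bytes) {
        return tiersInPriorityOrder.stream()
                .filter(t -> t.hasCapacity(bytes))
                .findFirst();
    }

    /** Test helper: a tier with a fixed amount of free space. */
    static StorageTier tier(String name, long freeBytes) {
        return new StorageTier() {
            public String name() { return name; }
            public boolean hasCapacity(long bytes) { return bytes <= freeBytes; }
        };
    }
}
```

With a full memory tier and a nearly full local disk, a segment would land on the first tier that still has room, matching goal 2's "remote storage as a supplement" behavior.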

Looking forward to hearing from you.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-301%3A+Hybrid+Shuffle+supports+Remote+Storage

Best,
Yuxin