Hi, Yun,

Thanks for sharing the ideas.

> 1. We should trigger to kick the shuffle data to remote storage
once either condition is reached

I believe that configuring two options in this manner is a pragmatic
approach that can fulfill a wider range of usage scenarios. However,
if we present two options, it may become difficult to remove them
in the future once users have started relying on them. On the other
hand, if we introduce a single option, we can easily incorporate
additional options based on your recommendations if required.
Thus, we recommend adopting a one-option solution in the first
version to address the issue.
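
To make the trade-off concrete, here is a minimal Python sketch of the two
trigger policies. The function and parameter names are hypothetical and are
not taken from the FLIP; the absolute threshold in particular only
illustrates the second option under discussion.

```python
GIB = 1024 ** 3  # bytes in one GiB


def should_use_remote_fraction_only(total_bytes, free_bytes, min_reserve_fraction):
    """Single-option policy: go remote once free space drops below a fraction of the disk."""
    return free_bytes < total_bytes * min_reserve_fraction


def should_use_remote_either(total_bytes, free_bytes, min_reserve_fraction, min_reserve_bytes):
    """Two-option policy: go remote once either the fraction or the absolute threshold is hit."""
    return (free_bytes < total_bytes * min_reserve_fraction
            or free_bytes < min_reserve_bytes)
```

For a 20 GiB disk with 3 GiB free, a 5% fraction alone would keep writing
locally, while adding a 5 GiB absolute threshold would switch to remote
storage.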

> 2. Perhaps we could switch to kick shuffle data to remote storage
once no space left exception is met.

Thanks for your valuable feedback. While the suggestion is a viable
solution to address the no space left exception issue, we are
concerned that implementing it could create interdependence between
the disk tier and remote storage tier, which would contradict our goal
of achieving independence between tiers in the new architecture.
Moreover, we believe that it is better to prevent encountering the
exception in the first place by reserving adequate disk space. This is
because other processes on the same machine may also be impacted
when the exception occurs. If the exception does still arise, we can
explore other potential solutions through detailed design and
discussions, such as the one you proposed or optimizing the reserved
space with a global counter per TaskManager (TM). Given the complexity
of this FLIP, the current implementation only partially addresses the
exception issue, and we expect to improve it in subsequent versions. We
would appreciate hearing your thoughts on this matter.
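
As a rough illustration of the global counter idea, the following Python
sketch checks the remaining space minus the already accounted bytes before a
segment is written. The class, its methods, and the way the free space is
passed in are all assumptions made for this example, not part of the FLIP.

```python
import threading


class DiskReservationCounter:
    """Hypothetical per-TM accounting of bytes promised to in-flight segments."""

    def __init__(self, min_reserved_bytes):
        self._min_reserved_bytes = min_reserved_bytes
        self._pending_bytes = 0
        self._lock = threading.Lock()

    def try_reserve(self, free_bytes, segment_bytes):
        """Account for a segment before writing; False means skip the disk tier."""
        with self._lock:
            # Remaining space minus what concurrent writers already claimed.
            usable = free_bytes - self._pending_bytes - self._min_reserved_bytes
            if usable < segment_bytes:
                return False
            self._pending_bytes += segment_bytes
            return True

    def release(self, segment_bytes):
        """Drop the accounting once the segment is written or the write failed."""
        with self._lock:
            self._pending_bytes -= segment_bytes
```

A writer would call try_reserve() with the currently observed free space,
write the segment only if it returns True, and call release() afterwards.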

Best,
Yuxin


On Tue, Mar 14, 2023 at 14:48, Yun Tang <myas...@live.com> wrote:

> Hi Yuxin
>
> Thanks for your reply.
> I am not saying that we should use an absolute reserved value to replace
> the current plan of the reserved fraction. We should trigger to kick the
> shuffle data to remote storage once either condition is reached.
> Maybe we could get some ideas from the configuration of the tier-based
> state backend [1].
>
> The concern about concurrent writing arises because the local disk is
> shared by all the instances running on that node, so we cannot ensure
> that other components would not flush data during shuffle writing.
> Perhaps we could
> switch to kick shuffle data to remote storage once no space left exception
> is met.
>
>
> [1]
> https://www.alibabacloud.com/help/en/realtime-compute-for-apache-flink/latest/geministatebackend-configurations#section-u0y-on0-owo
>
>
> Best
> Yun Tang
>
> ________________________________
> From: Yuxin Tan <tanyuxinw...@gmail.com>
> Sent: Monday, March 13, 2023 15:06
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
>
> Hi,
> Thanks for the feedback and questions from Zhu Zhu, Xia Sun and Yun Tang.
>
> @Zhu Zhu
> > 1. I'm a bit concerned about the cost of it.
>
> The Read Client's request to check the existence of a segment in each
> storage tier is a Netty message, similar to the credit update messages.
> It has been observed that the cost of these Netty messages is relatively
> low, and that there are also relatively few of them compared to credit
> updates, which are sent every few buffers. Moreover, since handling this
> request involves only memory operations, its cost is negligible relative
> to the whole data reading process.
>
> And we will further optimize the message cost later, particularly for
> segments that remain in the same tier without switching tiers. That
> is, for consecutive segments in the same tier, we can continue to send
> the next segment without waiting for the downstream to ask whether the
> segment exists in this tier.
>
>
> @Xia Sun
> > 1. how to keep the order of segments between different storage tiers
>
> To indicate the sequential order of upstream segments, we rely on the
> segmentId on the downstream side. Once segment n has been fully consumed
> by the downstream, the subsequent segment n + 1 will be requested, in its
> natural order. As for the order of the buffers within each segment, they
> follow the default ordering mechanisms of Flink.
>
> > 2. what will happen if the disk space is found to be full?
>
> By introducing a way of reserving some space in advance, we will try to
> avoid this situation of reporting the disk space error as much as possible.
> The size of reserved space can be configured through the option introduced
> in the public API.
>
> > 3. For remote storage, does the switching between different tiers
> will bring overhead or waiting?
>
> Our primary focus for the first version is to address the problem of job
> stability and to implement the new architecture, because the change in this
> FLIP is already relatively complicated. However, remote storage may lead to
> additional overhead for reading small files. Therefore, we recommend using
> remote storage as a supplement to local disks, where the latter would serve
> as the primary storage in most cases, so the impact on performance is small.
> After introducing the new architecture, we will also continue optimizing
> the remote storage performance. At that time, the change will be relatively
> cohesive, focusing only on the remote storage tier, without affecting other
> modules.
>
>
> @Yun Tang
>
> > 1. Normal shuffle which writes data to disk could also benefit from this.
>
> I agree with the idea. Our core implementation design is independent of
> the shuffle mode and is not strongly tied to hybrid shuffle. For now, we
> are only applying this architecture to hybrid shuffle. If needed, it can
> be adapted to other shuffle modes with slight changes in the future, making
> it easier for them to benefit from this feature. However, for the first
> version, we are focusing on hybrid shuffle because of the scope of impact
> of the change and the potential of hybrid shuffle.
>
> > 2. will it affect current design of pluggable remote shuffle service?
>
> It will not affect the current design of the pluggable shuffle service. Our
> design focuses on improving the Flink internal shuffle.
>
> > 3. the condition of min-reserve-space-fraction to kick local data to
> remote storage might not be a good idea in all cases
>
> In fact, we believe that using fractions as opposed to fixed values covers
> more general use cases, especially considering that no default value can
> suffice for all scenarios. For instance, a default value such as 5GB is too
> small for a 3TB SATA disk and too large for a 20GB local disk (where 5GB
> accounts for 25% of the disk space). Therefore, after some internal
> discussions, we decided to use fractions as the reserved space value.
>
> Is there anything in the state-backend implementation that we could use as
> a reference, such as the fixed value or other solutions?
>
> > 4. will we meet a concurrency problem when different subtasks within one
> process/node start to check the left disk space?
>
> Thanks for your input. If there is enough reserved space to store the
> concurrently written segments, this issue will not occur. However, if the
> problem persists, we suggest implementing a global accounting reservation
> for a task manager. Before writing a segment, we would need to check if the
> remaining space minus the accounting number is sufficient. If it is, we
> would add the current segment to the global accounting before writing the
> data. This is our initial idea on the issue.
> We welcome any additional ideas or better solutions you may have.
>
>
> Best,
> Yuxin
>
>
> On Mon, Mar 13, 2023 at 01:16, Yun Tang <myas...@live.com> wrote:
>
> > Hi Yuxin,
> >
> > Thanks for this proposal.
> >
> > From my understanding, this FLIP looks like a tier-based shuffle
> > extension, which does not seem to need to be bound to hybrid shuffle.
> > Normal shuffle which writes data to disk could also benefit from this.
> >
> > Secondly, since this is targeted as an extension of hybrid shuffle and
> > introduces a lot of changes, will it affect current design of pluggable
> > remote shuffle service, such as Apache Celeborn [1]?
> >
> > Thirdly, based on my previous experience implementing a tier-based
> > state-backend, the condition of min-reserve-space-fraction to kick local
> > data to remote storage might not be a good idea in all cases; we still
> > need to consider the absolute reserved disk storage. Take a 20GB local
> > data disk as an example: it might be a bit too late to kick the local
> > data when only 1GB (20GB*5%) of space is left.
> >
> > Last but not least, will we meet a concurrency problem when different
> > subtasks within one process/node start to check the left disk space
> > before deciding to write to local or remote?
> >
> >
> > [1] https://celeborn.apache.org/
> >
> > Best
> > Yun Tang
> >
> > ________________________________
> > From: Xia Sun <xingbe...@gmail.com>
> > Sent: Sunday, March 12, 2023 17:16
> > To: dev@flink.apache.org <dev@flink.apache.org>
> > Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
> >
> > Hi Yuxin,
> >
> > Thanks for creating this FLIP!
> > I'm a Flink user, and in our internal scenario we use colocation
> > technology to run Flink jobs and online services together on the same
> > machine. We found that Flink jobs are occasionally affected by other
> > non-Flink jobs (e.g., if the host disk space is full, Flink jobs fail
> > with a 'No space left on device' error). This FLIP will really help us
> > benefit from hybrid shuffle without worrying about insufficient disk
> > space.
> >
> > And I also have a few questions.
> > 1. If the same subpartition spans multiple different tiers, how to keep
> > the order of segments between different storage tiers (if necessary)?
> > 2. In the process of writing to the local disk for a subpartition, what
> > will happen if the disk space is found to be full? Will it report an
> > error or automatically transfer to remote storage?
> > 3. For remote storage, I noticed that it uses direct reading, which is
> > different from the other two. Will switching between different tiers
> > bring overhead or waiting? In addition, compared to Flink RSS, which
> > optimizes data compression and small file merging to improve throughput
> > and relieve file system pressure, can the object storage system meet the
> > performance requirements and concurrent access challenges of large-scale
> > batch jobs (parallelism > 10000)?
> >
> > Thanks,
> > Xia
> >
> > On Fri, Mar 10, 2023 at 16:44, Zhu Zhu <reed...@gmail.com> wrote:
> >
> > > Hi Yuxin,
> > >
> > > Thanks for creating this FLIP!
> > > The idea of tiered storage looks good. Instead of choosing one from
> > > multiple storages, it can help to balance between performance, cost and
> > > stability. It also has the potential to adaptively select proper tiers
> > > according to more runtime information, to achieve better performance
> > > and ease of use.
> > >
> > > I have a question about the tier finding of data reading. In the FLIP
> > > it proposes that the Read Client asks each storage tier whether a
> > > given segment exists in it, from higher priority tiers to lower
> > > priority ones. I'm a bit concerned about the cost of it, especially
> > > when data are written to low priority tiers. Do you have any
> > > evaluation of it? Is it possible to let the Read Client know the
> > > location of the next segment when it has finished reading one segment?
> > > Or maybe just let it know whether the next segment is located in the
> > > same tier, if we can have the assumption that tier changing would not
> > > be very frequent.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > On Fri, Mar 10, 2023 at 11:52, Weihua Hu <huweihua....@gmail.com> wrote:
> > > >
> > > > Thanks Yuxin for your explanation.
> > > >
> > > > That sounds reasonable. Looking forward to the new shuffle.
> > > >
> > > >
> > > > Best,
> > > > Weihua
> > > >
> > > >
> > > > On Fri, Mar 10, 2023 at 11:48 AM Yuxin Tan <tanyuxinw...@gmail.com> wrote:
> > > >
> > > > > Hi, Weihua,
> > > > > Thanks for the questions and the ideas.
> > > > >
> > > > > > 1. How many performance regressions would there be if we only
> > > > > used remote storage?
> > > > >
> > > > > The new architecture supports using remote storage only, but the
> > > > > target of this FLIP is to improve job stability. The change in the
> > > > > FLIP is already significantly complex, and the goal of the first
> > > > > version is to update Hybrid Shuffle to the new architecture and
> > > > > support remote storage as a supplement. Performance is not the
> > > > > first priority of this version, so we haven't tested the
> > > > > performance of using only remote storage. If there are indeed
> > > > > regressions, we will keep optimizing the performance of the remote
> > > > > storage until using only remote storage is viable in the
> > > > > production environment.
> > > > >
> > > > > > 2. Shall we move the local data to remote storage if the producer
> > > > > is finished for a long time?
> > > > >
> > > > > I agree that it is a good idea, which can release task manager
> > > > > resources in a more timely manner. But moving data from the TM local
> > > > > disk to remote storage needs more detailed discussion and design,
> > > > > and it is easier to implement based on the new architecture.
> > > > > Considering the complexity, the target focus, and the iteration
> > > > > cycle of the FLIP, we decided not to include the details in the
> > > > > first version. We will extend and implement them in subsequent
> > > > > versions.
> > > > >
> > > > > Best,
> > > > > Yuxin
> > > > >
> > > > >
> > > > > On Thu, Mar 9, 2023 at 11:22, Weihua Hu <huweihua....@gmail.com> wrote:
> > > > >
> > > > > > Hi, Yuxin
> > > > > >
> > > > > > Thanks for driving this FLIP.
> > > > > >
> > > > > > The remote storage shuffle could improve the stability of Batch
> > > > > > jobs.
> > > > > >
> > > > > > In our internal scenario, we use a hybrid cluster to run both
> > > > > > Streaming (high priority) and Batch (low priority) jobs. When
> > > > > > there are not enough resources (such as when CPU usage reaches a
> > > > > > threshold), the batch containers will be evicted. This causes some
> > > > > > batch tasks to be re-run.
> > > > > >
> > > > > > It would be a great help if the remote storage could address this.
> > > > > > So I have a few questions.
> > > > > >
> > > > > > 1. How many performance regressions would there be if we only used
> > > > > > remote storage?
> > > > > >
> > > > > > 2. In the current design, a shuffle data segment is written to one
> > > > > > kind of storage tier. Shall we move the local data to remote
> > > > > > storage if the producer is finished for a long time? Then we can
> > > > > > release the idle task manager with no shuffle data on it. This may
> > > > > > help to reduce resource usage when the producer parallelism is
> > > > > > larger than the consumer parallelism.
> > > > > >
> > > > > > Best,
> > > > > > Weihua
> > > > > >
> > > > > >
> > > > > > On Thu, Mar 9, 2023 at 10:38 AM Yuxin Tan <tanyuxinw...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi, Junrui,
> > > > > > > Thanks for the suggestions and ideas.
> > > > > > >
> > > > > > > > If they are fixed, I suggest that FLIP could provide clearer
> > > > > > > explanations.
> > > > > > > I have updated the FLIP and described the segment size more
> > > > > > > clearly.
> > > > > > >
> > > > > > > > can we provide configuration options for users to manually
> > > > > > > adjust the sizes?
> > > > > > > The segment size can be made configurable if necessary. However,
> > > > > > > if we expose these parameters prematurely, it may be difficult
> > > > > > > to modify the implementation later because users will already
> > > > > > > rely on the configs. We can make these internal configs or fixed
> > > > > > > values when implementing the first version. I think either way
> > > > > > > works, because both are internal and do not affect the public
> > > > > > > APIs.
> > > > > > >
> > > > > > > Best,
> > > > > > > Yuxin
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Mar 8, 2023 at 00:24, Junrui Lee <jrlee....@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Yuxin,
> > > > > > > >
> > > > > > > > This FLIP looks quite reasonable. Flink can solve the problem
> > > > > > > > of Batch shuffle by combining local and remote storage, and
> > > > > > > > can use fixed local disks for better performance in most
> > > > > > > > scenarios, while using remote storage as a supplement when
> > > > > > > > local disks are not sufficient, avoiding wasteful costs and
> > > > > > > > poor job stability. Moreover, the solution also considers the
> > > > > > > > issue of dynamic switching, which can automatically switch to
> > > > > > > > remote storage when the local disk is full, saving costs, and
> > > > > > > > automatically switch back when there is available space on the
> > > > > > > > local disk.
> > > > > > > >
> > > > > > > > As Wencong Liu stated, an appropriate segment size is
> > > > > > > > essential, as it can significantly affect shuffle performance.
> > > > > > > > I also agree that the first version should focus mainly on the
> > > > > > > > design and implementation. However, I have a small question
> > > > > > > > about the FLIP. I did not see any information regarding the
> > > > > > > > segment size of memory, local disk, and remote storage in this
> > > > > > > > FLIP. Are these three values fixed at present? If they are
> > > > > > > > fixed, I suggest that FLIP could provide clearer explanations.
> > > > > > > > Moreover, although a dynamic segment size mechanism is not
> > > > > > > > necessary at the moment, can we provide configuration options
> > > > > > > > for users to manually adjust these sizes? I think it might be
> > > > > > > > useful.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Junrui.
> > > > > > > >
> > > > > > > > On Tue, Mar 7, 2023 at 20:14, Yuxin Tan <tanyuxinw...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Thanks for joining the discussion.
> > > > > > > > >
> > > > > > > > > @weijie guo
> > > > > > > > > > 1. How to optimize the broadcast result partition?
> > > > > > > > > For partitions with multiple consumers, e.g., broadcast
> > > > > > > > > result partitions, partition reuse, speculative execution,
> > > > > > > > > etc., the processing logic is the same as in the original
> > > > > > > > > Hybrid Shuffle, that is, using the full spilling strategy.
> > > > > > > > > It may indeed reduce the opportunity to consume from memory,
> > > > > > > > > but the PoC shows that it has basically no effect on
> > > > > > > > > performance.
> > > > > > > > >
> > > > > > > > > > 2. Can the new proposal completely avoid this problem of
> > > > > > > > > inaccurate backlog calculation?
> > > > > > > > > Yes, this can avoid the problem completely. As for the read
> > > > > > > > > buffers, the N is for reserving one exclusive buffer per
> > > > > > > > > channel, which avoids the deadlock that occurs when all
> > > > > > > > > buffers are acquired by some channels and other channels
> > > > > > > > > cannot request any buffers. The buffers beyond the N can be
> > > > > > > > > floating, that is, all channels compete to request them.
> > > > > > > > >
> > > > > > > > > @Wencong Liu
> > > > > > > > > > Deciding the Segment size dynamically will be helpful.
> > > > > > > > > I agree that it may be better if the segment size is decided
> > > > > > > > > dynamically, but to simplify the implementation of the first
> > > > > > > > > version, we want to make this a fixed value for each tier.
> > > > > > > > > In the future, this can be a good improvement if necessary.
> > > > > > > > > In the first version, we will mainly focus on the more
> > > > > > > > > important features, such as the tiered storage architecture,
> > > > > > > > > dynamically switching tiers, supporting remote storage,
> > > > > > > > > memory management, etc.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Yuxin
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Mar 7, 2023 at 16:48, Wencong Liu <liuwencle...@163.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hello Yuxin,
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >     Thanks for your proposal! Adding remote storage
> > > > > > > > > > capability to Flink's Hybrid Shuffle is a significant
> > > > > > > > > > improvement that addresses the issue of local disk storage
> > > > > > > > > > limitations. This enhancement not only ensures
> > > > > > > > > > uninterrupted Shuffle, but also enables Flink to handle
> > > > > > > > > > larger workloads and more complex data processing tasks.
> > > > > > > > > > With the ability to seamlessly shift between local and
> > > > > > > > > > remote storage, Flink's Hybrid Shuffle will be more
> > > > > > > > > > versatile and scalable, making it an ideal choice for
> > > > > > > > > > organizations looking to build distributed data processing
> > > > > > > > > > applications with ease.
> > > > > > > > > >     Besides, I have a small question about the size of
> > > > > > > > > > Segment in different storages. According to the FLIP, the
> > > > > > > > > > size of Segment may be fixed for each Storage Tier, but I
> > > > > > > > > > think the fixed size may affect the shuffle performance.
> > > > > > > > > > For example, a smaller segment size will improve the
> > > > > > > > > > utilization rate of the Memory Storage Tier, but it may
> > > > > > > > > > bring extra cost to the Disk Storage Tier or Remote
> > > > > > > > > > Storage Tier. Deciding the size of Segment dynamically
> > > > > > > > > > will be helpful.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Wencong Liu
> > > > > > > > > >
> > > > > > > > > > At 2023-03-06 13:51:21, "Yuxin Tan" <tanyuxinw...@gmail.com> wrote:
> > > > > > > > > > >Hi everyone,
> > > > > > > > > > >
> > > > > > > > > > >I would like to start a discussion on FLIP-301: Hybrid
> > > > > > > > > > >Shuffle supports Remote Storage[1].
> > > > > > > > > > >
> > > > > > > > > > >In the cloud-native environment, it is difficult to
> > > > > > > > > > >determine the appropriate disk space for Batch shuffle,
> > > > > > > > > > >which will affect job stability.
> > > > > > > > > > >
> > > > > > > > > > >This FLIP is to support Remote Storage for Hybrid Shuffle
> > > > > > > > > > >to improve the Batch job stability in the cloud-native
> > > > > > > > > > >environment.
> > > > > > > > > > >
> > > > > > > > > > >The goals of this FLIP are as follows.
> > > > > > > > > > >1. By default, use the local memory and disk to ensure
> > > > > > > > > > >high shuffle performance if the local storage space is
> > > > > > > > > > >sufficient.
> > > > > > > > > > >2. When the local storage space is insufficient, use
> > > > > > > > > > >remote storage as a supplement to avoid large-scale Batch
> > > > > > > > > > >job failure.
> > > > > > > > > > >
> > > > > > > > > > >Looking forward to hearing from you.
> > > > > > > > > > >
> > > > > > > > > > >[1]
> > > > > > > > > > >https://cwiki.apache.org/confluence/display/FLINK/FLIP-301%3A+Hybrid+Shuffle+supports+Remote+Storage
> > > > > > > > > > >
> > > > > > > > > > >Best,
> > > > > > > > > > >Yuxin
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > >
> >
>
