Hi Yuxin

Thanks for your reply.
I am not saying that we should use an absolute reserved value to replace the 
current plan of the reserved fraction. We should trigger to kick the shuffle 
data to remote storage once either condition is reached.
Maybe we could get some idea from the configuration of tiered based state 
backend [1].

The concern about concurrent writing is due to the local disk being shared by 
all the instances running on that node, we cannot ensure other components would 
not flush data during shuffle writing. Perhaps we could switch to kick shuffle 
data to remote storage once no space left exception is met.


[1] 
https://www.alibabacloud.com/help/en/realtime-compute-for-apache-flink/latest/geministatebackend-configurations#section-u0y-on0-owo


Best
Yun Tang

________________________________
From: Yuxin Tan <tanyuxinw...@gmail.com>
Sent: Monday, March 13, 2023 15:06
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage

Hi,
Thanks for the feedback and questions from Zhu Zhu, Xia Sun and Yun Tang.

@Zhu Zhu
> 1. I'm a bit concerned about the cost of it.

The Read Client's request to check the existence of a segment in each
storage tier is the netty message, which is similar to the credit updating
messages. It is observed that the netty message cost for these operations
is relatively low and the number of messages is also relatively low
compared to credit updates which are sent every few buffers. Moreover,
since this request involves only memory operations, the message cost
is significantly low during the total reading data process.

And we will further optimize the message cost later, particularly for
segments that remain in the same tier without switching tiers. That
is, for consecutive segments in the same tier, we can continue to send
the next segment without waiting for the downstream to ask whether the
segment exists in this tier.


@Xia Sun
> 1. how to keep the order of segments between different storage tiers

To indicate the sequential order of upstream segments, we rely on segmentId
downstream. Once segment n has been fully consumed by the downstream,
the subsequent segment n + 1 will be asked in its natural order. As for the
order of the buffers within each segment, they follow the default ordering
mechanisms of Flink.

> 2. what will happen if the disk space is found to be full?

By introducing a way of reserving some space in advance, we will try to
avoid this situation of reporting the disk space error as much as possible.
The size of reserved space can be configured through the option introduced
in the public API.

> 3. For remote storage, does the switching between different tiers
will bring overhead or waiting?

Our primary focus for the first version is to address the problem of job
stability
and implement a new architecture because the change of this FLIP has been
relatively complicated. However, remote storage may lead to additional
overhead for reading small files. Therefore, we recommend using remote
storage as a supplement to local disks, where the latter would serve as the
primary storage in most cases, resulting in less affection for performance.
After introducing the new architecture, we will also continue optimizing
the
remote storage performance. At that time, this change will be relatively
cohesive, focusing only on the remote storage tier, without affecting other
modules.


@Yun Tang

> 1. Normal shuffle which writes data to disk could also benefit from this.

I agree with the idea. And our core implementation design is independent
of shuffle mode, and is not strongly tied to hybrid shuffle. Actually, we
are
only applying this architecture to hybrid shuffle. If needed, it can be
adapted
by slight changes for other shuffle modes in the future, making it easier
for
them to benefit from this feature. However, for the first version, we are
focusing on hybrid shuffle because of the affection scope of the change
and the potential of hybrid shuffle.

> 2. will it affect current design of pluggable remote shuffle service?

It will not affect the current design of the pluggable shuffle service. Our
design focuses on improving the Flink internal shuffle.

> 3. the condition of min-reserve-space-fraction to kick local data to
remote
storage might not be a good idea in all cases

In fact, we believe that using fractions as opposed to fixed values has
more
general use cases, especially considering any default values cannot suffice
for all scenarios. For instance, default values such as 5GB are too small
for
a 3TB SATA disk, and are too large for a 20GB local disk (where 5GB
accounts
for 25% of the available disk space). Therefore, after conducting some
internal
discussions, we decided to opt for using fractions as the reserved space
value.

Is there any way we can reference it in the state-backend implementation?
the
fixed value or other solutions.

> 4. will we meet a concurrency problem when different subtasks within one
process/node start to check the left disk space?

Thanks for your input. If there is enough reserved space to store the
concurrently written segments, this issue will not occur. However, if the
problem
persists, we suggest implementing a global accounting reservation for a
task
manager. Before writing a segment, we would need to check if the remaining
space minus the accounting number is sufficient. If it is, we would add the
current segment to the global accounting before writing the data. This is
our
initial idea on the issue.
We welcome any additional ideas or better solutions you may have.


Best,
Yuxin


Yun Tang <myas...@live.com> 于2023年3月13日周一 01:16写道:

> Hi Yuxin,
>
> Thanks for this proposal.
>
> From my understanding, this FLIP looks like a tiered based shuffle
> extension, which seems no need to bind with hybrid shuffle. Normal shuffle
> which writes data to disk could also benefit from this.
>
> Secondly, since this targets to be an extension on hybrid shuffle which
> introduces a lots of changes, will it affect current design of pluggable
> remote shuffle service, such as Apache Celeborn [1].
>
> Thirdly, based on my previous experiences on implementing a tiered based
> state-backend, the condition of min-reserve-space-fraction to kick local
> data to remote storage might not be a good idea in all cases, we still need
> to consider the absolute reserved disk storage. Take a 20GB local data disk
> as example, it might be a bit too late to kick the local data when only 1GB
> (20GB*5%) space left.
>
> Last but not least, will we meet a concurrency problem when different
> subtasks within one process/node start to check the left disk space before
> deciding to write to local or remote?
>
>
> [1] https://celeborn.apache.org/
>
> Best
> Yun Tang
>
> ________________________________
> From: Xia Sun <xingbe...@gmail.com>
> Sent: Sunday, March 12, 2023 17:16
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: Re: [DISCUSS] FLIP-301: Hybrid Shuffle supports Remote Storage
>
> Hi Yuxin,
>
> Thanks for creating this FLIP!
> I'm a flink user, and in our internal scenario we use the colocation
> technology to run flink jobs and online service on the same machine
> together. We found that flink jobs are occasionally affected by other
> non-flink jobs (i.e. if the host disk space is full, that will result in
> 'No space left on device' error on flink jobs). This flip will really help
> us to benefit from hybrid shuffle without being worried about insufficient
> disk space problem.
>
> And I also have a few questions.
> 1. If the same subpartition spans multiple different tiers, how to keep the
> order of segments between different storage tiers (if necessary)?
> 2. In the process of writing to the local disk for a subpartition, what
> will happen if the disk space is found to be full? Will it report an error
> or automatically transfer to remote storage?
> 3. For remote storage, I noticed that it uses direct reading, which is
> different from the other two, does the switching between different tiers
> will bring overhead or waiting? In addition, compared to flink rss, which
> optimizes data compression and small file merging to improve throughput and
> relieve file system pressure, does the object storage system can meet the
> performance requirements and concurrent access challenges of large-scale
> batch jobs(parallelism > 10000)?
>
> Thanks,
> Xia
>
> Zhu Zhu <reed...@gmail.com> 于2023年3月10日周五 16:44写道:
>
> > Hi Yuxin,
> >
> > Thanks for creating this FLIP!
> > The idea of tiered storage looks good. Instead of choosing one from
> > multiple storages, it can help to balance between performance, cost and
> > stability. It also has the potential to adaptively select proper tiers
> > according to more runtime information, to achieve better performance
> > and ease of use.
> >
> > I have a question about the tier finding of data reading. In the FLIP
> > it proposes that the Read Client asks each storage tier whether a
> > given segment exists in it, from higher priority tiers to lower priority
> > ones. I'm a bit concerned about the cost of it, especially when data
> > are written to low priority tiers. Do you have any evaluation of it?
> > Is it possible to let the Reader Client know the location of the next
> > segment when it has finished reading one segment? Or maybe just let it
> > know whether the next segment is located in the same tier, if we can
> > have the assumption that tier changing would not be very frequent.
> >
> > Thanks,
> > Zhu
> >
> > Weihua Hu <huweihua....@gmail.com> 于2023年3月10日周五 11:52写道:
> > >
> > > Thanks Yuxin for your explanation.
> > >
> > > That sounds reasonable. Looking forward to the new shuffle.
> > >
> > >
> > > Best,
> > > Weihua
> > >
> > >
> > > On Fri, Mar 10, 2023 at 11:48 AM Yuxin Tan <tanyuxinw...@gmail.com>
> > wrote:
> > >
> > > > Hi, Weihua,
> > > > Thanks for the questions and the ideas.
> > > >
> > > > > 1. How many performance regressions would there be if we only
> > > > used remote storage?
> > > >
> > > > The new architecture can support to use remote storage only, but this
> > > > FLIP target is to improve job stability. And the change in the FLIP
> has
> > > > been significantly complex and the goal of the first version is to
> > update
> > > > Hybrid Shuffle to the new architecture and support remote storage as
> > > > a supplement. The performance of this version is not the first
> > priority,
> > > > so we haven’t tested the performance of using only remote storage.
> > > > If there are indeed regressions, we will keep optimizing the
> > performance
> > > > of the remote storages and improve it until only remote storage is
> > > > available in the production environment.
> > > >
> > > > > 2. Shall we move the local data to remote storage if the producer
> is
> > > > finished for a long time?
> > > >
> > > > I agree that it is a good idea, which can release task manager
> > resources
> > > > more timely. But moving data from TM local disk to remote storage
> needs
> > > > more detailed discussion and design, and it is easier to implement it
> > based
> > > > on the new architecture. Considering the complexity, the target
> focus,
> > and
> > > > the iteration cycle of the FLIP, we decide that the details are not
> > > > included
> > > > in the first version. We will extend and implement them in the
> > subsequent
> > > > versions.
> > > >
> > > > Best,
> > > > Yuxin
> > > >
> > > >
> > > > Weihua Hu <huweihua....@gmail.com> 于2023年3月9日周四 11:22写道:
> > > >
> > > > > Hi, Yuxin
> > > > >
> > > > > Thanks for driving this FLIP.
> > > > >
> > > > > The remote storage shuffle could improve the stability of Batch
> jobs.
> > > > >
> > > > > In our internal scenario, we use a hybrid cluster to run both
> > > > > Streaming(high priority)
> > > > > and Batch jobs(low priority). When there is not enough
> > resources(such as
> > > > > cpu usage
> > > > > reaches a threshold), the batch containers will be evicted. So this
> > will
> > > > > cause some re-run
> > > > > of batch tasks.
> > > > >
> > > > > It would be a great help if the remote storage could address this.
> > So I
> > > > > have a few questions.
> > > > >
> > > > > 1. How many performance regressions would there be if we only used
> > remote
> > > > > storage?
> > > > >
> > > > > 2. In current design, the shuffle data segment will write to one
> > kind of
> > > > > storage tier.
> > > > > Shall we move the local data to remote storage if the producer is
> > > > finished
> > > > > for a long time?
> > > > > So we can release the idle task manager with no shuffle data on it.
> > This
> > > > > may help to reduce
> > > > > the resource usage when producer parallelism is larger than
> consume.
> > > > >
> > > > > Best,
> > > > > Weihua
> > > > >
> > > > >
> > > > > On Thu, Mar 9, 2023 at 10:38 AM Yuxin Tan <tanyuxinw...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi, Junrui,
> > > > > > Thanks for the suggestions and ideas.
> > > > > >
> > > > > > > If they are fixed, I suggest that FLIP could provide clearer
> > > > > > explanations.
> > > > > > I have updated the FLIP and described the segment size more
> > clearly.
> > > > > >
> > > > > > > can we provide configuration options for users to manually
> > adjust the
> > > > > > sizes?
> > > > > > The segment size can be configured if necessary. But considering
> > that
> > > > if
> > > > > we
> > > > > > exposed these parameters prematurely, it may be difficult to
> > modify the
> > > > > > implementation later because the user has already used the
> > configs. We
> > > > > > can make these internal configs or fixed values when implementing
> > the
> > > > > first
> > > > > > version, I think we can use either of these two ways, because
> they
> > are
> > > > > > internal and do not affect the public APIs.
> > > > > >
> > > > > > Best,
> > > > > > Yuxin
> > > > > >
> > > > > >
> > > > > > Junrui Lee <jrlee....@gmail.com> 于2023年3月8日周三 00:24写道:
> > > > > >
> > > > > > > Hi Yuxin,
> > > > > > >
> > > > > > > This FLIP looks quite reasonable. Flink can solve the problem
> of
> > > > Batch
> > > > > > > shuffle by
> > > > > > > combining local and remote storage, and can use fixed local
> > disks for
> > > > > > > better performance
> > > > > > >  in most scenarios, while using remote storage as a supplement
> > when
> > > > > local
> > > > > > > disks are not
> > > > > > >  sufficient, avoiding wasteful costs and poor job stability.
> > > > Moreover,
> > > > > > the
> > > > > > > solution also
> > > > > > > considers the issue of dynamic switching, which can
> automatically
> > > > > switch
> > > > > > to
> > > > > > > remote
> > > > > > > storage when the local disk is full, saving costs, and
> > automatically
> > > > > > switch
> > > > > > > back when
> > > > > > > there is available space on the local disk.
> > > > > > >
> > > > > > > As Wencong Liu stated, an appropriate segment size is
> essential,
> > as
> > > > it
> > > > > > can
> > > > > > > significantly
> > > > > > > affect shuffle performance. I also agree that the first version
> > > > should
> > > > > > > focus mainly on the
> > > > > > > design and implementation. However, I have a small question
> about
> > > > > FLIP. I
> > > > > > > did not see
> > > > > > > any information regarding the segment size of memory, local
> > disk, and
> > > > > > > remote storage
> > > > > > > in this FLIP. Are these three values fixed at present? If they
> > are
> > > > > > fixed, I
> > > > > > > suggest that FLIP
> > > > > > > could provide clearer explanations. Moreover, although a
> dynamic
> > > > > segment
> > > > > > > size
> > > > > > > mechanism is not necessary at the moment, can we provide
> > > > configuration
> > > > > > > options for users
> > > > > > >  to manually adjust these sizes? I think it might be useful.
> > > > > > >
> > > > > > > Best,
> > > > > > > Junrui.
> > > > > > >
> > > > > > > Yuxin Tan <tanyuxinw...@gmail.com> 于2023年3月7日周二 20:14写道:
> > > > > > >
> > > > > > > > Thanks for joining the discussion.
> > > > > > > >
> > > > > > > > @weijie guo
> > > > > > > > > 1. How to optimize the broadcast result partition?
> > > > > > > > For the partitions with multi-consumers, e.g., broadcast
> result
> > > > > > > partition,
> > > > > > > > partition reuse,
> > > > > > > > speculative, etc, the processing logic is the same as the
> > original
> > > > > > Hybrid
> > > > > > > > Shuffle, that is,
> > > > > > > > using the full spilling strategy. It indeed may reduce the
> > > > > opportunity
> > > > > > to
> > > > > > > > consume from
> > > > > > > > memory, but the PoC shows that it has no effect on the
> > performance
> > > > > > > > basically.
> > > > > > > >
> > > > > > > > > 2. Can the new proposal completely avoid this problem of
> > > > inaccurate
> > > > > > > > backlog
> > > > > > > > calculation?
> > > > > > > > Yes, this can avoid the problem completely. About the read
> > buffers,
> > > > > > the N
> > > > > > > > is to reserve
> > > > > > > > one exclusive buffer per channel, which is to avoid the
> > deadlock
> > > > > > because
> > > > > > > > the buffers
> > > > > > > > are acquired by some channels and other channels can not
> > request
> > > > any
> > > > > > > > buffers. But
> > > > > > > > the buffers except for the N can be floating (competing to
> > request
> > > > > the
> > > > > > > > buffers) by all
> > > > > > > > channels.
> > > > > > > >
> > > > > > > > @Wencong Liu
> > > > > > > > > Deciding the Segment size dynamically will be helpful.
> > > > > > > > I agree that it may be better if the segment size is
> > dynamically
> > > > > > decided,
> > > > > > > > but for simplifying
> > > > > > > > the implementation of the first version, we want to make
> this a
> > > > fixed
> > > > > > > value
> > > > > > > > for each tier.
> > > > > > > > In the future, this can be a good improvement if necessary.
> In
> > the
> > > > > > first
> > > > > > > > version, we will mainly
> > > > > > > > focus on the more important features, such as the tiered
> > storage
> > > > > > > > architecture, dynamic
> > > > > > > > switching tiers, supporting remote storage, memory
> management,
> > etc.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Yuxin
> > > > > > > >
> > > > > > > >
> > > > > > > > Wencong Liu <liuwencle...@163.com> 于2023年3月7日周二 16:48写道:
> > > > > > > >
> > > > > > > > > Hello Yuxin,
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >     Thanks for your proposal! Adding remote storage
> > capability to
> > > > > > > Flink's
> > > > > > > > > Hybrid Shuffle is a significant improvement that addresses
> > the
> > > > > issue
> > > > > > of
> > > > > > > > > local disk storage limitations. This enhancement not only
> > ensures
> > > > > > > > > uninterrupted Shuffle, but also enables Flink to handle
> > larger
> > > > > > > workloads
> > > > > > > > > and more complex data processing tasks. With the ability to
> > > > > > seamlessly
> > > > > > > > > shift between local and remote storage, Flink's Hybrid
> > Shuffle
> > > > will
> > > > > > be
> > > > > > > > more
> > > > > > > > > versatile and scalable, making it an ideal choice for
> > > > organizations
> > > > > > > > looking
> > > > > > > > > to build distributed data processing applications with
> ease.
> > > > > > > > >     Besides, I've a small question about the size of
> Segment
> > in
> > > > > > > different
> > > > > > > > > storages. According to the FLIP, the size of Segment may be
> > fixed
> > > > > for
> > > > > > > > each
> > > > > > > > > Storage Tier, but I think the fixed size may affect the
> > shuffle
> > > > > > > > > performance. For example, smaller segment size will improve
> > the
> > > > > > > > utilization
> > > > > > > > > rate of Memory Storage Tier, but it may brings extra cost
> to
> > Disk
> > > > > > > Storage
> > > > > > > > > Tier or Remote Storage Tier. Deciding the size of Segment
> > > > dynamicly
> > > > > > > will
> > > > > > > > be
> > > > > > > > > helpful.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Wencong Liu
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > At 2023-03-06 13:51:21, "Yuxin Tan" <
> tanyuxinw...@gmail.com>
> > > > > wrote:
> > > > > > > > > >Hi everyone,
> > > > > > > > > >
> > > > > > > > > >I would like to start a discussion on FLIP-301: Hybrid
> > Shuffle
> > > > > > > supports
> > > > > > > > > >Remote Storage[1].
> > > > > > > > > >
> > > > > > > > > >In the cloud-native environment, it is difficult to
> > determine
> > > > the
> > > > > > > > > >appropriate
> > > > > > > > > >disk space for Batch shuffle, which will affect job
> > stability.
> > > > > > > > > >
> > > > > > > > > >This FLIP is to support Remote Storage for Hybrid Shuffle
> to
> > > > > improve
> > > > > > > the
> > > > > > > > > >Batch job stability in the cloud-native environment.
> > > > > > > > > >
> > > > > > > > > >The goals of this FLIP are as follows.
> > > > > > > > > >1. By default, use the local memory and disk to ensure
> high
> > > > > shuffle
> > > > > > > > > >performance if the local storage space is sufficient.
> > > > > > > > > >2. When the local storage space is insufficient, use
> remote
> > > > > storage
> > > > > > as
> > > > > > > > > >a supplement to avoid large-scale Batch job failure.
> > > > > > > > > >
> > > > > > > > > >Looking forward to hearing from you.
> > > > > > > > > >
> > > > > > > > > >[1]
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-301%3A+Hybrid+Shuffle+supports+Remote+Storage
> > > > > > > > > >
> > > > > > > > > >Best,
> > > > > > > > > >Yuxin
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
>

Reply via email to