Will this also allow spilling everything to disk while also forwarding data to the next task?

This would allow us to improve fine-grained recovery by no longer being constrained to pipelined regions.

On 25/05/2022 05:55, weijie guo wrote:
Hi All,
Thank you for your attention and feedback.
Do you have any other comments? If there are no further questions, I'll
start the vote on FLIP-235 tomorrow.

Best regards,

Weijie


Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 13:22:

Hi Xintong,
     Thanks for your detailed explanation. I misunderstood the spill
behavior at first glance, but I get your point now. I think it will be a
good addition to the current execution modes.
Looking forward to it :)

Best,
Aitozi

Xintong Song <tonysong...@gmail.com> wrote on Fri, May 20, 2022 at 12:26:

Hi Aitozi,

In which case we can use the hybrid shuffle mode

Just to directly answer this question, in addition to Weijie's
explanations: for batch workloads, if you want the workload to take
advantage of as many resources as are available, ranging from a single
slot up to as many slots as there are tasks, you may consider hybrid
shuffle mode. Admittedly, this may not always be wanted, e.g., users may
not want to execute a job if there are too few resources available, or may
not want a job taking too many of the cluster resources. That's why we
propose hybrid shuffle as an additional option for batch users, rather
than a replacement for Pipelined or Blocking mode.

So you mean the hybrid shuffle mode will limit its usage to bounded
sources, right?

Yes.

One more question: with bounded data and part of the stages running in
pipelined shuffle mode, what will be the behavior on task failure? Are
checkpoints enabled for these running stages, or will they re-run after
the failure?

There are no checkpoints. The failover behavior depends on the spilling
strategy.
- In the first version, we only consider a selective spilling strategy,
which spills as little data as possible to disk; in case of failover,
upstream tasks need to be restarted to reproduce the complete intermediate
results.
- An alternative strategy we may introduce in the future, if needed, is to
spill the complete intermediate results. That avoids restarting upstream
tasks in case of failover, because the produced intermediate results can
be re-consumed, at the cost of more disk IO load.
With both strategies, the trade-off between failover cost and IO load is
for the user to decide. This is also discussed in the MemoryDataManager
section of the FLIP.
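To make the two strategies concrete, here is a minimal sketch (hypothetical names, not the actual FLIP-235 interfaces) of what each strategy spills and whether upstream tasks must be restarted on failover:

```java
public class SpillingStrategies {
    // Hypothetical model of the two spilling strategies discussed above.
    // SELECTIVE: spill only data not consumed in time; failover must re-run upstream.
    // FULL: spill the complete intermediate results; failover can re-consume them.
    enum SpillingStrategy {
        SELECTIVE, FULL;

        // Whether a given buffer must be written to disk.
        boolean shouldSpill(boolean consumedTimelyFromMemory) {
            return this == FULL || !consumedTimelyFromMemory;
        }

        // Whether upstream tasks must re-run to reproduce results after a failure.
        boolean upstreamRestartOnFailover() {
            return this == SELECTIVE;
        }
    }

    public static void main(String[] args) {
        System.out.println(SpillingStrategy.SELECTIVE.shouldSpill(true));  // consumed in time: no spill
        System.out.println(SpillingStrategy.SELECTIVE.shouldSpill(false)); // not consumed in time: spill
        System.out.println(SpillingStrategy.FULL.shouldSpill(true));       // FULL always spills
        System.out.println(SpillingStrategy.SELECTIVE.upstreamRestartOnFailover());
        System.out.println(SpillingStrategy.FULL.upstreamRestartOnFailover());
    }
}
```

In short, FULL trades extra disk IO for cheaper failover, while SELECTIVE minimizes IO but must re-run upstream tasks.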

Best,

Xintong



On Fri, May 20, 2022 at 12:10 PM Aitozi <gjying1...@gmail.com> wrote:

Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
limit its usage to bounded sources, right?
One more question: with bounded data and part of the stages running in
pipelined shuffle mode, what will be the behavior on task failure? Are
checkpoints enabled for these running stages, or will they re-run after
the failure?

Best,
Aitozi

weijie guo <guoweijieres...@gmail.com> wrote on Fri, May 20, 2022 at 10:45:

Hi Aitozi,

Thank you for the feedback!
Here are some of my thoughts on your questions:

1. If there is an unbounded data source, but only enough resources to
schedule the first stage, will it bring a big burden to the disk/shuffle
service, which I think will occupy all the resources?
First of all, hybrid shuffle mode is oriented to the batch job scenario,
so there is no problem of unbounded data sources. Secondly, if you
consider the stream scenario, I think pipelined shuffle is still the best
choice at present. For an unbounded data stream, it is not meaningful to
only run some of the stages.

2. Which kind of job will benefit from the hybrid shuffle mode? In other
words, in which cases can we use it?
Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
shuffle mode can effectively utilize cluster resources and avoid some
unnecessary disk IO overhead. For OLAP scenarios, which are characterized
by a large number of concurrently submitted short batch jobs, hybrid
shuffle can solve the scheduling deadlock problem of pipelined shuffle
while achieving similar performance.

Best regards,

Weijie


Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 08:05:

Hi Weijie:

      Thanks for the nice FLIP. I have a couple of questions about it:

1) In hybrid shuffle mode, the shuffle mode is decided by the resources.
If there is an unbounded data source, but only enough resources to
schedule the first stage, will it bring a big burden to the disk/shuffle
service, which I think will occupy all the resources?

2) Which kind of job will benefit from the hybrid shuffle mode? In other
words, in which case can we use it:
- For a batch job that wants to use more resources to reduce the e2e time?
- Or for a streaming job which may lack resources temporarily?
- Or for an OLAP job which, as you mentioned, will try to make the best
use of available resources to finish the query?
Just want to know the typical use case for the hybrid shuffle mode :)

Best,
Aitozi.

weijie guo <guoweijieres...@gmail.com> wrote on Thu, May 19, 2022 at 18:33:

Yangze, thank you for the feedback!
Here are my thoughts on your questions:

How do we decide the size of the buffer pool in MemoryDataManager and
the read buffers in FileDataManager?
The BufferPool in MemoryDataManager is the LocalBufferPool used by
ResultPartition, and its size is the same as in the current implementation
of sort-merge shuffle. In other words, the minimum size of the BufferPool
is a configurable fixed value, and the maximum size is Math.max(min, 4 *
numSubpartitions). The default value can be determined by running the
TPC-DS tests.
Read buffers in FileDataManager are requested from the
BatchShuffleReadBufferPool shared by the TaskManager; its size is
controlled by *taskmanager.memory.framework.off-heap.batch-shuffle.size*,
with a default value of 32M, which is consistent with the current
sort-merge shuffle logic.
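As a quick illustration of the sizing rule above (the method name here is hypothetical; in Flink the logic lives in the buffer pool setup):

```java
public class BufferPoolSizing {
    // Sketch of the sizing rule described above: the pool's minimum is a
    // configurable fixed value, and its maximum is max(min, 4 * numSubpartitions).
    static int maxBuffers(int configuredMin, int numSubpartitions) {
        return Math.max(configuredMin, 4 * numSubpartitions);
    }

    public static void main(String[] args) {
        // With a configured minimum of 16 buffers:
        System.out.println(maxBuffers(16, 2));   // few subpartitions: the minimum wins
        System.out.println(maxBuffers(16, 100)); // many subpartitions: 4 * 100
    }
}
```

So the pool grows with the number of subpartitions but never shrinks below the configured minimum.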

Is there an upper limit for the sum of them? If there is, how do
MemoryDataManager and FileDataManager sync the memory usage?
The buffers of the MemoryDataManager are limited by the size of the
LocalBufferPool, with the network memory size as the upper limit. The
buffers of the FileDataManager are directly requested as unpooled off-heap
memory, and are limited by the size of the framework off-heap memory.
Since the two draw from separate budgets, I think there is no need for an
additional synchronization mechanism.

How do you disable slot sharing? If a user configures both a slot
sharing group and hybrid shuffle, what will happen to that job?
I think we can print a warning log when hybrid shuffle is enabled and an
SSG is configured during the JobGraph compilation stage, and fall back to
the region slot sharing group by default. Of course, it will be emphasized
in the documentation that we do not currently support SSG; if configured,
it will fall back to the default.
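A minimal sketch of such a compile-time fallback check, with hypothetical names (the real check would live in the JobGraph compilation path):

```java
import java.util.logging.Logger;

public class SlotSharingCheck {
    private static final Logger LOG = Logger.getLogger("SlotSharingCheck");

    // Hypothetical check: if hybrid shuffle is enabled together with a
    // user-defined slot sharing group, warn and fall back to the default
    // (region) slot sharing group.
    static String resolveSlotSharingGroup(boolean hybridShuffleEnabled,
                                          String userConfiguredSsg) {
        if (hybridShuffleEnabled && userConfiguredSsg != null) {
            LOG.warning("Slot sharing groups are not supported with hybrid shuffle; "
                    + "falling back to the default region slot sharing group.");
            return "default";
        }
        return userConfiguredSsg != null ? userConfiguredSsg : "default";
    }

    public static void main(String[] args) {
        System.out.println(resolveSlotSharingGroup(true, "my-ssg"));  // falls back
        System.out.println(resolveSlotSharingGroup(false, "my-ssg")); // SSG kept
    }
}
```

The point is that the job still compiles and runs; only the user's SSG configuration is silently (but loudly logged) overridden.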


Best regards,

Weijie


Yangze Guo <karma...@gmail.com> 于2022年5月19日周四 16:25写道:

Thanks for driving this, Xintong and Weijie.

I believe this feature will make Flink a better batch/OLAP engine. +1
for the overall design.

Some questions:
1. How do we decide the size of the buffer pool in MemoryDataManager
and the read buffers in FileDataManager?
2. Is there an upper limit for the sum of them? If there is, how do
MemoryDataManager and FileDataManager sync the memory usage?
3. How do you disable slot sharing? If a user configures both a slot
sharing group and hybrid shuffle, what will happen to that job?

Best,
Yangze Guo

On Thu, May 19, 2022 at 2:41 PM Xintong Song <tonysong...@gmail.com> wrote:
Thanks for preparing this FLIP, Weijie.

I think this is a good improvement on batch resource
elasticity.
Looking
forward to the community feedback.

Best,

Xintong



On Thu, May 19, 2022 at 2:31 PM weijie guo <guoweijieres...@gmail.com> wrote:

Hi all,


I’d like to start a discussion about FLIP-235[1], which introduces a new
shuffle mode that can overcome some of the problems of Pipelined Shuffle
and Blocking Shuffle in batch scenarios.

Currently in Flink, task scheduling is more or less constrained by the
shuffle implementations. This brings the following disadvantages:

1. Pipelined Shuffle:
   For pipelined shuffle, the upstream and downstream tasks are required
to be deployed at the same time, to avoid upstream tasks being blocked
forever. This is fine when there are enough resources for both upstream
and downstream tasks to run simultaneously, but causes the following
problems otherwise:
   1. Pipelined-shuffle-connected tasks (i.e., a pipelined region) cannot
be executed until resources are obtained for all of them, resulting in
longer job finishing time and poorer resource efficiency due to holding
part of the resources idle while waiting for the rest.
   2. More severely, if multiple jobs each hold part of the cluster
resources and are waiting for more, a deadlock can occur. The chance is
not trivial, especially in scenarios such as OLAP where concurrent job
submissions are frequent.
2. Blocking Shuffle:
   For blocking shuffle, execution of downstream tasks must wait for all
upstream tasks to finish, even though there might be more resources
available. The sequential execution of upstream and downstream tasks
significantly increases the job finishing time, and the disk IO workload
for spilling and loading the full intermediate data also affects
performance.

We believe the root cause of the above problems is that shuffle
implementations put unnecessary constraints on task scheduling.

To solve this problem, Xintong Song and I propose to introduce hybrid
shuffle to minimize the scheduling constraints. With hybrid shuffle,
Flink should:
1. Make the best use of available resources.
   Ideally, we want Flink to always make progress if possible. That is to
say, it should always execute a pending task if there are resources
available for that task.
2. Minimize disk IO load.
   In-flight data should be consumed directly from memory as much as
possible. Only data that is not consumed in time should be spilled to
disk.
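As a toy illustration of the second principle (hypothetical names, not Flink's actual data structures): buffers stay in memory while the consumer keeps up, and only the overflow is spilled:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class HybridBufferSketch {
    // Toy model: produced buffers accumulate in memory; only buffers that
    // exceed the in-memory budget (i.e., were not consumed in time) are
    // spilled to "disk".
    private final Deque<String> memory = new ArrayDeque<>();
    private final Deque<String> disk = new ArrayDeque<>();
    private final int memoryBudget;

    HybridBufferSketch(int memoryBudget) { this.memoryBudget = memoryBudget; }

    void produce(String buffer) {
        memory.addLast(buffer);
        if (memory.size() > memoryBudget) {
            // Not consumed in time: spill the oldest buffer to disk.
            disk.addLast(memory.pollFirst());
        }
    }

    // The consumer drains spilled data first to preserve order, then memory.
    String consume() {
        return !disk.isEmpty() ? disk.pollFirst() : memory.pollFirst();
    }

    public static void main(String[] args) {
        HybridBufferSketch sketch = new HybridBufferSketch(2);
        sketch.produce("b1");
        sketch.produce("b2");
        sketch.produce("b3"); // budget exceeded: b1 is spilled
        System.out.println(sketch.consume()); // b1, read back from disk
        System.out.println(sketch.consume()); // b2, served directly from memory
    }
}
```

A fast consumer never triggers the spill path at all, which is exactly the IO saving hybrid shuffle is after.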
You can find more details in FLIP-235. Looking forward to your feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode


Best regards,

Weijie

