Timo Walther created FLINK-23470:
------------------------------------

             Summary: Use blocking shuffles but pipeline within a slot
                 Key: FLINK-23470
                 URL: https://issues.apache.org/jira/browse/FLINK-23470
             Project: Flink
          Issue Type: New Feature
          Components: API / DataStream
            Reporter: Timo Walther


As discussed in FLINK-23402, we would like to introduce a good default shuffle 
mode for batch runtime mode that is a trade-off between all pipelined and all 
blocking shuffles.

From the discussion in FLINK-23402:

For the shuffle modes, I think these three settings are sufficient:

1. pipeline all, for batch execution that wants pipelined shuffles (still with 
batch recovery, no checkpoints, and batch operators).
2. batch all, just in case you want to.
3. batch shuffles, pipeline within a slot. (DEFAULT)

This should be the default, and it means we batch whenever a slot has a 
dependency on another slot.

A dependency between slots is:

- any all-to-all connection (keyBy, broadcast, rebalance, random)
- any pointwise connection (rescale)
- any forward between different slot sharing groups
Effectively, only FORWARD connections within the same slot sharing group have no 
dependency on another slot.
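The dependency rule above can be sketched as a small classification function. This is a hypothetical model for illustration only: the enum, record, and method names (ShuffleMode, Connection, Exchange, Edge, exchangeFor) are not Flink API, and the sketch ignores chaining, looking only at the connection type and at the slot sharing groups on both ends of an edge.

```java
import java.util.Objects;

/** Hypothetical model of the proposed shuffle modes; names are illustrative, not Flink API. */
public class ShuffleModeSketch {

    // The three proposed settings (hypothetical names).
    enum ShuffleMode { PIPELINE_ALL, BATCH_ALL, BATCH_SHUFFLES_PIPELINE_IN_SLOT }

    // Connection types mentioned in the discussion.
    enum Connection { FORWARD, RESCALE, KEY_BY, BROADCAST, REBALANCE, RANDOM }

    enum Exchange { PIPELINED, BLOCKING }

    /** An edge between two operators, each assigned to a slot sharing group. */
    record Edge(Connection connection, String sourceGroup, String targetGroup) {}

    /** True if the consumer of this edge depends on data produced in another slot. */
    static boolean dependsOnAnotherSlot(Edge e) {
        // Only FORWARD within the same slot sharing group keeps both ends in one slot.
        return !(e.connection() == Connection.FORWARD
                && Objects.equals(e.sourceGroup(), e.targetGroup()));
    }

    /** Decides how an edge exchanges data under the given mode. */
    static Exchange exchangeFor(ShuffleMode mode, Edge e) {
        return switch (mode) {
            case PIPELINE_ALL -> Exchange.PIPELINED;
            case BATCH_ALL -> Exchange.BLOCKING;
            case BATCH_SHUFFLES_PIPELINE_IN_SLOT ->
                    dependsOnAnotherSlot(e) ? Exchange.BLOCKING : Exchange.PIPELINED;
        };
    }

    public static void main(String[] args) {
        ShuffleMode mode = ShuffleMode.BATCH_SHUFFLES_PIPELINE_IN_SLOT;
        System.out.println(exchangeFor(mode, new Edge(Connection.FORWARD, "default", "default"))); // PIPELINED
        System.out.println(exchangeFor(mode, new Edge(Connection.FORWARD, "default", "other")));   // BLOCKING
        System.out.println(exchangeFor(mode, new Edge(Connection.KEY_BY, "default", "default")));  // BLOCKING
        System.out.println(exchangeFor(mode, new Edge(Connection.RESCALE, "default", "default"))); // BLOCKING
    }
}
```

Under mode (3) the decision depends only on whether the edge can cross a slot boundary, so it needs no knowledge of cluster size or parallelism.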

This mode makes a lot of sense as the default because it guarantees that we can 
always run the program as long as we have at least one slot: no resource 
starvation, ever. At the same time, it retains pipelining between operators 
that we do not chain (for example, due to missing chaining logic) but still 
slot-share.

Compared to mode (3), FORWARD_EDGES_PIPELINED and POINTWISE_EDGES_PIPELINED are 
not well-defined.

POINTWISE_EDGES_PIPELINED is a gamble: it only works if you have a certain 
amount of resources, related to the rescale factor. Otherwise, the job may fail 
with resource starvation. It is hard for users to understand and debug, so it 
is not a great option in my opinion.

FORWARD_EDGES_PIPELINED can also lead to job failure with resource starvation 
when the forward connection links different slot sharing groups.
That's why I would drop those two modes (they confuse users), not reuse the 
GlobalDataExchangeMode, and instead introduce option (3) above, which batches 
most exchanges, except when the connected tasks are guaranteed to be in the 
same slot.
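The resource argument in this discussion (one slot always suffices for mode (3), while pipelining a rescale edge or a forward edge across slot sharing groups can need more) can be checked with a small model. This is a simplified sketch, not Flink's scheduler: all names are hypothetical, the RESCALE subtask mapping is an assumed approximation of a pointwise pattern, and it assumes that all subtasks of a pipelined region must run at the same time while separate regions can run one after another. Slots per region are counted under the slot-sharing rule that subtasks of different vertices in one group may share a slot, two subtasks of the same vertex may not, and different groups never share.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model (not Flink code): slots needed by the largest pipelined region. */
public class PipelinedRegionSlots {

    enum Pattern { FORWARD, RESCALE, ALL_TO_ALL }

    record Vertex(String name, int parallelism, String slotSharingGroup) {}

    record Edge(Vertex source, Vertex target, Pattern pattern, boolean pipelined) {}

    /** Union-find lookup with path halving. */
    static int find(int[] parent, int x) {
        while (parent[x] != x) {
            parent[x] = parent[parent[x]];
            x = parent[x];
        }
        return x;
    }

    /** True if subtask i of the source feeds subtask j of the target. */
    static boolean connected(Edge e, int i, int j) {
        int p = e.source().parallelism(), q = e.target().parallelism();
        return switch (e.pattern()) {
            case FORWARD -> i == j; // assumes equal parallelism on both ends
            case ALL_TO_ALL -> true;
            // Assumed pointwise mapping, for illustration only.
            case RESCALE -> p <= q ? (j * p / q == i) : (i * q / p == j);
        };
    }

    static int maxSlotsPerRegion(List<Vertex> vertices, List<Edge> edges) {
        // Give every subtask (vertex, index) a global id.
        Map<Vertex, Integer> offset = new HashMap<>();
        int n = 0;
        for (Vertex v : vertices) {
            offset.put(v, n);
            n += v.parallelism();
        }
        int[] parent = new int[n];
        for (int k = 0; k < n; k++) parent[k] = k;

        // Merge subtasks that exchange data over pipelined edges into one region.
        for (Edge e : edges) {
            if (!e.pipelined()) continue;
            for (int i = 0; i < e.source().parallelism(); i++) {
                for (int j = 0; j < e.target().parallelism(); j++) {
                    if (connected(e, i, j)) {
                        int a = find(parent, offset.get(e.source()) + i);
                        int b = find(parent, offset.get(e.target()) + j);
                        parent[a] = b;
                    }
                }
            }
        }

        // region -> slot sharing group -> vertex -> number of that vertex's subtasks in the region
        Map<Integer, Map<String, Map<Vertex, Integer>>> counts = new HashMap<>();
        for (Vertex v : vertices) {
            for (int i = 0; i < v.parallelism(); i++) {
                int region = find(parent, offset.get(v) + i);
                counts.computeIfAbsent(region, r -> new HashMap<>())
                      .computeIfAbsent(v.slotSharingGroup(), g -> new HashMap<>())
                      .merge(v, 1, Integer::sum);
            }
        }

        // Per group: the busiest vertex decides the slot count; groups add up.
        int max = 0;
        for (Map<String, Map<Vertex, Integer>> byGroup : counts.values()) {
            int slots = 0;
            for (Map<Vertex, Integer> byVertex : byGroup.values()) {
                slots += byVertex.values().stream().mapToInt(Integer::intValue).max().orElse(0);
            }
            max = Math.max(max, slots);
        }
        return max;
    }

    public static void main(String[] args) {
        // Mode (3): forward within one group is pipelined, keyBy is blocking.
        Vertex src = new Vertex("source", 2, "default");
        Vertex map = new Vertex("map", 2, "default");
        Vertex sink = new Vertex("sink", 2, "default");
        System.out.println(maxSlotsPerRegion(List.of(src, map, sink), List.of(
                new Edge(src, map, Pattern.FORWARD, true),
                new Edge(map, sink, Pattern.ALL_TO_ALL, false)))); // 1

        // A forward edge pipelined across two slot sharing groups.
        Vertex other = new Vertex("map", 2, "other");
        System.out.println(maxSlotsPerRegion(List.of(src, other), List.of(
                new Edge(src, other, Pattern.FORWARD, true)))); // 2

        // A pipelined rescale edge from parallelism 1 to 4 within one group.
        Vertex one = new Vertex("source", 1, "default");
        Vertex four = new Vertex("map", 4, "default");
        System.out.println(maxSlotsPerRegion(List.of(one, four), List.of(
                new Edge(one, four, Pattern.RESCALE, true)))); // 4
    }
}
```

In this model, a forward edge inside one slot sharing group only ever joins subtasks with the same index of different vertices, so every region fits in one shared slot. Once a pipelined edge crosses groups or fans out pointwise, a region contains several slots' worth of subtasks, and the job needs at least that many free slots at once.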

As a side note: the difference between (3) and (2) should already be relatively 
small in SQL jobs and will become smaller over time, as more and more operators 
can be chained together.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
