Hi Lijie,

Thanks for drafting this FLIP together with Zhu Zhu :-)

I like the idea of making the parallelism of operators of a bounded job
dependent on the data size. This makes the job adjust automatically when
the data sources/sizes change.

I can see this working well in combination with the active mode where Flink
can ask for more resources.

In the case of the standalone mode, I think it can lead to situations where
one and the same job may or may not be schedulable depending on the input
data. The problem is pipelined regions that contain more than a single
operator instance (e.g. pipelined shuffles). We already have this problem
when submitting a batch job with a too high parallelism onto a standalone
cluster. However, with the adaptive batch mode this problem might become a
bit more prevalent. So my question would be how we can solve this problem
(potentially in a follow-up step). I can think of the following three
alternatives atm:

1. Only allow blocking data exchanges: This will limit the size of a
pipelined region to a single operator instance. This has the downside that
we no longer support pipelined execution of multiple operators (other than
chained). Moreover, it requires the user to set all data exchanges to
blocking which cannot be enforced atm.
2. Introduce a new pipelined-blocking data exchange hybrid that supports
pipelined data exchanges but can also spill to disk if there is no
consumer: This could allow Flink to still make progress in case one has a
pipelined region which requires more slots than are currently available.
3. Decide on the actual parallelism of a pipelined region only after having
received the slots that were declared based on the data size per subtask:
If the pipelined region contains an all-to-all connection, then the
parallelism is bounded by how many slots we currently have. If not, then
the parallelism can be decided purely by the data volume. This would
effectively mean enabling the existing AdaptiveScheduler to also run batch
workloads (see the sketch after this list).
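
To make 3. a bit more concrete, this is roughly the decision logic I have
in mind (all names and helpers below are made up for illustration; this is
not actual scheduler code):

    // Sketch only: decide a pipelined region's parallelism once the
    // declared slots have arrived. All names are hypothetical.
    int decideRegionParallelism(
            boolean hasAllToAllExchange,
            int availableSlots,
            long producedBytes,
            long targetBytesPerSubtask,
            int maxParallelism) {
        if (hasAllToAllExchange) {
            // An all-to-all connected region has to run with the slots we got.
            return Math.min(availableSlots, maxParallelism);
        }
        // Otherwise derive the parallelism from the data volume per subtask.
        long byVolume =
            (producedBytes + targetBytesPerSubtask - 1) / targetBytesPerSubtask;
        return (int) Math.min(Math.max(byVolume, 1L), (long) maxParallelism);
    }

The idea being that regions which are not all-to-all connected could still
be executed one after another if slots are scarce.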

With any of these options, I believe that we could provide a somewhat
consistent behaviour across the different deployment and execution modes
wrt scaling:

a) Active + streaming job that uses AdaptiveScheduler: Can run with fewer
slots than requested. Can ask for more slots. Once new slots arrive it will
make use of them.
b) Reactive + streaming job that uses AdaptiveScheduler: Can run with fewer
slots than requested. Once new slots arrive it will make use of them.
c) Active + batch job that uses the adaptive batch scheduler + any of 1., 2.
or 3.: Can run with fewer slots than requested (because it can complete the
job with a single slot). Can ask for more slots. Once new slots arrive it
will make use of them.
d) Standalone + batch job that uses the adaptive batch scheduler + any of 1.,
2. or 3.: Can run with fewer slots than requested (because it can complete
the job with a single slot). Once new slots arrive it will make use of them
(up to the desired maximum parallelism).

If we decide to go with option 1. or 2., then we will only be able to run
mixed workloads (mixture of bounded and unbounded sources) in streaming
mode. This might be ok for the time being.

This actually leads to my main concern, which is to give our users
consistent and somewhat easy-to-understand deployment options. In order to
achieve this, Flink should always be able to make progress unless the user
has explicitly configured a parallelism that cannot be fulfilled (e.g. a
very high parallelism for a pipelined region). Moreover, Flink should be
able to make use of new resources if the job isn't already running at its
maximum parallelism. Removing slots should also be possible, as long as the
minimum number of required slots remains available. Maybe one idea could be
to make the adaptive batch scheduler the default for batch jobs eventually.
For streaming jobs, we would ideally always use the AdaptiveScheduler to
give consistent behaviour.
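
Just to make the "defaults" idea tangible, this is roughly how I would
expect such a batch job to be configured (the scheduler-related keys below
are my reading of what the FLIP proposes and may well end up with different
names, so please take this as a sketch rather than a final API):

    import org.apache.flink.configuration.Configuration;

    Configuration conf = new Configuration();
    // The adaptive batch scheduler presumably requires all exchanges to be
    // blocking (cf. option 1. above); this key exists already, if I'm not mistaken.
    conf.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_BLOCKING");
    // Assumed keys based on FLIP-187; final names/values may differ.
    conf.setString("jobmanager.scheduler", "AdaptiveBatch");
    conf.setString("jobmanager.adaptive-batch-scheduler.max-parallelism", "128");
    conf.setString("jobmanager.adaptive-batch-scheduler.data-volume-per-task", "1g");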

As a side note: Creating as many subpartitions as the maximum parallelism
will result in a one-to-one mapping between subpartitions and key groups.
If we then also make non-keyed operators work on a set of subpartitions
that store the operator state, then the subpartitions could be seen as a
logical work unit/split that is assigned to operators. Having such an
abstraction could allow us to track which work unit has completed, which
helps with rescaling of operators and maintaining ordering guarantees, for
example.
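
For illustration, a minimal sketch using the existing KeyGroupRangeAssignment
utilities (assuming we really create maxParallelism subpartitions, so that
key group id and subpartition index coincide):

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    Object recordKey = "some-key";   // hypothetical record key
    int maxParallelism = 128;        // == number of subpartitions
    int parallelism = 32;            // current operator parallelism

    // With numberOfSubpartitions == maxParallelism, the key group id can
    // double as the subpartition index, i.e. the logical work unit id.
    int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(recordKey, maxParallelism);
    int subpartition = keyGroup;

    // The operator instance that owns this work unit at the current parallelism:
    int operatorIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
            maxParallelism, parallelism, keyGroup);

Tracking completion per subpartition/key group would then give us the work
unit bookkeeping mentioned above.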

I also left some smaller comments in the wiki.

Cheers,
Till

On Wed, Oct 20, 2021 at 8:52 AM Lijie Wang <wangdachui9...@gmail.com> wrote:

> Hi all,
>
>
> Zhu Zhu and I propose to introduce a new job scheduler to Flink: adaptive
> batch job scheduler. The new scheduler can automatically decide
> parallelisms of job vertices for batch jobs, according to the amount of data
> each vertex needs to process.
>
> Major benefits of this scheduler include:
>
>    1. Batch job users can be relieved from parallelism tuning
>    2. Automatically tuned parallelisms can be vertex level and can better
>    fit consumed datasets which have a varying volume size every day
>    3. Vertices from SQL batch jobs can be assigned with different
>    parallelisms which are automatically tuned
>    4. It can be the first step towards enabling auto-rebalancing workloads
>    of tasks
>
> You can find more details in the FLIP-187[1]. Looking forward to your
> feedback.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
>
> Best,
>
> Lijie
>
