Hi, thanks for drafting the FLIP, Lijie and Zhu Zhu. It already looks
pretty solid and it will be a really great improvement to the batch
scheduling. I'd second Till's feedback, especially when it comes to
the consistent behavior between different deployment types / schedulers.

What I'm a bit unsure about is the naming here. The word *Adaptive* means
something different in the streaming and batch scheduler:
- For *streaming* it refers to the ability to adapt the job parallelism
based on the resource availability.
- For *batch* it refers to the ability to adapt the stage parallelism based
on the output of the previous stage.
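
To make the difference concrete, here is a rough sketch in Python. The
function names, the parameters and the sizing rule are made up for
illustration; they are not Flink APIs and not necessarily what the FLIP
specifies:

```python
def streaming_adaptive_parallelism(requested: int, available_slots: int) -> int:
    """Streaming sense: the job parallelism follows the slots we currently have."""
    if available_slots < 1:
        raise ValueError("at least one slot is needed to run the job")
    # Run with fewer slots than requested if that is all the cluster offers.
    return min(requested, available_slots)


def batch_adaptive_parallelism(
    upstream_bytes: int, bytes_per_task: int, max_parallelism: int
) -> int:
    """Batch sense: a stage's parallelism follows the previous stage's output size."""
    if bytes_per_task < 1 or max_parallelism < 1:
        raise ValueError("bytes_per_task and max_parallelism must be positive")
    # Integer ceiling division: one task per `bytes_per_task` of produced data.
    wanted = -(-upstream_bytes // bytes_per_task)
    # Always at least one task, never more than the configured maximum.
    return max(1, min(wanted, max_parallelism))
```

In this sketch the first function never looks at the data and the second
never looks at the slots, which is the mismatch in meaning I have in mind.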

Should this be a concern?

Best,
D.



On Sun, Oct 31, 2021 at 8:21 AM Lijie Wang <wangdachui9...@gmail.com> wrote:

> Hi, Till & Zhu
>
> Thanks for your feedback. Also thanks for your comments and suggestions
> on the wiki, which are very helpful for perfecting the FLIP.
>
> I also agree that we should provide our users with consistent and
> easy-to-understand deployment options. Regarding the three options proposed
> by Till, my opinion is the same as Zhu's. In the first version, we can
> support only "option1", and then "option2" and "option3" can be future
> improvements.
>
> Regarding the side note to abstract subpartitions as splits, although it is
> not our original intention, I personally feel it's meaningful. This is also
> helpful to users, who can use it for monitoring in order to get the
> progress of jobs in detail.
>
> Best,
> Lijie
>
> Zhu Zhu <reed...@gmail.com> wrote on Sat, Oct 30, 2021 at 3:47 PM:
>
> > Hi Till,
> >
> > Thanks for the comments!
> >
> > I agree with you that we should avoid an auto-scaled job not being able to
> > be scheduled in standalone/reactive mode. And I think it's great if we can
> > expose a deployment option that is consistent for streaming and batch jobs,
> > which can be easier to understand. I'm looking forward to the day when both
> > adaptive schedulers are the default, so that most users do not need to care
> > about job tuning while the job can run well.
> >
> > Regarding the three options, personally I prefer to take *#1* as the first
> > step, to limit the scope of this FLIP a bit, otherwise it may be too
> > complicated.
> > I think *#3* is the final goal we need to target later, so that mixed
> > bounded and unbounded workloads can be supported. Given that there can be
> > multiple stages scheduled at the same time, the design of the scheduling
> > may not be very straightforward and needs some thorough consideration.
> > *#2* can be a very good improvement itself. Shuffles of batch jobs can be
> > auto-determined to be pipelined or blocking according to available
> > resources. But the changes may involve many components and can be large.
> > So I think it can be a standalone future improvement.
> >
> > Regarding the side note to abstract subpartitions as splits, the idea is
> > very interesting to me. Besides supporting auto scaling, I think trackable
> > produced splits can also help in troubleshooting and give some insights for
> > future improvements. Collecting data sizes for batch adaptive scheduler can
> > be the first step and we can further consider the abstraction of it.
> >
> > Thanks,
> > Zhu
> >
> > Till Rohrmann <trohrm...@apache.org> wrote on Fri, Oct 29, 2021 at 10:47 PM:
> >
> > > Hi Lijie,
> > >
> > > Thanks for drafting this FLIP together with Zhu Zhu :-)
> > >
> > > I like the idea of making the parallelism of operators of a bounded job
> > > dependent on the data size. This makes the job adjust automatically when
> > > the data sources/sizes change.
> > >
> > > I can see this work well in combination with the active mode where Flink
> > > can ask for more resources.
> > >
> > > In the case of the standalone mode, I think it can lead to situations
> > > where one and the same job can be scheduled or not depending on the input
> > > data. The problem is pipelined regions that contain more than a single
> > > operator instance (e.g. pipelined shuffles). We already have this problem
> > > when submitting a batch job with too high parallelism onto a standalone
> > > cluster. However, with the adaptive batch mode this problem might become a
> > > bit more present. So my question would be how can we solve this problem
> > > (potentially in a follow up step). I could think of the following three
> > > alternatives atm:
> > >
> > > 1. Only allow blocking data exchanges: This will limit the size of a
> > > pipelined region to a single operator instance. This has the downside
> > > that we no longer support pipelined execution of multiple operators
> > > (other than chained). Moreover, it requires the user to set all data
> > > exchanges to blocking which cannot be enforced atm.
> > > 2. Introduce a new pipelined-blocking data exchange hybrid that supports
> > > pipelined data exchanges but can also spill to disk if there is no
> > > consumer: This could allow to still make progress in case that one has a
> > > pipelined region which requires more slots than what we currently have.
> > > 3. Decide on the actual parallelism of a pipelined region after having
> > > received the slots that are declared based on the data size per subtask.
> > > If the pipelined region contains an all-to-all connection, then the
> > > parallelism is how many slots we currently have. If not, then the
> > > parallelism can be decided by the data volume: This would effectively
> > > mean to enable the existing AdaptiveScheduler to also run batch workloads.
> > >
> > > With any of these options, I believe that we could provide a somewhat
> > > consistent behaviour across the different deployment and execution modes
> > > wrt scaling:
> > >
> > > a) Active + streaming job that uses AdaptiveScheduler: Can run with fewer
> > > slots than requested. Can ask for more slots. Once new slots arrive it
> > > will make use of them.
> > > b) Reactive + streaming job that uses AdaptiveScheduler: Can run with
> > > fewer slots than requested. Once new slots arrive it will make use of
> > > them.
> > > c) Active + batch job that uses batch adaptive scheduler + any of 1., 2.
> > > or 3.: Can run with fewer slots than requested (because it can complete
> > > the job with a single slot). Can ask for more slots. Once new slots
> > > arrive it will make use of them.
> > > d) Standalone + batch job that uses batch adaptive scheduler + any of 1.,
> > > 2. or 3.: Can run with fewer slots than requested (because it can
> > > complete the job with a single slot). Once new slots arrive it will make
> > > use of them (up to the desired maximum parallelism).
> > >
> > > If we decide to go with option 1. or 2., then we will only be able to run
> > > mixed workloads (mixture of bounded and unbounded sources) in streaming
> > > mode. This might be ok for the time being.
> > >
> > > This actually leads to my main concern, which is to give our users
> > > consistent and somewhat easy to understand deployment options. In order
> > > to achieve this Flink should always be able to make progress unless the
> > > parallelism is explicitly configured (e.g. a very high parallelism in a
> > > pipelined region that cannot be fulfilled). Moreover, Flink should be
> > > able to make use of new resources if the job isn't being run at the
> > > maximum parallelism already. Removing slots so that the minimum number of
> > > required slots is still available should also be possible. Maybe one idea
> > > could be to make the adaptive batch scheduler the default for batch jobs
> > > eventually. For streaming jobs, we would ideally always use the
> > > AdaptiveScheduler to give a consistent behaviour.
> > >
> > > As a side note: Creating as many subpartitions as the maximum parallelism
> > > will result in a one-to-one mapping between sub partitions and key
> > > groups. If we then also make the non keyed operators work on a set of sub
> > > partitions that store the operator state, then the sub partitions could
> > > be seen as some logical work unit/split that is assigned to operators.
> > > Having such an abstraction could allow us to track which work unit has
> > > completed which helps with rescaling of operators and maintaining order
> > > guarantees, for example.
> > >
> > > I also left some smaller comments in the wiki.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Oct 20, 2021 at 8:52 AM Lijie Wang <wangdachui9...@gmail.com>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > >
> > > > Zhu Zhu and I propose to introduce a new job scheduler to Flink:
> > > > adaptive batch job scheduler. The new scheduler can automatically decide
> > > > parallelisms of job vertices for batch jobs, according to the size of
> > > > data volume each vertex needs to process.
> > > >
> > > > Major benefits of this scheduler include:
> > > >
> > > >    1. Batch job users can be relieved from parallelism tuning
> > > >    2. Automatically tuned parallelisms can be vertex level and can better
> > > >    fit consumed datasets which have a varying volume size every day
> > > >    3. Vertices from SQL batch jobs can be assigned with different
> > > >    parallelisms which are automatically tuned
> > > >    4. It can be the first step towards enabling auto-rebalancing workloads
> > > >    of tasks
> > > >
> > > > You can find more details in the FLIP-187[1]. Looking forward to your
> > > > feedback.
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
> > > >
> > > > Best,
> > > >
> > > > Lijie
> > > >
> > >
> >
>
