Hi, Till & Zhu

Thanks for your feedback, and for your comments and suggestions on the wiki,
which are very helpful for improving the FLIP.


I also agree that we should provide our users with consistent and
easy-to-understand deployment options. Regarding the three options proposed
by Till, my opinion is the same as Zhu's: in the first version we support
only "option1", and "option2" and "option3" can be future improvements.

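To make the scope of "option1" concrete, here is a minimal sketch of how a
vertex's parallelism could be derived from the data volume it consumes once
all of its inputs are blocking and therefore fully produced. The function
name, the bytes-per-task target and the min/max bounds are assumptions made
for illustration; they are not the API or the configuration proposed in the
FLIP.

```python
def decide_parallelism(consumed_bytes: int,
                       bytes_per_task: int,
                       min_parallelism: int = 1,
                       max_parallelism: int = 128) -> int:
    """Pick a parallelism so that each subtask handles roughly bytes_per_task.

    All parameter names and defaults are hypothetical, chosen for this sketch.
    """
    if consumed_bytes < 0:
        raise ValueError("consumed_bytes must not be negative")
    if bytes_per_task <= 0:
        raise ValueError("bytes_per_task must be positive")
    if not 1 <= min_parallelism <= max_parallelism:
        raise ValueError("need 1 <= min_parallelism <= max_parallelism")

    # Integer ceiling division: number of subtasks needed to stay at or
    # below the per-task target.
    wanted = -(-consumed_bytes // bytes_per_task)

    # Clamp into the allowed range so that tiny inputs still get one subtask
    # and huge inputs do not exceed the configured maximum.
    return max(min_parallelism, min(max_parallelism, wanted))
```

With a 1 GiB target per task, a vertex that consumes 10 GiB would get a
parallelism of 10, while an empty input would fall back to the minimum.
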
Regarding the side note about abstracting subpartitions as splits: although
it was not our original intention, I personally think it is meaningful. It is
also helpful to users, who can use it for monitoring to follow the progress
of their jobs in detail.
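
As a rough illustration of the monitoring idea, the sketch below tracks which
splits (subpartitions) have been fully consumed and reports progress as a
fraction. The class and method names are hypothetical and only show the shape
such tracking could take; nothing like this is defined in the FLIP.

```python
class SplitProgressTracker:
    """Tracks finished splits of one job vertex (hypothetical helper)."""

    def __init__(self, num_splits: int) -> None:
        if num_splits <= 0:
            raise ValueError("num_splits must be positive")
        self._num_splits = num_splits
        self._finished = set()

    def mark_finished(self, split_id: int) -> None:
        # Marking the same split twice is harmless: the set deduplicates.
        if not 0 <= split_id < self._num_splits:
            raise ValueError(f"unknown split id: {split_id}")
        self._finished.add(split_id)

    def progress(self) -> float:
        # Fraction of splits that have been completely processed.
        return len(self._finished) / self._num_splits

    def remaining(self) -> list:
        # Split ids that still have to be processed, in ascending order.
        return [s for s in range(self._num_splits) if s not in self._finished]
```

A monitoring tool could poll `progress()` per vertex, and `remaining()` shows
which work units are still outstanding.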

Best,

Lijie

Zhu Zhu <reed...@gmail.com> wrote on Sat, Oct 30, 2021 at 3:47 PM:

> Hi Till,
>
> Thanks for the comments!
>
> I agree with you that we should avoid an auto-scaled job that cannot be
> scheduled in standalone/reactive mode. And I think it would be great if we
> could expose a deployment option that is consistent for streaming and batch
> jobs, which would be easier to understand. I'm looking forward to the day
> when both adaptive schedulers become the default, so that most users do not
> need to care about job tuning while their jobs still run well.
>
> Regarding the three options, personally I prefer to take *#1* as the first
> step, to limit the scope of this FLIP a bit; otherwise it may be too
> complicated.
> I think *#3* is the final goal we need to target later, so that mixed
> bounded and unbounded workloads can be supported. Given that there can be
> multiple stages scheduled at the same time, the design of the scheduling may
> not be very straightforward and needs some thorough consideration.
> *#2* can be a very good improvement in itself. Shuffles of batch jobs can be
> auto-determined to be pipelined or blocking according to the available
> resources. But the changes may involve many components and can be large, so
> I think it can be a standalone future improvement.
>
> Regarding the side note about abstracting subpartitions as splits, the idea
> is very interesting to me. Besides supporting auto scaling, I think
> trackable produced splits can also help with troubleshooting and give some
> insights for future improvements. Collecting data sizes for the batch
> adaptive scheduler can be the first step, and we can then consider
> abstracting it further.
>
> Thanks,
> Zhu
>
> Till Rohrmann <trohrm...@apache.org> wrote on Fri, Oct 29, 2021 at 10:47 PM:
>
> > Hi Lijie,
> >
> > Thanks for drafting this FLIP together with Zhu Zhu :-)
> >
> > I like the idea of making the parallelism of operators of a bounded job
> > dependent on the data size. This makes the job adjust automatically when
> > the data sources/sizes change.
> >
> > I can see this work well in combination with the active mode where Flink
> > can ask for more resources.
> >
> > In the case of the standalone mode, I think it can lead to situations
> > where one and the same job can be scheduled or not depending on the input
> > data. The problem is pipelined regions that contain more than a single
> > operator instance (e.g. pipelined shuffles). We already have this problem
> > when submitting a batch job with too high a parallelism onto a standalone
> > cluster. However, with the adaptive batch mode this problem might become a
> > bit more prominent. So my question would be how we can solve this problem
> > (potentially in a follow-up step). I could think of the following three
> > alternatives atm:
> >
> > 1. Only allow blocking data exchanges: This will limit the size of a
> > pipelined region to a single operator instance. This has the downside that
> > we no longer support pipelined execution of multiple operators (other than
> > chained). Moreover, it requires the user to set all data exchanges to
> > blocking, which cannot be enforced atm.
> > 2. Introduce a new pipelined-blocking data exchange hybrid that supports
> > pipelined data exchanges but can also spill to disk if there is no
> > consumer: This could allow us to still make progress in case one has a
> > pipelined region which requires more slots than we currently have.
> > 3. Decide on the actual parallelism of a pipelined region after having
> > received the slots that are declared based on the data size per subtask.
> > If the pipelined region contains an all-to-all connection, then the
> > parallelism is the number of slots we currently have. If not, then the
> > parallelism can be decided by the data volume: This would effectively mean
> > enabling the existing AdaptiveScheduler to also run batch workloads.
> >
> > With any of these options, I believe that we could provide a somewhat
> > consistent behaviour across the different deployment and execution modes
> > wrt scaling:
> >
> > a) Active + streaming job that uses AdaptiveScheduler: Can run with fewer
> > slots than requested. Can ask for more slots. Once new slots arrive it
> > will make use of them.
> > b) Reactive + streaming job that uses AdaptiveScheduler: Can run with
> > fewer slots than requested. Once new slots arrive it will make use of
> > them.
> > c) Active + batch job that uses batch adaptive scheduler + any of 1., 2.
> > or 3.: Can run with fewer slots than requested (because it can complete
> > the job with a single slot). Can ask for more slots. Once new slots arrive
> > it will make use of them.
> > d) Standalone + batch job that uses batch adaptive scheduler + any of 1.,
> > 2. or 3.: Can run with fewer slots than requested (because it can complete
> > the job with a single slot). Once new slots arrive it will make use of
> > them (up to the desired maximum parallelism).
> >
> > If we decide to go with option 1. or 2., then we will only be able to run
> > mixed workloads (mixture of bounded and unbounded sources) in streaming
> > mode. This might be ok for the time being.
> >
> > This actually leads to my main concern, which is to give our users
> > consistent and somewhat easy to understand deployment options. In order to
> > achieve this, Flink should always be able to make progress unless the
> > parallelism is explicitly configured (e.g. a very high parallelism in a
> > pipelined region that cannot be fulfilled). Moreover, Flink should be able
> > to make use of new resources if the job isn't being run at the maximum
> > parallelism already. Removing slots should also be possible, as long as
> > the minimum number of required slots is still available. Maybe one idea
> > could be to make the adaptive batch scheduler the default for batch jobs
> > eventually. For streaming jobs, we would ideally always use the
> > AdaptiveScheduler to give a consistent behaviour.
> >
> > As a side note: Creating as many subpartitions as the maximum parallelism
> > will result in a one-to-one mapping between subpartitions and key groups.
> > If we then also make the non-keyed operators work on a set of
> > subpartitions that store the operator state, then the subpartitions could
> > be seen as some logical work unit/split that is assigned to operators.
> > Having such an abstraction could allow us to track which work unit has
> > completed, which helps with rescaling of operators and maintaining order
> > guarantees, for example.
> >
> > I also left some smaller comments in the wiki.
> >
> > Cheers,
> > Till
> >
> > On Wed, Oct 20, 2021 at 8:52 AM Lijie Wang <wangdachui9...@gmail.com>
> > wrote:
> >
> > > Hi all,
> > >
> > >
> > > Zhu Zhu and I propose to introduce a new job scheduler to Flink: the
> > > adaptive batch job scheduler. The new scheduler can automatically decide
> > > the parallelism of each job vertex of a batch job, according to the
> > > volume of data that vertex needs to process.
> > >
> > > Major benefits of this scheduler include:
> > >
> > >    1. Batch job users can be relieved from parallelism tuning
> > >    2. Automatically tuned parallelisms can be vertex level and can
> > >    better fit consumed datasets whose volume varies from day to day
> > >    3. Vertices from SQL batch jobs can be assigned different
> > >    parallelisms which are automatically tuned
> > >    4. It can be the first step towards enabling auto-rebalancing of the
> > >    workloads of tasks
> > >
> > > You can find more details in the FLIP-187[1]. Looking forward to your
> > > feedback.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
> > >
> > > Best,
> > >
> > > Lijie
> > >
> >
>
