Hi, thanks for drafting the FLIP, Lijie and Zhu Zhu. It already looks pretty solid and it will be a really great improvement to the batch scheduling. I'd second Till's feedback, especially when it comes to consistent behavior between different deployment types / schedulers.
What I'm a bit unsure about is the naming here. The word *Adaptive* means something different in the streaming and batch schedulers:

- For *streaming* it refers to the ability to adapt the job parallelism based on resource availability.
- For *batch* it refers to the ability to adapt the stage parallelism based on the output of the previous stage.

Should this be a concern?

Best,
D.

On Sun, Oct 31, 2021 at 8:21 AM Lijie Wang <wangdachui9...@gmail.com> wrote:

> Hi Till & Zhu,
>
> Thanks for your feedback, and also for your comments and suggestions on
> the wiki, which are very helpful for perfecting the FLIP.
>
> I also agree to provide our users with consistent and easy-to-understand
> deployment options. Regarding the three options proposed by Till, my
> opinion is the same as Zhu's: in the first version we can support only
> "option 1", and "option 2" and "option 3" can be future improvements.
>
> Regarding the side note about abstracting subpartitions as splits:
> although it is not our original intention, I personally feel it's
> meaningful. It is also helpful to users, who can use it to do some
> monitoring work in order to track the progress of jobs in detail.
>
> Best,
> Lijie
>
> Zhu Zhu <reed...@gmail.com> wrote on Sat, Oct 30, 2021 at 3:47 PM:
>
> > Hi Till,
> >
> > Thanks for the comments!
> >
> > I agree with you that we should avoid the situation where an
> > auto-scaled job cannot be scheduled in standalone/reactive mode. And I
> > think it's great if we can expose a deployment option that is
> > consistent for streaming and batch jobs, which is easier to
> > understand. I'm looking forward to the day when both adaptive
> > schedulers are the default, so that most users do not need to care
> > about job tuning while their jobs still run well.
> >
> > Regarding the three options, personally I prefer to take *#1* as the
> > first step, to limit the scope of this FLIP a bit, otherwise it may be
> > too complicated.
> > I think *#3* is the final goal we need to target later, so that mixed
> > bounded and unbounded workloads can be supported. Given that there can
> > be multiple stages scheduled at the same time, the design of the
> > scheduling may not be very straightforward and needs some thorough
> > consideration.
> >
> > *#2* can be a very good improvement itself. Shuffles of batch jobs can
> > be auto-determined to be pipelined or blocking according to available
> > resources. But the changes may involve many components and can be
> > large, so I think it can be a standalone future improvement.
> >
> > Regarding the side note about abstracting subpartitions as splits, the
> > idea is very interesting to me. Besides supporting auto scaling, I
> > think trackable produced splits can also help in troubleshooting and
> > give some insights for future improvements. Collecting data sizes for
> > the adaptive batch scheduler can be the first step, and we can further
> > consider the abstraction afterwards.
> >
> > Thanks,
> > Zhu
> >
> > Till Rohrmann <trohrm...@apache.org> wrote on Fri, Oct 29, 2021 at
> > 10:47 PM:
> >
> > > Hi Lijie,
> > >
> > > Thanks for drafting this FLIP together with Zhu Zhu :-)
> > >
> > > I like the idea of making the parallelism of operators of a bounded
> > > job dependent on the data size. This makes the job adjust
> > > automatically when the data sources/sizes change.
> > >
> > > I can see this working well in combination with the active mode,
> > > where Flink can ask for more resources.
> > >
> > > In the case of the standalone mode, I think it can lead to
> > > situations where one and the same job can be scheduled or not
> > > depending on the input data. The problem is pipelined regions that
> > > contain more than a single operator instance (e.g. pipelined
> > > shuffles). We already have this problem when submitting a batch job
> > > with too high a parallelism onto a standalone cluster.
> > > However, with the adaptive batch mode this problem might become a
> > > bit more present. So my question would be how we can solve this
> > > problem (potentially in a follow-up step). I could think of the
> > > following three alternatives atm:
> > >
> > > 1. Only allow blocking data exchanges: This limits the size of a
> > > pipelined region to a single operator instance. It has the downside
> > > that we no longer support pipelined execution of multiple operators
> > > (other than chained). Moreover, it requires the user to set all data
> > > exchanges to blocking, which cannot be enforced atm.
> > > 2. Introduce a new pipelined-blocking data exchange hybrid that
> > > supports pipelined data exchanges but can also spill to disk if
> > > there is no consumer: This could allow us to still make progress in
> > > case one has a pipelined region which requires more slots than what
> > > we currently have.
> > > 3. Decide on the actual parallelism of a pipelined region after
> > > having received the slots that are declared based on the data size
> > > per subtask: If the pipelined region contains an all-to-all
> > > connection, then the parallelism is how many slots we currently
> > > have. If not, then the parallelism can be decided by the data
> > > volume. This would effectively mean enabling the existing
> > > AdaptiveScheduler to also run batch workloads.
> > >
> > > With either of these options, I believe that we could provide a
> > > somewhat consistent behaviour across the different deployment and
> > > execution modes wrt scaling:
> > >
> > > a) Active + streaming job that uses AdaptiveScheduler: Can run with
> > > fewer slots than requested. Can ask for more slots. Once new slots
> > > arrive, it will make use of them.
> > > b) Reactive + streaming job that uses AdaptiveScheduler: Can run
> > > with fewer slots than requested. Once new slots arrive, it will make
> > > use of them.
> > > c) Active + batch job that uses the adaptive batch scheduler + any
> > > of 1., 2. or 3.: Can run with fewer slots than requested (because it
> > > can complete the job with a single slot). Can ask for more slots.
> > > Once new slots arrive, it will make use of them.
> > > d) Standalone + batch job that uses the adaptive batch scheduler +
> > > any of 1., 2. or 3.: Can run with fewer slots than requested
> > > (because it can complete the job with a single slot). Once new slots
> > > arrive, it will make use of them (up to the desired maximum
> > > parallelism).
> > >
> > > If we decide to go with option 1. or 2., then we will only be able
> > > to run mixed workloads (a mixture of bounded and unbounded sources)
> > > in streaming mode. This might be ok for the time being.
> > >
> > > This actually leads to my main concern, which is to give our users
> > > consistent and somewhat easy-to-understand deployment options. In
> > > order to achieve this, Flink should always be able to make progress
> > > unless the parallelism is explicitly configured (e.g. a very high
> > > parallelism in a pipelined region that cannot be fulfilled).
> > > Moreover, Flink should be able to make use of new resources if the
> > > job isn't being run at the maximum parallelism already. Removing
> > > slots so that the minimum number of required slots is still
> > > available should also be possible. Maybe one idea could be to make
> > > the adaptive batch scheduler the default for batch jobs eventually.
> > > For streaming jobs, we would ideally always use the
> > > AdaptiveScheduler to give consistent behaviour.
> > >
> > > As a side note: Creating as many subpartitions as the maximum
> > > parallelism will result in a one-to-one mapping between
> > > subpartitions and key groups.
> > > If we then also make the non-keyed operators work on a set of
> > > subpartitions that store the operator state, then the subpartitions
> > > could be seen as some logical work unit/split that is assigned to
> > > operators. Having such an abstraction could allow us to track which
> > > work unit has completed, which helps with rescaling of operators and
> > > maintaining ordering guarantees, for example.
> > >
> > > I also left some smaller comments in the wiki.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Oct 20, 2021 at 8:52 AM Lijie Wang <wangdachui9...@gmail.com>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > Zhu Zhu and I propose to introduce a new job scheduler to Flink:
> > > > the adaptive batch job scheduler. The new scheduler can
> > > > automatically decide the parallelisms of job vertices for batch
> > > > jobs, according to the size of the data volume each vertex needs
> > > > to process.
> > > >
> > > > Major benefits of this scheduler include:
> > > >
> > > > 1. Batch job users can be relieved from parallelism tuning.
> > > > 2. Automatically tuned parallelisms can be set per vertex and can
> > > > better fit consumed datasets whose volume varies from day to day.
> > > > 3. Vertices from SQL batch jobs can be assigned different
> > > > parallelisms which are automatically tuned.
> > > > 4. It can be the first step towards auto-rebalancing the workloads
> > > > of tasks.
> > > >
> > > > You can find more details in FLIP-187 [1]. Looking forward to your
> > > > feedback.
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
> > > >
> > > > Best,
> > > >
> > > > Lijie
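[Editor's note] The core mechanism the announcement describes, deciding a vertex's parallelism from the volume of data it consumes, can be sketched roughly as below. This is an illustrative sketch only, not the FLIP's actual implementation; the method and parameter names (`decideParallelism`, `bytesPerSubtask`, `maxParallelism`) are invented for the example and do not correspond to real Flink APIs or configuration options.

```java
import java.util.List;

public class AdaptiveParallelismSketch {

    /**
     * Decide the parallelism of a job vertex given the sizes (in bytes)
     * of the result partitions it consumes, a target amount of data per
     * subtask, and an upper bound on parallelism.
     */
    static int decideParallelism(List<Long> consumedPartitionBytes,
                                 long bytesPerSubtask,
                                 int maxParallelism) {
        long totalBytes = consumedPartitionBytes.stream()
                .mapToLong(Long::longValue)
                .sum();
        // Divide rounding up, so every byte is assigned to some subtask.
        int parallelism =
                (int) ((totalBytes + bytesPerSubtask - 1) / bytesPerSubtask);
        // Run at least one subtask; never exceed the configured maximum.
        return Math.max(1, Math.min(parallelism, maxParallelism));
    }

    public static void main(String[] args) {
        // A vertex consuming three upstream partitions totalling 2560 MiB,
        // with a target of 512 MiB per subtask and a max parallelism of 128.
        long mib = 1024L * 1024L;
        int p = decideParallelism(
                List.of(1024 * mib, 1024 * mib, 512 * mib), 512 * mib, 128);
        System.out.println(p); // 5 = ceil(2560 MiB / 512 MiB)
    }
}
```

Because an upstream stage's output sizes are only known once it finishes, a scheduler applying this rule would run it lazily, stage by stage, which is why the FLIP couples the parallelism decision with batch (blocking) shuffles.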