The proposal reads "Looking at the original AIP-15 the author proposes to use locking to enable the use of multiple schedulers, this might introduce unnecessary complexity"
To me, introducing multiple roles (master scheduler + scheduler minions) may actually be more complex than just having "shared nothing" schedulers with locking. The former is also less scalable (whatever workload is done on that master [say serialization] can hit scale issues) and less HA (as it relies on the orchestrator [k8s] for HA). My personal inclination has always been to rename the scheduler to "supervisor" (as it already does significantly more than just triggering tasks), allow many instances of that role, and use locks where necessary. That way there are just 2 roles in the cluster: supervisor and worker processes. Depending on the executor (say for k8s) you don't even need actual persistent worker processes.

Max

On Sun, Mar 17, 2019 at 1:52 AM Peter van t Hof <pjrvant...@gmail.com> wrote:

Hi all,

I think that scheduler locking is maybe not the best way to solve this issue. Still, I'm in support of taking a good look at the scheduler, because it has some real scaling issues.

I did write an alternative proposal to solve the scalability of the scheduler:
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-15+Scalable+Scheduler

Any input on this is welcome.

Gr,
Peter

On 3 Mar 2019, at 03:26, Deng Xiaodong <xd.den...@gmail.com> wrote:

Thanks Max.

I have documented all the discussions around this topic & the useful inputs into AIP-15 (Support Multiple-Schedulers for HA & Better Scheduling Performance):
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651

More inputs from folks are welcome.

Thanks.
XD

On 3 Mar 2019, at 6:18 AM, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:

Personally I'd vote against the idea of having a certain scheduler handle a subset of the DAGs; that's just not HA.

Also, if you are in an env where you have a small number of large DAGs, the odds of wasted work and double-firing get pretty high.

With the lock in place, it's just a matter of the scheduler loop selecting (in a db transaction) the DAG that has not been processed for the longest time and is not locked. Flipping the lock flag to true should be part of the db transaction. We probably need a btree index on lock and last processed time.

This way, adding scheduler processes increases the scheduling pace and provides an HA solution. No leader / master / slave or election process; just equal workers that work together.

Max

On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd.den...@gmail.com> wrote:

Get your point and agree. And the suggestion you gave lastly, to randomly sort the DAGs, is a great idea to address it. Thanks!

XD

On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <jarek.pot...@polidea.com> wrote:

I think that the probability calculation holds only if there is no correlation between different schedulers. I think, however, there might be an accidental correlation if you think about typical deployments.

Some details on why I think accidental correlation is possible and even likely.
Assume that:

- we have similar and similarly busy machines running schedulers (likely)
- time is synchronised between the machines (likely)
- the machines have the same DAG folders mounted (or copied) and the same filesystem is used (this is exactly what multiple-schedulers deployment is all about)
- the schedulers start scanning at exactly the same time (crossing the 0:00 second every full five minutes, for example) - this I am not sure about, but I imagine it might be "typical" behaviour
- they process the list of DAGs in exactly the same sequence (it looks like this is the case: in dag_processing <https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300> and models/__init__ <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567> we use os.walk, which uses os.listdir, for which the sequence of processing depends on the filesystem implementation <https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>, and then we append files to the list)

Then it's rather likely that the schedulers will be competing for the very same DAGs at the very beginning. Locking will change how quickly they process each DAG of course, but if the DAGs are of similar sizes, it's also likely that the speed of scanning (DAGs/s) is similar for all schedulers. The schedulers will then catch up with each other and might pretty much continuously compete for the same DAGs almost all the time.

It can be mitigated super-easily by randomly sorting the DAGs folder list after it is prepared (it's filesystem-dependent now, so we do not rely on a particular order). Then the probability numbers will hold perfectly, I think :)

J.
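[Editor's note] Jarek's mitigation can be sketched as follows. This is a minimal, hypothetical illustration (the helper name `list_py_files` is not Airflow's), showing how shuffling the collected file list breaks the filesystem-dependent ordering that would otherwise be identical across scheduler instances:

```python
import os
import random

def list_py_files(dag_folder):
    """Collect .py files under dag_folder. os.walk's ordering is
    filesystem-dependent, so two schedulers scanning the same mount
    would otherwise see the same sequence and compete for the same DAGs."""
    files = []
    for root, _dirs, names in os.walk(dag_folder):
        for name in names:
            if name.endswith(".py"):
                files.append(os.path.join(root, name))
    # Shuffling decorrelates the scan order across scheduler instances,
    # so the independence assumption behind the probability estimate holds.
    random.shuffle(files)
    return files
```

With this in place, each scheduler walks the folder in its own random order, so the birthday-problem estimate discussed below in the thread applies.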
On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd.den...@gmail.com> wrote:

I'm thinking about which architecture would be ideal.

# Option-1:
The master-slave architecture would be one option. But leader selection will be essential to consider; otherwise we have an HA issue again.

# Option-2:
Another option we may consider is to simply start multiple scheduler instances (just using the current implementation, after modifying & validating the scheduler_lock on DagModel).

- In this case, given we handle everything properly using locking, we don't need to worry too much about double-scheduling/triggering.

- Another potential concern I had earlier is that different schedulers may compete with each other and cause "waste" of scheduler resources. After further thinking, I realised this is a typical Birthday Problem. Given we have m DAGs and n schedulers, at any moment the probability that all schedulers are working on different DAGs is m!/((m-n)! * m^n), and the probability that some schedulers are competing for the same DAG is 1 - m!/((m-n)! * m^n).

Let's say we have 200 DAGs and we start 2 schedulers. At any moment, the probability that there are schedulers competing for the same DAG is only 0.5%. If we run 2 schedulers against 300 DAGs, this probability is only 0.33%. (This probability will be higher if m/n is low. But users should not start too many schedulers if they don't have that many DAGs.)

Given the probability of schedulers competing is so low, my concern about scheduler resource waste is not really valid.
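[Editor's note] The probability figures above can be checked numerically; a small sketch reproducing XD's numbers (the product form avoids the huge factorials in m!/((m-n)! * m^n)):

```python
from math import prod

def p_collision(m, n):
    """Probability that, among n schedulers each working on one of m DAGs
    chosen uniformly at random, at least two work on the same DAG:
    1 - m!/((m-n)! * m^n), computed as a running product."""
    p_all_distinct = prod((m - i) / m for i in range(n))
    return 1 - p_all_distinct

# 2 schedulers over 200 DAGs -> 0.5%; over 300 DAGs -> 0.33%
print(round(p_collision(200, 2) * 100, 2))  # 0.5
print(round(p_collision(300, 2) * 100, 2))  # 0.33
```

For n = 2 this reduces to 1 - (m-1)/m = 1/m, which is why doubling the DAG count roughly halves the collision probability.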
Based on these calculations/assessments, I think we can go for Option-2, i.e. we don't make a big change to the current implementation. Instead, we ensure the scheduler_lock is working well and test running multiple schedulers intensively. Then we should be good to let users know that it's safe to run multiple schedulers.

Please share your thoughts on this and correct me if I'm wrong on any point above. Thanks.

XD

Reference: https://en.wikipedia.org/wiki/Birthday_problem

On 2 Mar 2019, at 3:39 PM, Tao Feng <fengta...@gmail.com> wrote:

Does the proposal use a master-slave architecture (leader scheduler vs slave scheduler)?

On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yrql...@gmail.com> wrote:

Preventing double-triggering by separating the DAG files that different schedulers parse sounds easier and more intuitive. I actually removed one of the double-triggering prevention logics here <https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127> (expensive) and was relying on this lock <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233> to prevent double-firing and safeguard our non-idempotent tasks (btw, the insert can be an insert overwrite to be idempotent).

Also, though at Airbnb we requeue tasks a lot, we haven't seen double-firing recently.
Cheers,
Kevin Y

On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <maximebeauche...@gmail.com> wrote:

Forgot to mention: the intention was to use the lock, but I never personally got to do the second phase, which would consist of skipping the DAG if the lock is on, and expiring the lock eventually based on a config setting.

Max

On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <maximebeauche...@gmail.com> wrote:

My original intention with the lock was preventing "double-triggering" of tasks (triggering refers to the scheduler putting the message in the queue). Airflow now has good "double-firing prevention" for tasks (firing happens when the worker receives the message and starts the task): even if the scheduler were to go rogue or restart and send multiple triggers for a task instance, the worker(s) should only start one task instance. That's done by running the database assertions behind the conditions being met as a read database transaction (no task can alter the rows that validate the assertion while it's getting asserted). In practice it's a little tricky, and we've seen rogue double-firing in the past (I have no idea how often that happens).

If we do want to prevent double-triggering, we should make sure that 2 schedulers aren't processing the same DAG or DagRun at the same time.
That would mean having the scheduler not start processing locked DAGs, and providing a mechanism to expire the locks after some time.

Has anyone experienced double-firing lately? If that exists we should fix it, but also be careful around multiple-scheduler double-triggering, as it would make that problem potentially much worse.

Max

On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd.den...@gmail.com> wrote:

It's exactly what my team is doing & what I shared here earlier last year:
https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E

It's somewhat a "hacky" solution (and HA is not addressed), and now I'm thinking about how we can make it more proper & robust.

XD

On 2 Mar 2019, at 12:04 AM, Mario Urquizo <mario.urqu...@gmail.com> wrote:

We have been running multiple schedulers for about 3 months. We created multiple services to run Airflow schedulers. The only difference is that we have each of the schedulers pointed to a directory one level deeper than the DAG home directory that the workers and webapp use. We have seen much better scheduling performance, but this does not yet help with HA.
DAGS_HOME:
{airflow_home}/dags (webapp & workers)
{airflow_home}/dags/group-a/ (scheduler1)
{airflow_home}/dags/group-b/ (scheduler2)
{airflow_home}/dags/group-etc/ (scheduler3)

Not sure if this helps; just sharing in case it does.

Thank you,
Mario

On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bdbr...@gmail.com> wrote:

I have done quite some work on making it possible to run multiple schedulers at the same time. At the moment I don't think there are real blockers to doing so, actually. We just don't actively test it.

Database locking is mostly in place (DagRuns and TaskInstances). And I think the worst that can happen is that a task is scheduled twice. The task will detect this most of the time and kill one off if they run concurrently; if not (i.e. sequentially), it will run again on some occasions. Everyone is having idempotent tasks, right, so no harm done? ;-)

Have you encountered issues? Maybe work those out?

Cheers,
Bolke.

Sent from my iPad

On 1 Mar 2019 at 16:25, Deng Xiaodong <xd.den...@gmail.com> wrote:

Hi Max,

Following https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E, I'm trying to prepare an AIP for supporting multiple schedulers in Airflow (mainly for HA and higher scheduling performance).

Along the process of code checking, I found that there is one attribute of DagModel, "scheduler_lock". It's not used at all in the current implementation, but it was introduced a long time back (2015) to allow multiple schedulers to work together (https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620).

Since you were the original author of it, it would be very helpful if you could kindly share why the multiple-schedulers implementation was eventually removed, and what challenges/complexity there were.
(You already shared a few valuable inputs in the earlier discussion https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E, mainly relating to hiccups around concurrency, cross-DAG prioritisation & load on the DB. Other than these, is there anything else you would like to advise?)

I will also dive into the git history further to understand it better.

Thanks.

XD

--
Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129 <+48660796129>
E: jarek.pot...@polidea.com
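[Editor's note] Max's transactional lock-selection loop ("select, in a db transaction, the DAG that's not been processed for the longest time and is not locked, flipping the lock flag in the same transaction") can be sketched roughly as below. This is a hypothetical illustration, not Airflow's actual schema or API: the table and column names are invented, and SQLite is used only so the sketch is self-contained — a production version on a real RDBMS would use SELECT ... FOR UPDATE (ideally with SKIP LOCKED) to make the claim race-free across processes:

```python
import sqlite3
import time

def init_db(conn):
    """Create the toy dag table plus the btree index on
    (locked, last_processed_at) suggested in the thread."""
    conn.executescript(
        "CREATE TABLE IF NOT EXISTS dag ("
        " dag_id TEXT PRIMARY KEY,"
        " locked INTEGER NOT NULL DEFAULT 0,"
        " last_processed_at REAL NOT NULL DEFAULT 0);"
        "CREATE INDEX IF NOT EXISTS idx_dag_lock"
        " ON dag (locked, last_processed_at);"
    )

def claim_next_dag(conn):
    """In one transaction: pick the unlocked DAG that has gone longest
    without processing and flip its lock flag before committing."""
    with conn:  # commits on success, rolls back on error
        row = conn.execute(
            "SELECT dag_id FROM dag WHERE locked = 0"
            " ORDER BY last_processed_at ASC LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        conn.execute(
            "UPDATE dag SET locked = 1 WHERE dag_id = ? AND locked = 0",
            (row[0],),
        )
        return row[0]

def release_dag(conn, dag_id):
    """Unlock after processing and record when it was processed.
    Expiring stale locks after a timeout (as Max proposed) would be
    a separate sweep over rows whose lock is older than some config value."""
    with conn:
        conn.execute(
            "UPDATE dag SET locked = 0, last_processed_at = ?"
            " WHERE dag_id = ?",
            (time.time(), dag_id),
        )
```

Because each claim flips `locked` inside the transaction, a second scheduler running the same loop simply sees the row as locked and moves on to the next-oldest DAG, which is what makes "equal workers, no leader" possible.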