Hi,

My proposal focuses mainly on scalability and indeed not so much on HA, mainly because scalability is also the main issue for the original author. Having a form of HA on this MainScheduler would still be nice to have.
The problem is that a fixed number of schedulers does not scale with the load. At my current client they try to execute 5000+ DAGs at the same time, and a single scheduler cycle that touches all DAGs takes 2-3 hours. To do this within 5 minutes, 36 of those schedulers with locking would have to be running at all times. After 2 hours, 2 schedulers would be enough, which means that in this situation 34 scheduler processes are wasted and only produce overhead. The DagScheduler is a short-lived task, not a persistent worker process; the MainScheduler should resubmit it when required.

Gr,
Peter

> On 18 Mar 2019, at 05:32, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>
> The proposal reads "Looking at the original AIP-15 the author proposes to use locking to enable the use of multiple schedulers, this might introduce unnecessary complexity"
>
> To me introducing multiple roles (master scheduler + scheduler minions) may actually be more complex than just having "shared nothing" schedulers with locking. The former is also less scalable (whatever workload is done on that master [say serialization] can hit scale issues) and is less HA (as it relies on the orchestrator [k8s] for HA).
>
> My personal inclination has always been going towards renaming the scheduler to "supervisor" (as it already does significantly more than just triggering tasks), allowing many instances of that role, and using locks where necessary. That way there are just 2 roles in the cluster: supervisor and worker processes. Depending on the executor (say for k8s) you don't even need actual persistent worker processes.
>
> Max
>
> On Sun, Mar 17, 2019 at 1:52 AM Peter van t Hof <pjrvant...@gmail.com> wrote:
>
>> Hi all,
>>
>> I think that scheduling locking is maybe not the best way to solve this issue. Still, I'm in support of taking a good look at the scheduler because it has some real scaling issues.
>>
>> I did write an alternative proposal to solve the scalability of the scheduler:
>>
>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-15+Scalable+Scheduler
>>
>> Any input on this is welcome.
>>
>> Gr,
>> Peter
>>
>>> On 3 Mar 2019, at 03:26, Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>
>>> Thanks Max.
>>>
>>> I have documented all the discussions around this topic & useful inputs into AIP-15 (Support Multiple-Schedulers for HA & Better Scheduling Performance): https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651
>>>
>>> More inputs from folks are welcome.
>>>
>>> Thanks.
>>>
>>> XD
>>>
>>>> On 3 Mar 2019, at 6:18 AM, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>>>>
>>>> Personally I'd vote against the idea of having certain schedulers handle a subset of the DAGs; that's just not HA.
>>>>
>>>> Also if you are in an env where you have a small number of large DAGs, the odds of having wasted work and double-firing get pretty high.
>>>>
>>>> With the lock in place, it's just a matter of the scheduler loop selecting (in a db transaction) the dag that's not been processed for the longest time and that is not locked. Flipping the lock flag to true should be part of the db transaction. We probably need a btree index on lock and last processed time.
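A minimal sketch of that select-and-lock transaction could look like the snippet below. This is illustrative only: the model, the scheduler_lock and last_processed column names, and the index are assumptions made for the sketch, not Airflow's actual schema.

    import datetime

    from sqlalchemy import Boolean, Column, DateTime, Index, String
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class DagModel(Base):
        # Illustrative stand-in for Airflow's DagModel; column names assumed.
        __tablename__ = "dag"
        dag_id = Column(String(250), primary_key=True)
        scheduler_lock = Column(Boolean, default=False, nullable=False)
        last_processed = Column(DateTime)
        # The btree index on (lock flag, last processed time) suggested above.
        __table_args__ = (
            Index("idx_dag_lock_last_processed", "scheduler_lock", "last_processed"),
        )

    def claim_next_dag(session):
        """Pick the unlocked DAG that has gone unprocessed the longest and
        flip its lock flag, all inside one transaction."""
        dag = (
            session.query(DagModel)
            .filter(DagModel.scheduler_lock.is_(False))
            .order_by(DagModel.last_processed.asc())
            .with_for_update(skip_locked=True)  # competing schedulers skip this row
            .first()
        )
        if dag is not None:
            dag.scheduler_lock = True
            dag.last_processed = datetime.datetime.utcnow()
        session.commit()  # the flag flip commits as part of the same transaction
        return dag

Note that SELECT ... FOR UPDATE SKIP LOCKED needs a backend that supports it (e.g. PostgreSQL 9.5+ or MySQL 8.0); on other databases the lock column plus an expiry timestamp would have to carry the whole burden.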
>>>>
>>>> This way adding scheduler processes increases the scheduling pace, and provides an HA solution. No leader / master / slave or election process, just equal workers that work together.
>>>>
>>>> Max
>>>>
>>>> On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>>
>>>>> Got your point, and agree. And the suggestion you gave lastly, to randomly sort the DAGs, is a great idea to address it. Thanks!
>>>>>
>>>>> XD
>>>>>
>>>>>> On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <jarek.pot...@polidea.com> wrote:
>>>>>>
>>>>>> I think that the probability calculation holds only if there is no correlation between different schedulers. I think however there might be an accidental correlation if you think about typical deployments.
>>>>>>
>>>>>> Some details on why I think accidental correlation is possible and even likely. Assume that:
>>>>>>
>>>>>> - we have similar and similarly busy machines running schedulers (likely)
>>>>>> - time is synchronised between the machines (likely)
>>>>>> - the machines have the same DAG folders mounted (or copied) and the same filesystem is used (this is exactly what a multiple-schedulers deployment is all about)
>>>>>> - the schedulers start scanning at exactly the same time (crossing the 0:00 second every full five minutes, for example) - this I am not sure about, but I imagine it might be "typical" behaviour
>>>>>> - they process the list of DAGs in exactly the same sequence (it looks like this is the case: in dag_processing https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300 and models/__init__ https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567 we use os.walk, which uses os.listdir, for which the sequence of processing depends on the filesystem implementation (see https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic), and then we append files to the list)
>>>>>>
>>>>>> Then it's rather likely that the schedulers will be competing over the very same DAGs at the very beginning. Locking will change how quickly they process each DAG of course, but if the DAGs are of similar sizes it's also likely that the speed of scanning (DAGs/s) is similar for all schedulers. The schedulers will then catch up with each other and might pretty much continuously compete for the same DAGs almost all the time.
>>>>>>
>>>>>> It can be mitigated super-easily by randomly sorting the DAGs folder list after it is prepared (it's filesystem-dependent now, so we do not rely on a particular order). Then the probability numbers will hold perfectly, I think :)
>>>>>>
>>>>>> J.
>>>>>>
>>>>>> On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>>>>
>>>>>>> I'm thinking of which architecture would be ideal.
>>>>>>>
>>>>>>> # Option-1:
>>>>>>> The master-slave architecture would be one option. But leader-selection will be essential to consider, otherwise we have an issue in terms of HA again.
>>>>>>>
>>>>>>> # Option-2:
>>>>>>> Another option we may consider is to simply start multiple scheduler instances (just using the current implementation, after modifying & validating the scheduler_lock on DagModel).
>>>>>>>
>>>>>>> - In this case, given we handle everything properly using locking, we don't need to worry too much about double-scheduling/triggering.
>>>>>>>
>>>>>>> - Another potential concern I had earlier is that different schedulers may compete with each other and cause "waste" of scheduler resources. After further thinking, I realise this is a typical Birthday Problem. Given we have m DAGs and n schedulers, at any moment the probability that all schedulers are working on different DAGs is m!/((m-n)! * (m^n)), and the probability that there are schedulers competing on the same DAG is 1 - m!/((m-n)! * (m^n)).
>>>>>>>
>>>>>>> Let's say we have 200 DAGs and we start 2 schedulers. At any moment, the probability that there are schedulers competing on the same DAG is only 0.5%. If we run 2 schedulers against 300 DAGs, this probability is only 0.33%. (This probability will be higher if m/n is low, but users should not start too many schedulers if they don't have that many DAGs.)
>>>>>>>
>>>>>>> Given the probability of schedulers competing is so low, my concern about scheduler resource waste is not really valid.
>>>>>>>
>>>>>>> Based on these calculations/assessments, I think we can go for Option-2, i.e. we don't make big changes to the current implementation. Instead, we ensure the scheduler_lock is working well and test running multiple schedulers intensively. Then we should be good to let users know that it's safe to run multiple schedulers.
>>>>>>>
>>>>>>> Please share your thoughts on this and correct me if I'm wrong on any point above. Thanks.
>>>>>>>
>>>>>>> XD
>>>>>>>
>>>>>>> Reference: https://en.wikipedia.org/wiki/Birthday_problem
>>>>>>>
>>>>>>>> On 2 Mar 2019, at 3:39 PM, Tao Feng <fengta...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Does the proposal use a master-slave architecture (leader scheduler vs slave scheduler)?
>>>>>>>>
>>>>>>>> On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yrql...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Preventing double-triggering by separating the DAG files that different schedulers parse sounds easier and more intuitive. I actually removed one of the double-triggering prevention logics here (it was expensive): https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127 and was relying on this lock https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233 to prevent double-firing and safeguard our non-idempotent tasks (btw the insert can be insert overwrite to be idempotent).
>>>>>>>>>
>>>>>>>>> Also, though we requeue tasks a lot at Airbnb, we haven't seen double-firing recently.
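As a side note on the probability discussion above: XD's percentages check out, and Jarek's shuffle mitigation is a one-liner. A quick sketch in plain Python (the file list here is a made-up stand-in for the real DAG folder scan):

    import random

    def p_competing(m: int, n: int) -> float:
        """Probability that at least two of n schedulers pick the same DAG out
        of m, assuming independent uniform picks: 1 - m!/((m-n)! * m^n)."""
        p_all_distinct = 1.0
        for i in range(n):
            p_all_distinct *= (m - i) / m
        return 1 - p_all_distinct

    print(f"{p_competing(200, 2):.4f}")  # 0.0050 -> the 0.5% quoted above
    print(f"{p_competing(300, 2):.4f}")  # 0.0033 -> the 0.33% quoted above

    # Jarek's mitigation: shuffle the DAG file list after it is built, so the
    # scan order no longer follows the (shared) filesystem order and the
    # independence assumption behind these numbers actually holds.
    dag_files = ["/dags/a.py", "/dags/b.py", "/dags/c.py"]  # stand-in list
    random.shuffle(dag_files)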
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Kevin Y
>>>>>>>>>
>>>>>>>>> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Forgot to mention: the intention was to use the lock, but I never personally got to do the second phase, which would consist of skipping the DAG if the lock is on, and expiring the lock eventually based on a config setting.
>>>>>>>>>>
>>>>>>>>>> Max
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> My original intention with the lock was preventing "double-triggering" of tasks (triggering refers to the scheduler putting the message in the queue). Airflow now has good "double-firing-prevention" of tasks (firing happens when the worker receives the message and starts the task): even if the scheduler were to go rogue or restart and send multiple triggers for a task instance, the worker(s) should only start one task instance. That's done by running the database assertions behind the conditions being met as a read database transaction (no task can alter the rows that validate the assertion while it's getting asserted). In practice it's a little tricky and we've seen rogue double-firing in the past (I have no idea how often that happens).
>>>>>>>>>>>
>>>>>>>>>>> If we do want to prevent double-triggering, we should make sure that 2 schedulers aren't processing the same DAG or DagRun at the same time. That would mean the scheduler not starting to process locked DAGs, and providing a mechanism to expire the locks after some time.
>>>>>>>>>>>
>>>>>>>>>>> Has anyone experienced double-firing lately? If that exists we should fix it, but also be careful around multiple-scheduler double-triggering, as it would make that problem potentially much worse.
>>>>>>>>>>>
>>>>>>>>>>> Max
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> It's exactly what my team is doing & what I shared here earlier last year (https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E).
>>>>>>>>>>>>
>>>>>>>>>>>> It's somewhat a "hacky" solution (and HA is not addressed), and now I'm thinking about how we can make it more proper & robust.
>>>>>>>>>>>>
>>>>>>>>>>>> XD
>>>>>>>>>>>>
>>>>>>>>>>>>> On 2 Mar 2019, at 12:04 AM, Mario Urquizo <mario.urqu...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> We have been running multiple schedulers for about 3 months. We created multiple services to run airflow schedulers.
>>>>>>>>>>>>> The only difference is that we have each of the schedulers pointed to a directory one level deeper than the DAG home directory that the workers and webapp use. We have seen much better scheduling performance, but this does not yet help with HA.
>>>>>>>>>>>>>
>>>>>>>>>>>>> DAGS_HOME:
>>>>>>>>>>>>> {airflow_home}/dags (webapp & workers)
>>>>>>>>>>>>> {airflow_home}/dags/group-a/ (scheduler1)
>>>>>>>>>>>>> {airflow_home}/dags/group-b/ (scheduler2)
>>>>>>>>>>>>> {airflow_home}/dags/group-etc/ (scheduler3)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Not sure if this helps, just sharing in case it does.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>> Mario
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bdbr...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have done quite some work on making it possible to run multiple schedulers at the same time. At the moment I don't think there are real blockers to doing so, actually; we just don't actively test it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Database locking is mostly in place (DagRuns and TaskInstances). And I think the worst that can happen is that a task is scheduled twice. The task will detect this most of the time and kill one off if they run concurrently; if not sequential, it will run again on some occasions. Everyone is having idempotent tasks, right, so no harm done? ;-)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Have you encountered issues? Maybe work those out?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Bolke.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Sent from my iPad
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 1 Mar 2019, at 16:25, Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Max,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Following https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E, I'm trying to prepare an AIP for supporting multiple schedulers in Airflow (mainly for HA and higher scheduling performance).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Along the process of code checking, I found that there is one attribute of DagModel, "scheduler_lock". It's not used at all in the current implementation, but it was introduced a long time back (2015) to allow multiple schedulers to work together (https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Since you were the original author of it, it would be very helpful if you can kindly share why the multiple-schedulers implementation was removed eventually, and what challenges/complexity there were. (You already shared a few valuable inputs in the earlier discussion https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E, mainly relating to hiccups around concurrency, cross DAG prioritisation & load on DB. Other than these, anything else you would like to advise?)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I will also dive into the git history further to understand it better.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> XD
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Jarek Potiuk
>>>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>>>>
>>>>>> M: +48 660 796 129
>>>>>> E: jarek.pot...@polidea.com