Personally I'd vote against the idea of having a given scheduler handle a
subset of the DAGs; that's just not HA.
Also, if you are in an env where you have a small number of large DAGs, the
odds of wasted work and double-firing get pretty high. With the lock in
place, the scheduler loop just needs to select (in a DB transaction) the
unlocked DAG that has not been processed for the longest time. Flipping the
lock flag to true should be part of the same DB transaction. We probably
need a btree index on the lock and last-processed-time columns. This way,
adding scheduler processes increases the scheduling pace and provides an
HA solution. No leader / master / slave or election process, just equal
workers that work together.

Max

On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd.den...@gmail.com> wrote:

> Get your point and agree. And the suggestion you gave lastly, to randomly
> sort the DAGs, is a great idea to address it. Thanks!
>
> XD
>
> > On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <jarek.pot...@polidea.com> wrote:
> >
> > I think that the probability calculation holds only if there is no
> > correlation between different schedulers. I think however there might
> > be an accidental correlation if you think about typical deployments.
> >
> > Some details why I think accidental correlation is possible and even
> > likely. Assume that:
> >
> > - we have similar and similarly busy machines running schedulers (likely)
> > - time is synchronised between the machines (likely)
> > - the machines have the same DAG folders mounted (or copied) and the
> >   same filesystem is used (this is exactly what multiple-scheduler
> >   deployment is all about)
> > - the schedulers start scanning at exactly the same time (crossing the
> >   0:00 second every full five minutes, for example) - this I am not
> >   sure about, but I imagine this might be "typical" behaviour.
> > - they process the list of DAGs in exactly the same sequence (it looks
> >   like this is the case: dag_processing
> >   <https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300>
> >   and models/__init__
> >   <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567>:
> >   we use os.walk, which uses os.listdir, for which the sequence of
> >   processing depends on the filesystem implementation
> >   <https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>,
> >   and then we append files to the list)
> >
> > Then it's rather likely that the schedulers will be competing over the
> > very same DAGs at the very beginning. Locking will change how quickly
> > they process each DAG of course, but if the DAGs are of similar sizes,
> > it's also likely that the speed of scanning (DAGs/s) is similar for all
> > schedulers. The schedulers will then catch up with each other and might
> > pretty much continuously compete for the same DAGs almost all the time.
> >
> > It can be mitigated super-easily by randomly sorting the DAG folder
> > list after it is prepared (the order is filesystem-dependent now, so we
> > do not rely on a particular order). Then the probability numbers will
> > hold perfectly I think :)
> >
> > J.
> >
> > On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd.den...@gmail.com> wrote:
> >
> >> I'm thinking of which architecture would be ideal.
> >>
> >> # Option-1:
> >> The master-slave architecture would be one option. But leader
> >> selection will be very essential to consider; otherwise we have an HA
> >> issue again.
> >>
> >> # Option-2:
> >> Another option we may consider is to simply start multiple scheduler
> >> instances (just using the current implementation, after we modify &
> >> validate the scheduler_lock on DagModel).
> >>
> >> - In this case, given we handle everything properly using locking, we
> >>   don't need to worry too much about double-scheduling/triggering.
> >>
> >> - Another potential concern I had earlier is that different schedulers
> >>   may compete with each other and cause "waste" of scheduler
> >>   resources. After further thinking, I realise this is a typical
> >>   Birthday Problem. Given we have m DAGs and n schedulers, at any
> >>   moment the probability that all schedulers are working on different
> >>   DAGs is m!/((m-n)! * (m^n)), and the probability that some
> >>   schedulers are competing for the same DAG is
> >>   1 - m!/((m-n)! * (m^n)).
> >>
> >> Let's say we have 200 DAGs and we start 2 schedulers. At any moment,
> >> the probability that there are schedulers competing for the same DAG
> >> is only 0.5%. If we run 2 schedulers against 300 DAGs, this
> >> probability is only 0.33%. (This probability will be higher if m/n is
> >> low. But users should not start too many schedulers if they don't
> >> have that many DAGs.)
> >>
> >> Given the probability of schedulers competing is so low, my concern
> >> about scheduler resource waste is not really valid.
> >>
> >> Based on these calculations/assessments, I think we can go for
> >> option-2, i.e. we don't make big changes to the current
> >> implementation. Instead, we ensure the scheduler_lock works well and
> >> test running multiple schedulers intensively. Then we should be good
> >> to let users know that it's safe to run multiple schedulers.
> >>
> >> Please share your thoughts on this and correct me if I'm wrong on any
> >> point above. Thanks.
> >>
> >> XD
> >>
> >> Reference: https://en.wikipedia.org/wiki/Birthday_problem
> >>
> >>> On 2 Mar 2019, at 3:39 PM, Tao Feng <fengta...@gmail.com> wrote:
> >>>
> >>> Does the proposal use a master-slave architecture (leader scheduler
> >>> vs slave schedulers)?
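XD's birthday-problem figures check out numerically. A quick verification in plain Python (math.perm needs Python 3.8+), under his assumption that each scheduler is looking at an independently, uniformly chosen DAG at any instant:

```python
from math import perm

def collision_probability(m_dags, n_schedulers):
    """P(at least two schedulers work on the same DAG at one instant):
    1 - m!/((m-n)! * m^n), i.e. the birthday-problem collision chance.
    perm(m, n) computes m!/(m-n)!."""
    return 1 - perm(m_dags, n_schedulers) / m_dags ** n_schedulers

print(round(collision_probability(200, 2) * 100, 2))  # 0.5 (percent)
print(round(collision_probability(300, 2) * 100, 2))  # 0.33 (percent)
```

The m/n caveat is visible too: with only 20 DAGs and 4 schedulers the collision chance climbs to roughly 27%, which supports the advice not to run many schedulers over few DAGs.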
> >>>
> >>> On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yrql...@gmail.com> wrote:
> >>>
> >>>> Preventing double-triggering by separating the DAG files that
> >>>> different schedulers parse sounds easier and more intuitive. I
> >>>> actually removed one of the double-triggering prevention mechanisms
> >>>> (an expensive one) here
> >>>> <https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127>
> >>>> and was relying on this lock
> >>>> <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233>
> >>>> to prevent double-firing and safeguard our non-idempotent tasks
> >>>> (btw the insert can be an insert overwrite to be idempotent).
> >>>>
> >>>> Also, though at Airbnb we requeue tasks a lot, we haven't seen
> >>>> double-firing recently.
> >>>>
> >>>> Cheers,
> >>>> Kevin Y
> >>>>
> >>>> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin
> >>>> <maximebeauche...@gmail.com> wrote:
> >>>>
> >>>>> Forgot to mention: the intention was to use the lock, but I never
> >>>>> personally got to do the second phase, which would consist of
> >>>>> skipping the DAG if the lock is on, and expiring the lock
> >>>>> eventually based on a config setting.
> >>>>>
> >>>>> Max
> >>>>>
> >>>>> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin
> >>>>> <maximebeauche...@gmail.com> wrote:
> >>>>>
> >>>>>> My original intention with the lock was preventing
> >>>>>> "double-triggering" of tasks (triggering refers to the scheduler
> >>>>>> putting the message in the queue). Airflow now has good
> >>>>>> "double-firing" prevention for tasks (firing happens when the
> >>>>>> worker receives the message and starts the task): even if the
> >>>>>> scheduler were to go rogue or restart and send multiple triggers
> >>>>>> for a task instance, the worker(s) should only start one task
> >>>>>> instance.
> >>>>>> That's done by running the database assertions that check the
> >>>>>> conditions are met inside a database transaction (no task can
> >>>>>> alter the rows that validate the assertion while it's being
> >>>>>> asserted). In practice it's a little tricky, and we've seen rogue
> >>>>>> double-firing in the past (I have no idea how often that happens).
> >>>>>>
> >>>>>> If we do want to prevent double-triggering, we should make sure
> >>>>>> that 2 schedulers aren't processing the same DAG or DagRun at the
> >>>>>> same time. That would mean the scheduler should not start
> >>>>>> processing locked DAGs, and we should provide a mechanism to
> >>>>>> expire the locks after some time.
> >>>>>>
> >>>>>> Has anyone experienced double-firing lately? If that exists we
> >>>>>> should fix it, but we should also be careful about multiple
> >>>>>> schedulers double-triggering, as that would make the problem
> >>>>>> potentially much worse.
> >>>>>>
> >>>>>> Max
> >>>>>>
> >>>>>> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong
> >>>>>> <xd.den...@gmail.com> wrote:
> >>>>>>
> >>>>>>> It's exactly what my team is doing & what I shared here earlier
> >>>>>>> last year
> >>>>>>> (https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E)
> >>>>>>>
> >>>>>>> It's somewhat a "hacky" solution (and HA is not addressed), and
> >>>>>>> now I'm thinking how we can make it more proper & robust.
> >>>>>>>
> >>>>>>> XD
> >>>>>>>
> >>>>>>>> On 2 Mar 2019, at 12:04 AM, Mario Urquizo
> >>>>>>>> <mario.urqu...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> We have been running multiple schedulers for about 3 months.
> >>>>>>>> We created multiple services to run Airflow schedulers. The
> >>>>>>>> only difference is that we have each of the schedulers pointed
> >>>>>>>> to a directory one level deeper than the DAG home directory
> >>>>>>>> that the workers and webapp use. We have seen much better
> >>>>>>>> scheduling performance, but this does not yet help with HA.
> >>>>>>>>
> >>>>>>>> DAGS_HOME:
> >>>>>>>> {airflow_home}/dags (webapp & workers)
> >>>>>>>> {airflow_home}/dags/group-a/ (scheduler1)
> >>>>>>>> {airflow_home}/dags/group-b/ (scheduler2)
> >>>>>>>> {airflow_home}/dags/group-etc/ (scheduler3)
> >>>>>>>>
> >>>>>>>> Not sure if this helps, just sharing in case it does.
> >>>>>>>>
> >>>>>>>> Thank you,
> >>>>>>>> Mario
> >>>>>>>>
> >>>>>>>> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin
> >>>>>>>> <bdbr...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> I have done quite some work on making it possible to run
> >>>>>>>>> multiple schedulers at the same time. At the moment I don't
> >>>>>>>>> think there are real blockers to doing so. We just don't
> >>>>>>>>> actively test it.
> >>>>>>>>>
> >>>>>>>>> Database locking is mostly in place (DagRuns and
> >>>>>>>>> TaskInstances). And I think the worst that can happen is that
> >>>>>>>>> a task is scheduled twice. The task will detect this most of
> >>>>>>>>> the time and kill one off if they run concurrently; if they
> >>>>>>>>> run sequentially instead, it will run again on some occasions.
> >>>>>>>>> Everyone is having idempotent tasks, right, so no harm
> >>>>>>>>> done? ;-)
> >>>>>>>>>
> >>>>>>>>> Have you encountered issues? Maybe work those out?
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Bolke.
> >>>>>>>>>
> >>>>>>>>> Sent from my iPad
> >>>>>>>>>
> >>>>>>>>>> On 1 Mar 2019 at 16:25, Deng Xiaodong <xd.den...@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi Max,
> >>>>>>>>>>
> >>>>>>>>>> Following
> >>>>>>>>>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E,
> >>>>>>>>>> I'm trying to prepare an AIP for supporting multiple
> >>>>>>>>>> schedulers in Airflow (mainly for HA and higher scheduling
> >>>>>>>>>> performance).
> >>>>>>>>>>
> >>>>>>>>>> While checking the code, I found that there is one attribute
> >>>>>>>>>> of DagModel, "scheduler_lock". It's not used at all in the
> >>>>>>>>>> current implementation, but it was introduced a long time
> >>>>>>>>>> back (2015) to allow multiple schedulers to work together
> >>>>>>>>>> (https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620).
> >>>>>>>>>>
> >>>>>>>>>> Since you were the original author of it, it would be very
> >>>>>>>>>> helpful if you could kindly share why the multiple-schedulers
> >>>>>>>>>> implementation was removed eventually, and what
> >>>>>>>>>> challenges/complexity there were.
> >>>>>>>>>> (You already shared a few valuable inputs in the earlier
> >>>>>>>>>> discussion
> >>>>>>>>>> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E,
> >>>>>>>>>> mainly relating to hiccups around concurrency, cross-DAG
> >>>>>>>>>> prioritisation & load on the DB. Other than these, anything
> >>>>>>>>>> else you would like to advise?)
> >>>>>>>>>>
> >>>>>>>>>> I will also dive into the git history further to understand
> >>>>>>>>>> it better.
> >>>>>>>>>>
> >>>>>>>>>> Thanks.
> >>>>>>>>>>
> >>>>>>>>>> XD
> >
> > --
> > Jarek Potiuk
> > Polidea <https://www.polidea.com/> | Principal Software Engineer
> >
> > M: +48 660 796 129 <+48660796129>
> > E: jarek.pot...@polidea.com
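Putting the thread's pieces together, Max's select-and-lock transaction from the top of the thread, combined with the lock expiry he mentions later, could be sketched as below. This is not Airflow code: SQLite stands in for the metadata DB, the table/column names (dag, scheduler_lock, locked_at, last_processed_at) and the timeout are illustrative, and on Postgres/MySQL one would rather use SELECT ... FOR UPDATE (with SKIP LOCKED where available).

```python
import sqlite3
import time

LOCK_TIMEOUT = 300.0  # hypothetical config: seconds before a stale lock expires

def claim_next_dag(conn, now=None):
    """In a single DB transaction: pick the DAG that has gone longest
    without processing and is either unlocked or holds an expired lock
    (its owner presumably crashed), then flip the lock flag before
    committing. The connection must be in autocommit mode
    (isolation_level=None) so the explicit BEGIN below works.
    Max's suggested btree index would cover
    (scheduler_lock, last_processed_at)."""
    now = time.time() if now is None else now
    cur = conn.cursor()
    cur.execute("BEGIN IMMEDIATE")  # serializes competing schedulers
    row = cur.execute(
        "SELECT dag_id FROM dag "
        "WHERE scheduler_lock = 0 OR locked_at < ? "
        "ORDER BY last_processed_at LIMIT 1",
        (now - LOCK_TIMEOUT,),
    ).fetchone()
    if row is None:
        conn.rollback()  # everything is freshly locked; nothing to claim
        return None
    cur.execute(
        "UPDATE dag SET scheduler_lock = 1, locked_at = ? WHERE dag_id = ?",
        (now, row[0]),
    )
    conn.commit()
    return row[0]
```

Because claiming and locking happen in one transaction, two schedulers calling this concurrently can never claim the same DAG, which is exactly the "equal workers, no leader election" property Max describes.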