Hi all, I think that scheduler locking is maybe not the best way to solve this issue. Still, I'm in support of taking a good look at the scheduler, because it has some real scaling issues.
I wrote an alternative proposal to solve the scalability of the scheduler: https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-15+Scalable+Scheduler

Any input on this is welcome.

Gr, Peter

> On 3 Mar 2019, at 03:26, Deng Xiaodong <xd.den...@gmail.com> wrote:
>
> Thanks Max.
>
> I have documented all the discussions around this topic & useful inputs into AIP-15 (Support Multiple-Schedulers for HA & Better Scheduling Performance): https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651
>
> More input from folks is welcome.
>
> Thanks.
>
> XD
>
>> On 3 Mar 2019, at 6:18 AM, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>>
>> Personally I'd vote against the idea of having a certain scheduler handle a subset of the DAGs; that's just not HA.
>>
>> Also, if you are in an env where you have a small number of large DAGs, the odds of wasted work and double-firing get pretty high.
>>
>> With the lock in place, it's just a matter of the scheduler loop selecting (in a db transaction) the DAG that has not been processed for the longest time and is not locked. Flipping the lock flag to true should be part of the db transaction. We probably need a btree index on the lock and last-processed time.
>>
>> This way, adding scheduler processes increases the scheduling pace and provides an HA solution. No leader/master/slave or election process, just equal workers that work together.
>>
>> Max
>>
>> On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd.den...@gmail.com> wrote:
>>
>>> Get your point and agree. And the suggestion you gave lastly, to randomly sort the DAGs, is a great idea to address it. Thanks!
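[Editor's note: Max's claim-in-a-transaction loop above could be sketched roughly as below. The `dag` table with `is_locked` and `last_processed` columns is a hypothetical simplification of Airflow's schema, and sqlite stands in for the metadata DB; a real multi-process deployment would want row-level locking such as Postgres's `SELECT ... FOR UPDATE SKIP LOCKED`.]

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE dag (
    dag_id TEXT PRIMARY KEY,
    is_locked INTEGER NOT NULL DEFAULT 0,
    last_processed REAL NOT NULL DEFAULT 0)""")
# The btree index Max mentions, covering the lock flag and last-processed time.
conn.execute("CREATE INDEX idx_lock_lp ON dag (is_locked, last_processed)")
conn.executemany("INSERT INTO dag (dag_id, last_processed) VALUES (?, ?)",
                 [("etl_a", 100.0), ("etl_b", 50.0), ("etl_c", 75.0)])

def claim_next_dag(conn):
    """Pick the unlocked DAG not processed for the longest time and flip
    its lock flag, with SELECT and UPDATE committed as one transaction."""
    with conn:  # sqlite3 connection as context manager: commit or roll back
        row = conn.execute(
            "SELECT dag_id FROM dag WHERE is_locked = 0 "
            "ORDER BY last_processed ASC LIMIT 1").fetchone()
        if row is None:
            return None  # every DAG is currently claimed by some scheduler
        conn.execute("UPDATE dag SET is_locked = 1 WHERE dag_id = ?", row)
        return row[0]

print(claim_next_dag(conn))  # etl_b: oldest last_processed among unlocked rows
print(claim_next_dag(conn))  # etl_c
```

A lock-expiry sweep (resetting `is_locked` for rows held longer than a configured timeout) would complete the scheme Max describes later in the thread.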
>>>
>>> XD
>>>
>>>> On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <jarek.pot...@polidea.com> wrote:
>>>>
>>>> I think that the probability calculation holds only if there is no correlation between different schedulers. I think, however, there might be an accidental correlation if you think about typical deployments.
>>>>
>>>> Some details on why I think accidental correlation is possible and even likely. Assume that:
>>>>
>>>> - we have similar and similarly busy machines running schedulers (likely)
>>>> - time is synchronised between the machines (likely)
>>>> - the machines have the same DAG folders mounted (or copied) and the same filesystem is used (this is exactly what a multiple-schedulers deployment is all about)
>>>> - the schedulers start scanning at exactly the same time (crossing the 0:00 second every full five minutes, for example) - this I am not sure about, but I imagine it might be "typical" behaviour
>>>> - they process the list of DAGs in exactly the same sequence (it looks like this is the case: in dag_processing <https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300> and models/__init__ <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567> we use os.walk, which uses os.listdir, for which the sequence of processing depends on the filesystem implementation <https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>, and then we append files to the list)
>>>>
>>>> Then it's rather likely that the schedulers will be competing over the very same DAGs at the very beginning. Locking will change how quickly they process each DAG, of course, but if the DAGs are of similar sizes it's also likely that the speed of scanning (DAGs/s) is similar for all schedulers.
>>>> The schedulers will then catch up with each other and might pretty much continuously compete for the same DAGs almost all the time.
>>>>
>>>> It can be mitigated super-easily by randomly sorting the DAG folder list after it is prepared (it's filesystem-dependent now, so we do not rely on a particular order). Then the probability numbers will hold perfectly, I think :)
>>>>
>>>> J.
>>>>
>>>> On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>>
>>>>> I'm thinking of which architecture would be ideal.
>>>>>
>>>>> # Option-1:
>>>>> The master-slave architecture would be one option. But leader-selection will be very essential to consider; otherwise we have an issue in terms of HA again.
>>>>>
>>>>> # Option-2:
>>>>> Another option we may consider is to simply start multiple scheduler instances (just using the current implementation, after modifying & validating the scheduler_lock on DagModel).
>>>>>
>>>>> - In this case, given we handle everything properly using locking, we don't need to worry too much about double-scheduling/triggering.
>>>>>
>>>>> - Another potential concern I had earlier is that different schedulers may compete with each other and cause "waste" of scheduler resources. After further thinking, I realise this is a typical Birthday Problem. Given we have m DAGs and n schedulers, at any moment the probability that all schedulers are working on different DAGs is m!/((m-n)! * (m^n)), and the probability that there are schedulers competing on the same DAG is 1 - m!/((m-n)! * (m^n)).
>>>>>
>>>>> Let's say we have 200 DAGs and we start 2 schedulers. At any moment, the probability that there are schedulers competing on the same DAG is only 0.5%. If we run 2 schedulers against 300 DAGs, this probability is only 0.33%.
>>>>> (This probability will be higher if m/n is low.
>>>>> But users should not start too many schedulers if they don't have that many DAGs.)
>>>>>
>>>>> Given the probability of schedulers competing is so low, my concern about scheduler resource waste is not really valid.
>>>>>
>>>>> Based on these calculations/assessments, I think we can go for Option-2, i.e. we don't make a big change to the current implementation. Instead, we ensure the scheduler_lock works well and test running multiple schedulers intensively. Then we should be good to let users know that it's safe to run multiple schedulers.
>>>>>
>>>>> Please share your thoughts on this and correct me if I'm wrong on any point above. Thanks.
>>>>>
>>>>> XD
>>>>>
>>>>> Reference: https://en.wikipedia.org/wiki/Birthday_problem
>>>>>
>>>>>> On 2 Mar 2019, at 3:39 PM, Tao Feng <fengta...@gmail.com> wrote:
>>>>>>
>>>>>> Does the proposal use a master-slave architecture (leader scheduler vs slave scheduler)?
>>>>>>
>>>>>> On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yrql...@gmail.com> wrote:
>>>>>>
>>>>>>> Preventing double-triggering by separating the DAG files that different schedulers parse sounds easier and more intuitive. I actually removed one of the double-triggering prevention mechanisms here <https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127> (expensive) and was relying on this lock <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233> to prevent double-firing and safeguard our non-idempotent tasks (btw, the insert can be an insert overwrite to be idempotent).
>>>>>>>
>>>>>>> Also, though at Airbnb we requeue tasks a lot, we haven't seen double-firing recently.
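[Editor's note: XD's collision estimates earlier in the thread can be reproduced numerically from his formula, 1 - m!/((m-n)! * m^n). `math.perm` (Python 3.8+) computes the falling factorial m!/(m-n)!.]

```python
from math import perm  # perm(m, n) == m! / (m - n)!

def p_collision(m, n):
    """Probability that at least two of n schedulers are working on the
    same DAG out of m DAGs (birthday-problem formula from the thread)."""
    return 1 - perm(m, n) / m ** n

print(f"{p_collision(200, 2):.2%}")  # 0.50%, matching XD's 200-DAG figure
print(f"{p_collision(300, 2):.2%}")  # 0.33%, matching the 300-DAG figure
```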
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Kevin Y
>>>>>>>
>>>>>>> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Forgot to mention: the intention was to use the lock, but I never personally got to do the second phase, which would consist of skipping the DAG if the lock is on, and expiring the lock eventually based on a config setting.
>>>>>>>>
>>>>>>>> Max
>>>>>>>>
>>>>>>>> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> My original intention with the lock was preventing "double-triggering" of tasks (triggering refers to the scheduler putting the message in the queue). Airflow now has good "double-firing prevention" for tasks (firing happens when the worker receives the message and starts the task): even if the scheduler were to go rogue or restart and send multiple triggers for a task instance, the worker(s) should only start one task instance. That's done by running the database assertions behind the conditions being met as a read database transaction (no task can alter the rows that validate the assertion while it's getting asserted). In practice it's a little tricky, and we've seen rogue double-firing in the past (I have no idea how often that happens).
>>>>>>>>>
>>>>>>>>> If we do want to prevent double-triggering, we should make sure that 2 schedulers aren't processing the same DAG or DagRun at the same time. That would mean the scheduler not starting to process locked DAGs, and providing a mechanism to expire the locks after some time.
>>>>>>>>>
>>>>>>>>> Has anyone experienced double-firing lately?
>>>>>>>>> If that exists we should fix it, but also be careful about multiple schedulers double-triggering, as that would make the problem potentially much worse.
>>>>>>>>>
>>>>>>>>> Max
>>>>>>>>>
>>>>>>>>> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> It's exactly what my team is doing & what I shared here earlier last year (https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E).
>>>>>>>>>>
>>>>>>>>>> It's somewhat a "hacky" solution (and HA is not addressed), and now I'm thinking how we can make it more proper & robust.
>>>>>>>>>>
>>>>>>>>>> XD
>>>>>>>>>>
>>>>>>>>>>> On 2 Mar 2019, at 12:04 AM, Mario Urquizo <mario.urqu...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> We have been running multiple schedulers for about 3 months. We created multiple services to run Airflow schedulers. The only difference is that we have each of the schedulers pointed to a directory one level deeper than the DAG home directory that the workers and webapp use. We have seen much better scheduling performance, but this does not yet help with HA.
>>>>>>>>>>>
>>>>>>>>>>> DAGS_HOME:
>>>>>>>>>>> {airflow_home}/dags (webapp & workers)
>>>>>>>>>>> {airflow_home}/dags/group-a/ (scheduler1)
>>>>>>>>>>> {airflow_home}/dags/group-b/ (scheduler2)
>>>>>>>>>>> {airflow_home}/dags/group-etc/ (scheduler3)
>>>>>>>>>>>
>>>>>>>>>>> Not sure if this helps, just sharing in case it does.
>>>>>>>>>>>
>>>>>>>>>>> Thank you,
>>>>>>>>>>> Mario
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bdbr...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I have done quite some work on making it possible to run multiple schedulers at the same time. At the moment I don't think there are any real blockers to doing so; we just don't actively test it.
>>>>>>>>>>>>
>>>>>>>>>>>> Database locking is mostly in place (DagRuns and TaskInstances). And I think the worst that can happen is that a task is scheduled twice. The task will detect this most of the time and kill one off if they run concurrently; if not concurrent but sequential, it will run again on some occasions. Everyone is having idempotent tasks, right, so no harm done? ;-)
>>>>>>>>>>>>
>>>>>>>>>>>> Have you encountered issues? Maybe work those out?
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Bolke.
>>>>>>>>>>>>
>>>>>>>>>>>> Sent from my iPad
>>>>>>>>>>>>
>>>>>>>>>>>>> On 1 Mar 2019, at 16:25, Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Max,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Following https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E, I'm trying to prepare an AIP for supporting multiple schedulers in Airflow (mainly for HA and higher scheduling performance).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Along the process of code checking, I found that there is one attribute of DagModel, "scheduler_lock". It's not used at all in the current implementation, but it was introduced a long time back (2015) to allow multiple schedulers to work together (https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Since you were the original author of it, it would be very helpful if you could kindly share why the multiple-schedulers implementation was eventually removed, and what challenges/complexity there were.
>>>>>>>>>>>>> (You already shared a few valuable inputs in the earlier discussion https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E, mainly relating to hiccups around concurrency, cross-DAG prioritisation & load on the DB. Other than these, is there anything else you would like to advise?)
>>>>>>>>>>>>>
>>>>>>>>>>>>> I will also dive into the git history further to understand it better.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>> XD
>>>>
>>>> --
>>>> Jarek Potiuk
>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>>
>>>> M: +48 660 796 129
>>>> E: jarek.pot...@polidea.com