Does anything change about your proposal if you don't assume that workers have "quick access" to the DAG files - i.e. what if we are on kube executors and the task spin-up time plus git-sync time is 30-60s?
(Perhaps this is an extreme case, but we are talking about extreme cases.)

On 18 Mar 2019, at 07:58, Bas Harenslak <basharens...@godatadriven.com> wrote:

Peter,

The numbers you mention seem to come out of the blue. I think you're oversimplifying it and cannot simply state 180/36 = 5 minutes. Throwing in numbers without explanation creates confusion.

I have some questions when reading your AIP. I have to make lots of assumptions, and I think explaining it in more depth would clarify a lot.

* Should a DagScheduler and a task run in the same slots? Should there be any difference between running the two?
* How does this work together with e.g. Celery? With the current Celery setup you push tasks to the queue, and they can be run on any worker. With your setup it seems you push a DagScheduler and/or task. Does the DagScheduler push tasks itself to the queue so that any task can run anywhere? And is it correct to assume the DagScheduler polls the DB to check whether a task is finished?
* "If a cycle is done the MainScheduler should schedule a new DagScheduler" -> I assume the DagScheduler would set the state of a DagRun to finished? And the MainScheduler simply checks when the next interval is due and starts a new DagScheduler.
* The naming is confusing to me. How about naming the DagScheduler "DagRunManager", because that's what I believe it does?
* I'm not convinced this is the way to go. Currently the scheduler process does a lot more than just scheduling, i.e. also parsing of the DAGs, which I believe can be optimised a lot. I think splitting up the responsibilities of the scheduler would be more beneficial, instead of adding complexity by creating more "schedulers". Would you agree?

On a final note, to this whole thread: I'm very wary of doing HA/distributed work ourselves. It adds a lot of complexity, and locking is a tricky subject (although well thought out by many others, so not impossible). Before going there, I would suggest putting effort into optimising the (single) scheduler first, e.g. by splitting responsibilities between DAG parsing and actual scheduling.

Cheers,
Bas

On 18 Mar 2019, at 07:18, Peter van t Hof <pjrvant...@gmail.com> wrote:

Hi,

My proposal focuses mainly on scalability and indeed not so much on HA, mainly because scalability was also the main issue for the original author. Having some form of HA for the MainScheduler would still be nice.

The problem is that a fixed number of schedulers does not scale with the load. At my current client they try to execute 5000+ DAGs at the same time. A single scheduler cycle that touches all DAGs takes 2-3 hours. So to do this within 5 minutes, 36 of those schedulers (with locking) would have to be running at all times. Two hours later, 2 schedulers might be enough, which means that in this situation 34 scheduler processes are wasted and only produce overhead.

This DagScheduler is a short-lived task, not a persistent worker process. The MainScheduler should resubmit it when required.

Gr,
Peter
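For what it's worth, Peter's figures work out as follows, assuming the scanning work parallelises cleanly across schedulers (a big assumption, as Bas points out):

    # One full scheduler pass over all 5000+ DAGs takes 2-3 hours (~180 min).
    # To bring a pass down to ~5 minutes, the work must be split 36 ways:
    full_pass_minutes = 180
    target_pass_minutes = 5
    schedulers_needed = full_pass_minutes / target_pass_minutes  # -> 36.0
    # ...which is where "36 of those schedulers" comes from, and why the
    # number should shrink once the backlog drains (later only ~2 are needed).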
On 18 Mar 2019, at 05:32, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:

The proposal reads "Looking at the original AIP-15 the author proposes to use locking to enable the use of multiple schedulers, this might introduce unnecessary complexity".

To me, introducing multiple roles (master scheduler + scheduler minions) may actually be more complex than just having "shared nothing" schedulers with locking. The former is also less scalable (whatever workload is done on that master [say serialization] can hit scale issues) and less HA (as it relies on the orchestrator [k8s] for HA).

My personal inclination has always been to rename the scheduler to "supervisor" (as it already does significantly more than just triggering tasks), to allow many instances of that role, and to use locks where necessary. That way there are just 2 roles in the cluster: supervisor and worker processes. Depending on the executor (say for k8s) you don't even need actual persistent worker processes.

Max

On Sun, Mar 17, 2019 at 1:52 AM Peter van t Hof <pjrvant...@gmail.com> wrote:

Hi all,

I think that scheduler locking is maybe not the best way to solve this issue. Still, I'm in favour of taking a good look at the scheduler because it has some real scaling issues.

I wrote an alternative proposal to solve the scalability of the scheduler:

https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-15+Scalable+Scheduler

Any input on this is welcome.

Gr,
Peter

On 3 Mar 2019, at 03:26, Deng Xiaodong <xd.den...@gmail.com> wrote:

Thanks Max.

I have documented all the discussions around this topic & useful inputs into AIP-15 (Support Multiple Schedulers for HA & Better Scheduling Performance):
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651

More inputs from folks are welcome.

Thanks.

XD

On 3 Mar 2019, at 6:18 AM, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:

Personally I'd vote against the idea of having a certain scheduler handle a subset of the DAGs; that's just not HA.

Also, if you are in an env where you have a small number of large DAGs, the odds of wasted work and double-firing get pretty high.

With the lock in place, it's just a matter of the scheduler loop selecting (in a db transaction) the DAG that has not been processed for the longest time and is not locked. Flipping the lock flag to true should be part of the db transaction. We probably need a btree index on lock and last processed time.

This way adding scheduler processes increases the scheduling pace, and provides an HA solution. No leader / master / slave or election process, just equal workers that work together.

Max
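A minimal sketch of the claim-and-flip transaction Max describes, assuming Postgres and reusing the existing scheduler_lock column on the dag table; the last_scheduler_run column name and the helper are illustrative, not actual Airflow code:

    from sqlalchemy import text

    # Max's suggested btree index would look like:
    # CREATE INDEX dag_claim_idx ON dag (scheduler_lock, last_scheduler_run);
    CLAIM_NEXT_DAG = text("""
        UPDATE dag
           SET scheduler_lock = TRUE
         WHERE dag_id = (
               SELECT dag_id
                 FROM dag
                WHERE scheduler_lock IS NOT TRUE
                ORDER BY last_scheduler_run   -- least recently processed first
                LIMIT 1
                  FOR UPDATE SKIP LOCKED      -- skip rows another scheduler
               )                              -- is claiming right now
        RETURNING dag_id
    """)

    def claim_next_dag(session):
        # Select and flip the lock flag in one transaction, so no two
        # schedulers can come away holding the same DAG.
        row = session.execute(CLAIM_NEXT_DAG).first()
        session.commit()
        return row[0] if row else None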
On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd.den...@gmail.com> wrote:

Get your point and agree. And your last suggestion, to randomly sort the DAGs, is a great idea to address it. Thanks!

XD

On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <jarek.pot...@polidea.com> wrote:

I think that the probability calculation holds only if there is no correlation between the different schedulers. I think, however, there might be an accidental correlation if you consider typical deployments.

Some details on why I think accidental correlation is possible and even likely. Assume that:

- we have similar and similarly busy machines running schedulers (likely)
- time is synchronised between the machines (likely)
- the machines have the same DAG folders mounted (or copied) and the same filesystem is used (this is exactly what a multiple-scheduler deployment is all about)
- the schedulers start scanning at exactly the same time (crossing the 0:00 second every full five minutes, for example) - this I am not sure about, but I imagine it might be "typical" behaviour
- they process the list of DAGs in exactly the same sequence (it looks like this is the case: in dag_processing
https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300
and models/__init__
https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567
we use os.walk, which uses os.listdir, for which the processing order depends on the filesystem implementation
https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic
and then we append files to the list)

Then it's rather likely that the schedulers will be competing over the very same DAGs at the very beginning. Locking will change how quickly they process each DAG of course, but if the DAGs are of similar sizes it's also likely that the scanning speed (DAGs/s) is similar for all schedulers. The schedulers will then catch up with each other and might pretty much continuously compete for the same DAGs all the time.

It can be mitigated super-easily by randomly shuffling the DAG folder list after it is prepared (it's filesystem-dependent now, so we don't rely on a particular order). Then the probability numbers will hold perfectly, I think :)

J.

--
Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129
E: jarek.pot...@polidea.com

On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd.den...@gmail.com> wrote:

I'm thinking about which architecture would be ideal.

# Option-1:
The master-slave architecture would be one option. But leader selection will be essential to consider, otherwise we have an HA issue again.

# Option-2:
Another option we may consider is to simply start multiple scheduler instances (just using the current implementation, after modifying & validating the scheduler_lock on DagModel).

- In this case, given we handle everything properly using locking, we don't need to worry too much about double-scheduling/triggering.

- Another potential concern I had earlier is that different schedulers may compete with each other and cause "waste" of scheduler resources. After further thinking, I realised this is a typical Birthday Problem. Given we have m DAGs and n schedulers, at any moment the probability that all schedulers are working on different DAGs is m!/((m-n)! * m^n), and the probability that some schedulers are competing on the same DAG is 1 - m!/((m-n)! * m^n).

Let's say we have 200 DAGs and we start 2 schedulers. At any moment, the probability that schedulers are competing on the same DAG is only 0.5%. If we run 2 schedulers against 300 DAGs, this probability is only 0.33%. (This probability will be higher if m/n is low, but users should not start too many schedulers if they don't have that many DAGs.)

Given the probability of schedulers competing is so low, my concern about scheduler resource waste is not really valid.

Based on these calculations/assessments, I think we can go for option-2, i.e. we don't make big changes to the current implementation. Instead, we ensure the scheduler_lock works well and test running multiple schedulers intensively. Then we should be good to let users know that it's safe to run multiple schedulers.

Please share your thoughts on this and correct me if I'm wrong on any point above. Thanks.

XD

Reference: https://en.wikipedia.org/wiki/Birthday_problem
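Both points above fit in a few lines of Python - Jarek's shuffle mitigation and Deng's collision probability (a sketch; the hard-coded file_paths list stands in for whatever list dag_processing builds today):

    import random
    from math import prod

    # Jarek's mitigation: shuffle the DAG file list after it is built, so
    # schedulers that start in lockstep don't walk the folder in the same order.
    file_paths = ["/dags/a.py", "/dags/b.py", "/dags/c.py"]  # stand-in list
    random.shuffle(file_paths)

    def collision_probability(m, n):
        # Deng's birthday-problem number: 1 - m!/((m-n)! * m^n) is the chance
        # that at least two of the n schedulers hit the same of m DAGs.
        return 1 - prod((m - i) / m for i in range(n))

    print(collision_probability(200, 2))  # ~0.0050 -> the 0.5% above
    print(collision_probability(300, 2))  # ~0.0033 -> the 0.33% above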
On 2 Mar 2019, at 3:39 PM, Tao Feng <fengta...@gmail.com> wrote:

Does the proposal use a master-slave architecture (leader scheduler vs slave schedulers)?

On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yrql...@gmail.com> wrote:

Preventing double-triggering by separating the DAG files that different schedulers parse sounds easier and more intuitive. I actually removed one of the double-triggering prevention mechanisms here
https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127
(it was expensive) and was relying on this lock
https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233
to prevent double-firing and safeguard our non-idempotent tasks (btw, the insert can be an insert overwrite to be idempotent).

Also, though at Airbnb we requeue tasks a lot, we haven't seen double-firing recently.

Cheers,
Kevin Y

On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <maximebeauche...@gmail.com> wrote:

Forgot to mention: the intention was to use the lock, but I never personally got to do the second phase, which would consist of skipping the DAG if the lock is on, and expiring the lock eventually based on a config setting.

Max

On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <maximebeauche...@gmail.com> wrote:

My original intention with the lock was to prevent "double-triggering" of tasks (triggering refers to the scheduler putting the message in the queue). Airflow now has good "double-firing" prevention for tasks (firing happens when the worker receives the message and starts the task): even if the scheduler were to go rogue or restart and send multiple triggers for a task instance, the worker(s) should only start one task instance. That's done by running the database assertions that the conditions are met as a database transaction (no task can alter the rows that validate the assertion while it's being asserted). In practice it's a little tricky, and we've seen rogue double-firing in the past (I have no idea how often that happens).

If we do want to prevent double-triggering, we should make sure that 2 schedulers aren't processing the same DAG or DagRun at the same time. That would mean having the scheduler skip locked DAGs, and providing a mechanism to expire the locks after some time.

Has anyone experienced double-firing lately? If it exists we should fix it, but we should also be careful around multiple-scheduler double-triggering, as it would make that problem potentially much worse.

Max
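A sketch of the shape of the worker-side guard Max describes - not the actual Airflow code path - re-checking and claiming the task instance inside one transaction while holding a row lock:

    from airflow.models import TaskInstance
    from airflow.utils.state import State

    def claim_task_instance(session, dag_id, task_id, execution_date):
        # Lock the row, re-assert the preconditions, and flip the state,
        # all before commit: a duplicate trigger arriving at another worker
        # will see state=RUNNING and drop the message.
        ti = (session.query(TaskInstance)
                     .filter_by(dag_id=dag_id, task_id=task_id,
                                execution_date=execution_date)
                     .with_for_update()   # row lock held until commit
                     .one())
        if ti.state not in (State.SCHEDULED, State.QUEUED):
            session.commit()
            return False                  # someone else already fired it
        ti.state = State.RUNNING
        session.commit()
        return True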
On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd.den...@gmail.com> wrote:

It's exactly what my team is doing & what I shared here last year:
https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E

It's somewhat of a "hacky" solution (and HA is not addressed), and now I'm thinking about how we can make it more proper & robust.

XD

On 2 Mar 2019, at 12:04 AM, Mario Urquizo <mario.urqu...@gmail.com> wrote:

We have been running multiple schedulers for about 3 months. We created multiple services to run Airflow schedulers. The only difference is that we have each of the schedulers pointed to a directory one level deeper than the DAG home directory that the workers and webapp use. We have seen much better scheduling performance, but this does not yet help with HA.

DAGS_HOME:
{airflow_home}/dags (webapp & workers)
{airflow_home}/dags/group-a/ (scheduler1)
{airflow_home}/dags/group-b/ (scheduler2)
{airflow_home}/dags/group-etc/ (scheduler3)

Not sure if this helps, just sharing in case it does.

Thank you,
Mario
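One way to wire up Mario's layout with the stock CLI (Mario may have used separate configs instead) is to start each scheduler service against its own subtree via the scheduler's --subdir flag, with paths as in the layout above:

    airflow scheduler --subdir {airflow_home}/dags/group-a    # scheduler1
    airflow scheduler --subdir {airflow_home}/dags/group-b    # scheduler2
    airflow scheduler --subdir {airflow_home}/dags/group-etc  # scheduler3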
On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bdbr...@gmail.com> wrote:

I have done quite some work on making it possible to run multiple schedulers at the same time. At the moment I don't think there are any real blockers to doing so; we just don't actively test it.

Database locking is mostly in place (DagRuns and TaskInstances). I think the worst that can happen is that a task is scheduled twice. When the runs are concurrent, the task will detect this most of the time and kill one off; when they are sequential, it will run again on some occasions. Everyone has idempotent tasks, right, so no harm done? ;-)

Have you encountered issues? Maybe work those out?

Cheers,
Bolke

Sent from my iPad

On 1 Mar 2019, at 16:25, Deng Xiaodong <xd.den...@gmail.com> wrote:

Hi Max,

Following
https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E ,
I'm trying to prepare an AIP for supporting multiple schedulers in Airflow (mainly for HA and higher scheduling performance).

While checking the code, I found that there is one attribute of DagModel, "scheduler_lock". It's not used at all in the current implementation, but it was introduced a long time back (2015) to allow multiple schedulers to work together:
https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620

Since you were the original author of it, it would be very helpful if you could kindly share why the multiple-schedulers implementation was eventually removed, and what challenges/complexity there were.

(You already shared a few valuable inputs in the earlier discussion
https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E ,
mainly relating to hiccups around concurrency, cross-DAG prioritisation & load on the DB. Other than these, is there anything else you would like to advise?)

I will also dive into the git history further to understand it better.

Thanks.

XD