Personally I'd vote against the idea of having certain scheduler handling a
subset of the DAGs, that's just not HA.

Also if you are in an env where you have a small number of large DAGs, the
odds of having wasted work and double-firing get pretty high.

With the lock in place, it's just a matter of the scheduler loop to select
(in a db transaction) the dag that's not been processed for the longest
time that is not locked. Flipping the lock flag to true should be part of
the db transaction. We probably need a btree index on lock and last
processed time.

This way adding scheduler processes increases the scheduling pace, and
provides an HA solution. No leader / master / slave or election process,
just equal workers that work together.

Max

On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd.den...@gmail.com> wrote:

> Get your point and agree. And the suggestion you gave lastly to random
> sort DAGs is a great idea to address it. Thanks!
>
> XD
>
> > On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <jarek.pot...@polidea.com>
> wrote:
> >
> > I think that the probability calculation holds only if there is no
> > correlation between different schedulers. I think however there might be
> an
> > accidental correlation if you think about typical deployments.
> >
> > Some details why I think accidental correlation is possible and even
> > likely. Assume that:
> >
> >   - we have similar and similarly busy machines running schedulers
> (likely)
> >   - time is synchronised between the machines (likely)
> >   - the machines have the same DAG folders mounted (or copied) and the
> >   same filesystem is used (this is exactly what multiple schedulers
> >   deployment is all about)
> >   - the schedulers start scanning at exactly the same time (crossing 0:00
> >   second every full five minutes for example)  - this I am not sure but I
> >   imagine this might be "typical" behaviour.
> >   - they process list of DAGs in exactly the same sequence (it looks like
> >   this is the case dag_processing
> >   <
> https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300
> >
> >   and models/__init__
> >   <
> https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567
> >:
> >   we use os.walk which uses os.listdir for which sequence of processing
> >   depends on the filesystem implementation
> >   <
> https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>
> > and
> >   then we append files to the list)
> >
> > Then it's rather likely that the schedulers will be competing about the
> > very same DAGs at the very beginning. Locking will change how quickly
> they
> > process each DAG of course, but If the DAGs are of similar sizes it's
> also
> > likely that the speed of scanning (DAGS/s) is similar for all schedulers.
> > The schedulers will then catch-up with each other and might pretty much
> > continuously compete for the same DAGs almost all the time.
> >
> > It can be mitigated super-easily by random sorting of the DAGs folder
> list
> > after it is prepared (it's file-system dependent now so we do not rely on
> > particular order) . Then the probability numbers will hold perfectly I
> > think :)
> >
> > J.
> >
> >
> > On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd.den...@gmail.com>
> wrote:
> >
> >> I’m thinking of which architecture would be ideal.
> >>
> >>
> >> # Option-1:
> >> The master-slave architecture would be one option. But leader-selection
> >> will be very essential to consider, otherwise we have issue in terms of
> HA
> >> again.
> >>
> >>
> >> # Option-2:
> >> Another option we may consider is to simply start multiple scheduler
> >> instances (just using the current implementation, after modify &
> validate
> >> the scheduler_lock on DagModel).
> >>
> >> - In this case, given we handle everything properly using locking, we
> >> don’t need to worry too much about double-scheduling/triggering.
> >>
> >> - Another potential concern I had earlier is that different schedulers
> may
> >> compete with each other and cause “waste” of scheduler resource.
> >> After further thinking, I realise this is a typical Birthday Problem.
> >> Given we have m DAGs, and n schedulers, at any moment, the probability
> >> that all schedulers are working on different DAGs is m!/((m-n)! *
> (m^n)),
> >> and the probability that there are schedulers competing on the same DAG
> >> will be 1-m!/((m-n)! * (m^n)).
> >>
> >> Let’s say we have 200 DAGs and we start 2 schedulers. At any moment, the
> >> probability that there is schedulers competing on the same DAG is only
> >> 0.5%. If we run 2 schedulers against 300 DAGs, this probability is only
> >> 0.33%.
> >> (This probability will be higher if m/n is low. But users should not
> start
> >> too many schedulers if they don’t have that many DAGs).
> >>
> >> Given the probability of schedulers competing is so low, my concern on
> >> scheduler resource waste is not really valid.
> >>
> >>
> >>
> >> Based on these calculations/assessment, I think we can go for option-2,
> >> i.e. we don’t make big change in the current implementation. Instead, we
> >> ensure the scheduler_lock is working well and test intensively on
> running
> >> multiple schedulers. Then we should be good to let users know that it’s
> >> safe to run multiple schedulers.
> >>
> >> Please share your thoughts on this and correct me if I’m wrong in any
> >> point above. Thanks.
> >>
> >>
> >> XD
> >>
> >>
> >> Reference: https://en.wikipedia.org/wiki/Birthday_problem <
> >> https://en.wikipedia.org/wiki/Birthday_problem>
> >>
> >>
> >>> On 2 Mar 2019, at 3:39 PM, Tao Feng <fengta...@gmail.com> wrote:
> >>>
> >>> Does the proposal use master-slave architecture(leader scheduler vs
> slave
> >>> scheduler)?
> >>>
> >>> On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yrql...@gmail.com> wrote:
> >>>
> >>>> Preventing double-triggering by separating DAG files different
> >> schedulers
> >>>> parse sounds easier and more intuitive. I actually removed one of the
> >>>> double-triggering prevention logic here
> >>>> <
> >>>>
> >>
> https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127
> >>>>> (expensive)
> >>>> and
> >>>> was relying on this lock
> >>>> <
> >>>>
> >>
> https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233
> >>>>>
> >>>> to
> >>>> prevent double-firing and safe-guard our non-idempotent tasks( btw the
> >>>> insert can be insert overwrite to be idempotent).
> >>>>
> >>>> Also tho in Airbnb we requeue tasks a lot, we haven't see
> double-firing
> >>>> recently.
> >>>>
> >>>> Cheers,
> >>>> Kevin Y
> >>>>
> >>>> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <
> >>>> maximebeauche...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Forgot to mention: the intention was to use the lock, but I never
> >>>>> personally got to do the second phase which would consist of skipping
> >> the
> >>>>> DAG if the lock is on, and expire the lock eventually based on a
> config
> >>>>> setting.
> >>>>>
> >>>>> Max
> >>>>>
> >>>>> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <
> >>>>> maximebeauche...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> My original intention with the lock was preventing
> "double-triggering"
> >>>> of
> >>>>>> task (triggering refers to the scheduler putting the message in the
> >>>>> queue).
> >>>>>> Airflow now has good "double-firing-prevention" of tasks (firing
> >>>> happens
> >>>>>> when the worker receives the message and starts the task), even if
> the
> >>>>>> scheduler was to go rogue or restart and send multiple triggers for
> a
> >>>>> task
> >>>>>> instance, the worker(s) should only start one task instance. That's
> >>>> done
> >>>>> by
> >>>>>> running the database assertions behind the conditions being met as
> >> read
> >>>>>> database transaction (no task can alter the rows that validate the
> >>>>>> assertion while it's getting asserted). In practice it's a little
> >>>> tricky
> >>>>>> and we've seen rogue double-firing in the past (I have no idea how
> >>>> often
> >>>>>> that happens).
> >>>>>>
> >>>>>> If we do want to prevent double-triggerring, we should make sure
> that
> >> 2
> >>>>>> schedulers aren't processing the same DAG or DagRun at the same
> time.
> >>>>> That
> >>>>>> would mean for the scheduler to not start the process of locked
> DAGs,
> >>>> and
> >>>>>> by providing a mechanism to expire the locks after some time.
> >>>>>>
> >>>>>> Has anyone experienced double firing lately? If that exist we should
> >>>> fix
> >>>>>> it, but also be careful around multiple scheduler double-triggering
> as
> >>>> it
> >>>>>> would make that problem potentially much worse.
> >>>>>>
> >>>>>> Max
> >>>>>>
> >>>>>> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd.den...@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>>> It’s exactly what my team is doing & what I shared here earlier
> last
> >>>>> year
> >>>>>>> (
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >>>>>>> <
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >>>>>>
> >>>>>>> )
> >>>>>>>
> >>>>>>> It’s somehow a “hacky” solution (and HA is not addressed), and now
> >> I’m
> >>>>>>> thinking how we can have it more proper & robust.
> >>>>>>>
> >>>>>>>
> >>>>>>> XD
> >>>>>>>
> >>>>>>>> On 2 Mar 2019, at 12:04 AM, Mario Urquizo <
> mario.urqu...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>> We have been running multiple schedulers for about 3 months.  We
> >>>>> created
> >>>>>>>> multiple services to run airflow schedulers.  The only difference
> is
> >>>>>>> that
> >>>>>>>> we have each of the schedulers pointed to a directory one level
> >>>> deeper
> >>>>>>> than
> >>>>>>>> the DAG home directory that the workers and webapp use. We have
> seen
> >>>>>>> much
> >>>>>>>> better scheduling performance but this does not yet help with HA.
> >>>>>>>>
> >>>>>>>> DAGS_HOME:
> >>>>>>>> {airflow_home}/dags  (webapp & workers)
> >>>>>>>> {airflow_home}/dags/group-a/ (scheduler1)
> >>>>>>>> {airflow_home}/dags/group-b/ (scheduler2)
> >>>>>>>> {airflow_home}/dags/group-etc/ (scheduler3)
> >>>>>>>>
> >>>>>>>> Not sure if this helps, just sharing in case it does.
> >>>>>>>>
> >>>>>>>> Thank you,
> >>>>>>>> Mario
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bdbr...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> I have done quite some work on making it possible to run multiple
> >>>>>>>>> schedulers at the same time.  At the moment I don’t think there
> are
> >>>>>>> real
> >>>>>>>>> blockers actually to do so. We just don’t actively test it.
> >>>>>>>>>
> >>>>>>>>> Database locking is mostly in place (DagRuns and TaskInstances).
> >>>> And
> >>>>> I
> >>>>>>>>> think the worst that can happen is that a task is scheduled
> twice.
> >>>>> The
> >>>>>>> task
> >>>>>>>>> will detect this most of the time and kill one off if concurrent
> if
> >>>>> not
> >>>>>>>>> sequential then I will run again in some occasions. Everyone is
> >>>>> having
> >>>>>>>>> idempotent tasks right so no harm done? ;-)
> >>>>>>>>>
> >>>>>>>>> Have you encountered issues? Maybe work those out?
> >>>>>>>>>
> >>>>>>>>> Cheers
> >>>>>>>>> Bolke.
> >>>>>>>>>
> >>>>>>>>> Verstuurd vanaf mijn iPad
> >>>>>>>>>
> >>>>>>>>>> Op 1 mrt. 2019 om 16:25 heeft Deng Xiaodong <
> xd.den...@gmail.com>
> >>>>> het
> >>>>>>>>> volgende geschreven:
> >>>>>>>>>>
> >>>>>>>>>> Hi Max,
> >>>>>>>>>>
> >>>>>>>>>> Following
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >>>>>>>>> <
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> >>>>>>>> ,
> >>>>>>>>> I’m trying to prepare an AIP for supporting multiple-scheduler in
> >>>>>>> Airflow
> >>>>>>>>> (mainly for HA and Higher scheduling performance).
> >>>>>>>>>>
> >>>>>>>>>> Along the process of code checking, I found that there is one
> >>>>>>> attribute
> >>>>>>>>> of DagModel, “scheduler_lock”. It’s not used at all in current
> >>>>>>>>> implementation, but it was introduced long time back (2015) to
> >>>> allow
> >>>>>>>>> multiple schedulers to work together (
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
> >>>>>>>>> <
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
> >>>>>>>>
> >>>>>>>>> ).
> >>>>>>>>>>
> >>>>>>>>>> Since you were the original author of it, it would be very
> helpful
> >>>>> if
> >>>>>>>>> you can kindly share why the multiple-schedulers implementation
> was
> >>>>>>> removed
> >>>>>>>>> eventually, and what challenges/complexity there were.
> >>>>>>>>>> (You already shared a few valuable inputs in the earlier
> >>>> discussion
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
> >>>>>>>>> <
> >>>>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
> >>>>>>>>
> >>>>>>>>> , mainly relating to hiccups around concurrency, cross DAG
> >>>>>>> prioritisation &
> >>>>>>>>> load on DB. Other than these, anything else you would like to
> >>>>> advise?)
> >>>>>>>>>>
> >>>>>>>>>> I will also dive into the git history further to understand it
> >>>>> better.
> >>>>>>>>>>
> >>>>>>>>>> Thanks.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> XD
> >>>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>
> >>
> >>
> >
> > --
> >
> > Jarek Potiuk
> > Polidea <https://www.polidea.com/> | Principal Software Engineer
> >
> > M: +48 660 796 129 <+48660796129>
> > E: jarek.pot...@polidea.com
>
>

Reply via email to