Hi,

My proposal focuses mainly on scalability and indeed not so much on HA, 
mainly because scalability is also the main issue raised by the original 
author. Having a form of HA for the MainScheduler would still be nice to have.

The problem is that a fixed number of schedulers does not scale with the 
load. At my current client they try to execute 5000+ DAGs at the same time, 
and a single scheduler cycle touching all DAGs takes 2-3 hours. To complete 
such a cycle within 5 minutes, roughly 36 of those schedulers with locking 
(180 min / 5 min = 36) would have to run at all times. Two hours later, 2 
schedulers might be enough, which means that in this situation 34 scheduler 
processes are wasted and only produce overhead.

The DagScheduler is a short-lived task, not a persistent worker process. The 
MainScheduler resubmits it whenever required.
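
To make this concrete, here is a rough, illustrative sketch of such a
resubmission loop. Only the MainScheduler/DagScheduler names come from the
proposal; the executor choice and the per-DAG pass are stand-ins:

    import time
    from concurrent.futures import ProcessPoolExecutor

    def dag_scheduler(dag_id):
        """Short-lived task: one scheduling pass over a single DAG, then exit."""
        print(f"scheduling pass for {dag_id}")

    def main_scheduler_loop(dag_ids, passes=3):
        with ProcessPoolExecutor() as executor:
            running = {}
            for _ in range(passes):
                for dag_id in dag_ids:
                    task = running.get(dag_id)
                    if task is None or task.done():
                        # resubmit only once the previous pass has finished
                        running[dag_id] = executor.submit(dag_scheduler, dag_id)
                time.sleep(1)

    if __name__ == "__main__":
        main_scheduler_loop(["dag_a", "dag_b"])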

Gr,
Peter


> On 18 Mar 2019, at 05:32, Maxime Beauchemin <maximebeauche...@gmail.com> 
> wrote:
> 
> The proposal reads "Looking at the original AIP-15 the author proposes to
> use locking to enable the use of multiple schedulers, this might introduce
> unnecessary complexity"
> 
> To me, introducing multiple roles (master scheduler + scheduler minions)
> may actually be more complex than just having "shared nothing" schedulers
> with locking. The former is also less scalable (whatever workload is done
> on that master [say serialization] can hit scale issues) and less HA (as
> it relies on the orchestrator [k8s] for HA).
> 
> My personal inclination has always been to rename the scheduler to
> "supervisor" (as it already does significantly more than just triggering
> tasks), to allow many instances of that role, and to use locks where
> necessary. That way there are just 2 roles in the cluster: supervisor and
> worker processes. Depending on the executor (say for k8s) you don't even
> need actual persistent worker processes.
> 
> Max
> 
> On Sun, Mar 17, 2019 at 1:52 AM Peter van t Hof <pjrvant...@gmail.com>
> wrote:
> 
>> Hi all,
>> 
>> I think that scheduler locking is maybe not the best way to solve this
>> issue. Still, I’m in favour of taking a good look at the scheduler because
>> it has some real scaling issues.
>> 
>> I wrote an alternative proposal to address the scalability of the
>> scheduler:
>> 
>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-15+Scalable+Scheduler
>> 
>> Any input on this is welcome.
>> 
>> Gr,
>> Peter
>> 
>> 
>>> On 3 Mar 2019, at 03:26, Deng Xiaodong <xd.den...@gmail.com> wrote:
>>> 
>>> Thanks Max.
>>> 
>>> I have documented all the discussions around this topic & useful inputs
>>> into AIP-15 (Support Multiple-Schedulers for HA & Better Scheduling
>>> Performance):
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651
>> 
>>> 
>>> More input from folks is welcome.
>>> 
>>> Thanks.
>>> 
>>> 
>>> XD
>>> 
>>>> On 3 Mar 2019, at 6:18 AM, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>>>> 
>>>> Personally I'd vote against the idea of having certain schedulers
>>>> handle a subset of the DAGs, that's just not HA.
>>>> 
>>>> Also if you are in an env where you have a small number of large DAGs,
>>>> the odds of having wasted work and double-firing get pretty high.
>>>> 
>>>> With the lock in place, it's just a matter of the scheduler loop
>>>> selecting (in a db transaction) the DAG that has not been processed for
>>>> the longest time and is not locked. Flipping the lock flag to true
>>>> should be part of the same db transaction. We probably need a btree
>>>> index on the lock and last-processed-time columns.
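>>>> 
>>>> A minimal sketch of that claim step in SQLAlchemy terms (the is_locked
>>>> and last_processed_at columns are assumptions for illustration, not the
>>>> actual Airflow schema; needs a database that supports SELECT ... FOR
>>>> UPDATE, e.g. MySQL or Postgres):
>>>> 
>>>>     def claim_next_dag(session, DagModel):
>>>>         # Claim the most-stale unlocked DAG. SKIP LOCKED lets concurrent
>>>>         # schedulers pick different rows instead of blocking each other.
>>>>         dag = (session.query(DagModel)
>>>>                .filter(DagModel.is_locked.is_(False))
>>>>                .order_by(DagModel.last_processed_at.asc())
>>>>                .with_for_update(skip_locked=True)
>>>>                .first())
>>>>         if dag is not None:
>>>>             dag.is_locked = True  # flipped inside the same transaction
>>>>             session.commit()
>>>>         return dag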
>>>> 
>>>> This way adding scheduler processes increases the scheduling pace, and
>>>> provides an HA solution. No leader / master / slave or election process,
>>>> just equal workers that work together.
>>>> 
>>>> Max
>>>> 
>>>> On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>> 
>>>>> Got your point and agree. And your last suggestion, to randomly sort
>>>>> the DAGs, is a great idea to address it. Thanks!
>>>>> 
>>>>> XD
>>>>> 
>>>>>> On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <jarek.pot...@polidea.com> wrote:
>>>>>> 
>>>>>> I think that the probability calculation holds only if there is no
>>>>>> correlation between different schedulers. I think however there might
>>>>>> be an accidental correlation if you think about typical deployments.
>>>>>> 
>>>>>> Some details why I think accidental correlation is possible and even
>>>>>> likely. Assume that:
>>>>>> 
>>>>>> - we have similar and similarly busy machines running schedulers
>>>>>> (likely)
>>>>>> - time is synchronised between the machines (likely)
>>>>>> - the machines have the same DAG folders mounted (or copied) and the
>>>>>> same filesystem is used (this is exactly what multiple schedulers
>>>>>> deployment is all about)
>>>>>> - the schedulers start scanning at exactly the same time (crossing 0:00
>>>>>> second every full five minutes for example) - this I am not sure but I
>>>>>> imagine this might be "typical" behaviour.
>>>>>> - they process the list of DAGs in exactly the same sequence (it looks
>>>>>> like this is the case: see dag_processing
>>>>>> <https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300>
>>>>>> and models/__init__
>>>>>> <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567>:
>>>>>> we use os.walk, which uses os.listdir, for which the sequence of
>>>>>> processing depends on the filesystem implementation
>>>>>> <https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>
>>>>>> and then we append files to the list)
>>>>>> 
>>>>>> Then it's rather likely that the schedulers will be competing over the
>>>>>> very same DAGs at the very beginning. Locking will change how quickly
>>>>>> they process each DAG of course, but if the DAGs are of similar sizes
>>>>>> it's also likely that the speed of scanning (DAGs/s) is similar for all
>>>>>> schedulers. The schedulers will then catch up with each other and might
>>>>>> pretty much continuously compete for the same DAGs almost all the time.
>>>>>> 
>>>>>> It can be mitigated super-easily by randomly sorting the DAG folder
>>>>>> list after it is prepared (it's file-system dependent now, so we do not
>>>>>> rely on a particular order). Then the probability numbers will hold
>>>>>> perfectly, I think :)
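>>>>>> 
>>>>>> A minimal sketch of that randomisation, modelled on the file listing in
>>>>>> dag_processing (the function name is illustrative, not the actual
>>>>>> Airflow one):
>>>>>> 
>>>>>>     import os
>>>>>>     import random
>>>>>> 
>>>>>>     def list_dag_files(dag_folder):
>>>>>>         file_paths = []
>>>>>>         for root, _dirs, files in os.walk(dag_folder):
>>>>>>             for name in files:
>>>>>>                 if name.endswith(".py"):
>>>>>>                     file_paths.append(os.path.join(root, name))
>>>>>>         # decorrelate scan order across schedulers
>>>>>>         random.shuffle(file_paths)
>>>>>>         return file_paths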
>>>>>> 
>>>>>> J.
>>>>>> 
>>>>>> 
>>>>>> On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>>>> 
>>>>>>> I’m thinking of which architecture would be ideal.
>>>>>>> 
>>>>>>> 
>>>>>>> # Option-1:
>>>>>>> The master-slave architecture would be one option. But
>>>>>>> leader-selection will be essential to consider, otherwise we have an
>>>>>>> HA issue again.
>>>>>>> 
>>>>>>> 
>>>>>>> # Option-2:
>>>>>>> Another option we may consider is to simply start multiple scheduler
>>>>>>> instances (just using the current implementation, after modifying &
>>>>>>> validating the scheduler_lock on DagModel).
>>>>>>> 
>>>>>>> - In this case, given we handle everything properly using locking, we
>>>>>>> don’t need to worry too much about double-scheduling/triggering.
>>>>>>> 
>>>>>>> - Another potential concern I had earlier is that different schedulers
>>>>>>> may compete with each other and cause “waste” of scheduler resources.
>>>>>>> After further thinking, I realise this is a typical Birthday Problem.
>>>>>>> Given we have m DAGs and n schedulers, at any moment the probability
>>>>>>> that all schedulers are working on different DAGs is
>>>>>>> m!/((m-n)! * m^n), and the probability that there are schedulers
>>>>>>> competing on the same DAG is 1 - m!/((m-n)! * m^n).
>>>>>>> 
>>>>>>> Let’s say we have 200 DAGs and we start 2 schedulers. At any moment,
>>>>>>> the probability that there are schedulers competing on the same DAG is
>>>>>>> only 0.5%. If we run 2 schedulers against 300 DAGs, this probability is
>>>>>>> only 0.33%.
>>>>>>> (This probability will be higher if m/n is low. But users should not
>>>>>>> start too many schedulers if they don’t have that many DAGs.)
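>>>>>>> 
>>>>>>> A quick script to reproduce these figures (plain Python, not Airflow
>>>>>>> code):
>>>>>>> 
>>>>>>>     from math import prod  # Python 3.8+
>>>>>>> 
>>>>>>>     def p_competing(m, n):
>>>>>>>         # 1 - m!/((m-n)! * m^n): probability that at least two of the
>>>>>>>         # n schedulers are on the same one of m DAGs at a given moment
>>>>>>>         return 1 - prod((m - i) / m for i in range(n))
>>>>>>> 
>>>>>>>     print(p_competing(200, 2))  # 0.005    -> 0.5%
>>>>>>>     print(p_competing(300, 2))  # 0.00333  -> 0.33%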
>>>>>>> 
>>>>>>> Given the probability of schedulers competing is so low, my concern
>>>>>>> about scheduler resource waste is not really valid.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Based on these calculations/assessments, I think we can go for
>>>>>>> option-2, i.e. we don’t make a big change to the current
>>>>>>> implementation. Instead, we ensure the scheduler_lock works well and
>>>>>>> test running multiple schedulers intensively. Then we should be good to
>>>>>>> let users know that it’s safe to run multiple schedulers.
>>>>>>> 
>>>>>>> Please share your thoughts on this and correct me if I’m wrong in any
>>>>>>> point above. Thanks.
>>>>>>> 
>>>>>>> 
>>>>>>> XD
>>>>>>> 
>>>>>>> 
>>>>>>> Reference: https://en.wikipedia.org/wiki/Birthday_problem
>>>>>>> 
>>>>>>> 
>>>>>>>> On 2 Mar 2019, at 3:39 PM, Tao Feng <fengta...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> Does the proposal use a master-slave architecture (leader scheduler
>>>>>>>> vs slave scheduler)?
>>>>>>>> 
>>>>>>>> On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yrql...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Preventing double-triggering by separating the DAG files that
>>>>>>>>> different schedulers parse sounds easier and more intuitive. I
>>>>>>>>> actually removed one piece of the double-triggering prevention logic
>>>>>>>>> here
>>>>>>>>> <https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127>
>>>>>>>>> (expensive) and was relying on this lock
>>>>>>>>> <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233>
>>>>>>>>> to prevent double-firing and safeguard our non-idempotent tasks (btw
>>>>>>>>> the insert can be an insert overwrite to be idempotent).
>>>>>>>>> 
>>>>>>>>> Also, though at Airbnb we requeue tasks a lot, we haven't seen
>>>>>>>>> double-firing recently.
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Kevin Y
>>>>>>>>> 
>>>>>>>>> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <
>>>>>>>>> maximebeauche...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Forgot to mention: the intention was to use the lock, but I never
>>>>>>>>>> personally got to do the second phase, which would consist of
>>>>>>>>>> skipping the DAG if the lock is on, and expiring the lock eventually
>>>>>>>>>> based on a config setting.
>>>>>>>>>> 
>>>>>>>>>> Max
>>>>>>>>>> 
>>>>>>>>>> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <
>>>>>>>>>> maximebeauche...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> My original intention with the lock was preventing
>>>>>>>>>>> "double-triggering" of tasks (triggering refers to the scheduler
>>>>>>>>>>> putting the message in the queue). Airflow now has good
>>>>>>>>>>> "double-firing" prevention for tasks (firing happens when the
>>>>>>>>>>> worker receives the message and starts the task): even if the
>>>>>>>>>>> scheduler were to go rogue or restart and send multiple triggers
>>>>>>>>>>> for a task instance, the worker(s) should only start one task
>>>>>>>>>>> instance. That's done by running the database assertions behind
>>>>>>>>>>> the conditions being met as a read database transaction (no task
>>>>>>>>>>> can alter the rows that validate the assertion while it's getting
>>>>>>>>>>> asserted). In practice it's a little tricky and we've seen rogue
>>>>>>>>>>> double-firing in the past (I have no idea how often that happens).
>>>>>>>>>>> 
>>>>>>>>>>> If we do want to prevent double-triggering, we should make sure
>>>>>>>>>>> that 2 schedulers aren't processing the same DAG or DagRun at the
>>>>>>>>>>> same time. That would mean having the scheduler not start
>>>>>>>>>>> processing locked DAGs, and providing a mechanism to expire the
>>>>>>>>>>> locks after some time.
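>>>>>>>>>>> 
>>>>>>>>>>> A sketch of such lock expiry (the is_locked/locked_at columns and
>>>>>>>>>>> the TTL value are assumptions, not existing Airflow config):
>>>>>>>>>>> 
>>>>>>>>>>>     from datetime import datetime, timedelta
>>>>>>>>>>> 
>>>>>>>>>>>     LOCK_TTL = timedelta(minutes=5)  # hypothetical config setting
>>>>>>>>>>> 
>>>>>>>>>>>     def expire_stale_locks(session, DagModel):
>>>>>>>>>>>         # Release locks held longer than the TTL, e.g. left behind
>>>>>>>>>>>         # by a scheduler that crashed while holding them.
>>>>>>>>>>>         cutoff = datetime.utcnow() - LOCK_TTL
>>>>>>>>>>>         (session.query(DagModel)
>>>>>>>>>>>          .filter(DagModel.is_locked.is_(True),
>>>>>>>>>>>                  DagModel.locked_at < cutoff)
>>>>>>>>>>>          .update({DagModel.is_locked: False},
>>>>>>>>>>>                  synchronize_session=False))
>>>>>>>>>>>         session.commit()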
>>>>>>>>>>> 
>>>>>>>>>>> Has anyone experienced double-firing lately? If that exists we
>>>>>>>>>>> should fix it, but also be careful around multiple schedulers
>>>>>>>>>>> double-triggering, as that would make the problem potentially much
>>>>>>>>>>> worse.
>>>>>>>>>>> 
>>>>>>>>>>> Max
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> It’s exactly what my team is doing & what I shared here earlier
>>>>>>>>>>>> last year:
>>>>>>>>>>>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
>>>>>>>>>>>> 
>>>>>>>>>>>> It’s somewhat of a “hacky” solution (and HA is not addressed), and
>>>>>>>>>>>> now I’m thinking about how we can make it more proper & robust.
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> XD
>>>>>>>>>>>> 
>>>>>>>>>>>>> On 2 Mar 2019, at 12:04 AM, Mario Urquizo <mario.urqu...@gmail.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> We have been running multiple schedulers for about 3 months. We
>>>>>>>>>>>>> created multiple services to run airflow schedulers. The only
>>>>>>>>>>>>> difference is that we have each of the schedulers pointed to a
>>>>>>>>>>>>> directory one level deeper than the DAG home directory that the
>>>>>>>>>>>>> workers and webapp use. We have seen much better scheduling
>>>>>>>>>>>>> performance, but this does not yet help with HA.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> DAGS_HOME:
>>>>>>>>>>>>> {airflow_home}/dags  (webapp & workers)
>>>>>>>>>>>>> {airflow_home}/dags/group-a/ (scheduler1)
>>>>>>>>>>>>> {airflow_home}/dags/group-b/ (scheduler2)
>>>>>>>>>>>>> {airflow_home}/dags/group-etc/ (scheduler3)
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Not sure if this helps, just sharing in case it does.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thank you,
>>>>>>>>>>>>> Mario
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bdbr...@gmail.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I have done quite some work on making it possible to run
>>>>>>>>>>>>>> multiple schedulers at the same time. At the moment I don’t
>>>>>>>>>>>>>> think there are actually any real blockers to doing so. We just
>>>>>>>>>>>>>> don’t actively test it.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Database locking is mostly in place (DagRuns and TaskInstances).
>>>>>>>>>>>>>> And I think the worst that can happen is that a task is
>>>>>>>>>>>>>> scheduled twice. The task will detect this most of the time and
>>>>>>>>>>>>>> kill one off if they run concurrently; if they run sequentially,
>>>>>>>>>>>>>> it will run again on some occasions. Everyone has idempotent
>>>>>>>>>>>>>> tasks, right, so no harm done? ;-)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Have you encountered issues? Maybe work those out?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>> Bolke.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Sent from my iPad
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 1 Mar 2019, at 16:25, Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi Max,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Following
>>>>>>>>>>>>>>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E,
>>>>>>>>>>>>>>> I’m trying to prepare an AIP for supporting multiple schedulers
>>>>>>>>>>>>>>> in Airflow (mainly for HA and higher scheduling performance).
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Along the process of code checking, I found that there is one
>>>>>>>>>>>>>>> attribute of DagModel, “scheduler_lock”. It’s not used at all
>>>>>>>>>>>>>>> in the current implementation, but it was introduced a long
>>>>>>>>>>>>>>> time back (2015) to allow multiple schedulers to work together
>>>>>>>>>>>>>>> (https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620).
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Since you were the original author of it, it would be very
>>>>>>>>>>>>>>> helpful if you could kindly share why the multiple-schedulers
>>>>>>>>>>>>>>> implementation was eventually removed, and what
>>>>>>>>>>>>>>> challenges/complexity there were.
>>>>>>>>>>>>>>> (You already shared a few valuable inputs in the earlier
>>>>>>>>>>>>>>> discussion
>>>>>>>>>>>>>>> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E,
>>>>>>>>>>>>>>> mainly relating to hiccups around concurrency, cross-DAG
>>>>>>>>>>>>>>> prioritisation & load on the DB. Other than these, anything
>>>>>>>>>>>>>>> else you would like to advise?)
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I will also dive into the git history further to understand it
>>>>>>>>>>>>>>> better.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> XD
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> 
>>>>>> Jarek Potiuk
>>>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>>>> 
>>>>>> M: +48 660 796 129 <+48660796129>
>>>>>> E: jarek.pot...@polidea.com
>>>>> 
>>>>> 
>>> 
>> 
>> 
