Hi all,

I think that scheduler locking may not be the best way to solve this 
issue. Still, I support taking a good look at the scheduler because it 
has some real scaling issues.

I wrote an alternative proposal to address the scalability of the scheduler:
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-15+Scalable+Scheduler

Any input on this is welcome.

Regards,
Peter


> On 3 Mar 2019, at 03:26, Deng Xiaodong <xd.den...@gmail.com> wrote:
> 
> Thanks Max.
> 
> I have documented all the discussions around this topic & the useful input in 
> AIP-15 (Support Multiple-Schedulers for HA & Better Scheduling Performance):
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651
> 
> More input from folks is welcome.
> 
> Thanks.
> 
> 
> XD
> 
>> On 3 Mar 2019, at 6:18 AM, Maxime Beauchemin <maximebeauche...@gmail.com> 
>> wrote:
>> 
>> Personally I'd vote against the idea of having certain schedulers handle a
>> subset of the DAGs; that's just not HA.
>> 
>> Also if you are in an env where you have a small number of large DAGs, the
>> odds of having wasted work and double-firing get pretty high.
>> 
>> With the lock in place, it's just a matter of the scheduler loop selecting
>> (in a db transaction) the DAG that's not been processed for the longest
>> time and that is not locked. Flipping the lock flag to true should be part
>> of the same db transaction. We probably need a btree index on lock and last
>> processed time.
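
A minimal sketch of the claim step described above, for illustration only. It assumes a hypothetical `dag` table with `is_locked` and `last_processed` columns (not Airflow's actual schema) and uses an in-memory SQLite database as a stand-in; on PostgreSQL or MySQL the same idea would more likely be expressed with SELECT ... FOR UPDATE inside the transaction.

```python
import sqlite3


def make_demo_db():
    """Build an in-memory database with a hypothetical `dag` table."""
    # isolation_level=None puts the connection in autocommit mode so that
    # transactions are opened and closed explicitly below.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute(
        "CREATE TABLE dag ("
        " dag_id TEXT PRIMARY KEY,"
        " is_locked INTEGER NOT NULL DEFAULT 0,"
        " last_processed REAL NOT NULL)"
    )
    # Composite index on (lock flag, last processed time); SQLite indexes
    # are B-trees.
    conn.execute("CREATE INDEX idx_dag_lock_last ON dag (is_locked, last_processed)")
    conn.executemany(
        "INSERT INTO dag (dag_id, last_processed) VALUES (?, ?)",
        [("dag_a", 30.0), ("dag_b", 10.0), ("dag_c", 20.0)],
    )
    return conn


def claim_next_dag(conn):
    """Lock and return the unlocked DAG that has waited longest, or None."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading
    try:
        row = conn.execute(
            "SELECT dag_id FROM dag WHERE is_locked = 0 "
            "ORDER BY last_processed ASC LIMIT 1"
        ).fetchone()
        if row is not None:
            # Flip the lock flag inside the same transaction as the select.
            conn.execute("UPDATE dag SET is_locked = 1 WHERE dag_id = ?", row)
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return row[0] if row else None


def release_dag(conn, dag_id, now):
    """Unlock a DAG and record when it was last processed."""
    conn.execute(
        "UPDATE dag SET is_locked = 0, last_processed = ? WHERE dag_id = ?",
        (now, dag_id),
    )
```

Each scheduler process would call claim_next_dag in its loop, process the returned DAG, then call release_dag; a None result means every DAG is currently claimed by some scheduler.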
>> 
>> This way adding scheduler processes increases the scheduling pace, and
>> provides an HA solution. No leader / master / slave or election process,
>> just equal workers that work together.
>> 
>> Max
>> 
>> On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd.den...@gmail.com> wrote:
>> 
>>> I get your point and agree. And your last suggestion, to randomly sort
>>> the DAGs, is a great idea to address it. Thanks!
>>> 
>>> XD
>>> 
>>>> On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <jarek.pot...@polidea.com> wrote:
>>>> 
>>>> I think that the probability calculation holds only if there is no
>>>> correlation between different schedulers. However, I think there might be
>>>> an accidental correlation if you think about typical deployments.
>>>> 
>>>> Some details why I think accidental correlation is possible and even
>>>> likely. Assume that:
>>>> 
>>>> - we have similar and similarly busy machines running schedulers (likely)
>>>> - time is synchronised between the machines (likely)
>>>> - the machines have the same DAG folders mounted (or copied) and the
>>>> same filesystem is used (this is exactly what multiple schedulers
>>>> deployment is all about)
>>>> - the schedulers start scanning at exactly the same time (crossing 0:00
>>>> second every full five minutes for example)  - this I am not sure but I
>>>> imagine this might be "typical" behaviour.
>>>> - they process the list of DAGs in exactly the same sequence (it looks like
>>>> this is the case in dag_processing
>>>> <https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300>
>>>> and models/__init__
>>>> <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567>:
>>>> we use os.walk, which uses os.listdir, for which the sequence of processing
>>>> depends on the filesystem implementation
>>>> <https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>,
>>>> and then we append files to the list)
>>>> 
>>>> Then it's rather likely that the schedulers will be competing for the
>>>> very same DAGs at the very beginning. Locking will change how quickly they
>>>> process each DAG of course, but if the DAGs are of similar sizes it's also
>>>> likely that the speed of scanning (DAGs/s) is similar for all schedulers.
>>>> The schedulers will then catch up with each other and might pretty much
>>>> continuously compete for the same DAGs almost all the time.
>>>> 
>>>> It can be mitigated super-easily by randomly sorting the DAGs folder list
>>>> after it is prepared (it's file-system dependent now, so we do not rely on
>>>> any particular order). Then the probability numbers will hold perfectly, I
>>>> think :)
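
A small sketch of the mitigation suggested above, assuming the scheduler already holds the collected DAG file paths in a list. The function name and the optional rng parameter are hypothetical and are not part of Airflow.

```python
import random


def shuffled_dag_files(file_paths, rng=None):
    """Return a new list with the DAG file paths in random order.

    The input list is left untouched. Passing a seeded random.Random
    makes the order reproducible, which is handy in tests.
    """
    rng = rng or random.Random()
    shuffled = list(file_paths)
    rng.shuffle(shuffled)  # in-place shuffle of the copy
    return shuffled
```

Each scheduler would shuffle independently, so two schedulers starting at the same second on identical filesystems no longer walk the DAG files in lockstep.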
>>>> 
>>>> J.
>>>> 
>>>> 
>>>> On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>> 
>>>>> I’m thinking of which architecture would be ideal.
>>>>> 
>>>>> 
>>>>> # Option-1:
>>>>> The master-slave architecture would be one option. But leader election
>>>>> will be essential to consider; otherwise we have an issue in terms of HA
>>>>> again.
>>>>> 
>>>>> 
>>>>> # Option-2:
>>>>> Another option we may consider is to simply start multiple scheduler
>>>>> instances (just using the current implementation, after modifying &
>>>>> validating the scheduler_lock on DagModel).
>>>>> 
>>>>> - In this case, given we handle everything properly using locking, we
>>>>> don’t need to worry too much about double-scheduling/triggering.
>>>>> 
>>>>> - Another potential concern I had earlier is that different schedulers may
>>>>> compete with each other and cause “waste” of scheduler resources.
>>>>> After further thinking, I realise this is a typical Birthday Problem.
>>>>> Given we have m DAGs and n schedulers, at any moment, the probability
>>>>> that all schedulers are working on different DAGs is m!/((m-n)! * (m^n)),
>>>>> and the probability that there are schedulers competing on the same DAG
>>>>> will be 1-m!/((m-n)! * (m^n)).
>>>>> 
>>>>> Let’s say we have 200 DAGs and we start 2 schedulers. At any moment, the
>>>>> probability that there are schedulers competing on the same DAG is only
>>>>> 0.5%. If we run 2 schedulers against 300 DAGs, this probability is only
>>>>> 0.33%.
>>>>> (This probability will be higher if m/n is low. But users should not start
>>>>> too many schedulers if they don’t have that many DAGs).
>>>>> 
>>>>> Given the probability of schedulers competing is so low, my concern on
>>>>> scheduler resource waste is not really valid.
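
The figures above can be checked with a few lines of Python. This is only a sketch of the birthday-problem formula quoted in the message, under the same assumption that each scheduler picks a DAG uniformly and independently; the function name is made up for illustration.

```python
def collision_probability(m, n):
    """Probability that at least two of n schedulers work on the same DAG.

    Computes 1 - m! / ((m - n)! * m**n) as a running product, which avoids
    building huge factorials. Assumes uniform, independent picks among m DAGs.
    """
    if n > m:
        return 1.0  # more schedulers than DAGs: a clash is certain
    p_all_distinct = 1.0
    for i in range(n):
        p_all_distinct *= (m - i) / m
    return 1.0 - p_all_distinct


print(f"{collision_probability(200, 2):.2%}")  # prints 0.50%
print(f"{collision_probability(300, 2):.2%}")  # prints 0.33%
```

For n = 2 the formula reduces to 1/m, which is where 0.5% (m = 200) and 0.33% (m = 300) come from.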
>>>>> 
>>>>> 
>>>>> 
>>>>> Based on these calculations/assessment, I think we can go for option-2,
>>>>> i.e. we don’t make big changes to the current implementation. Instead, we
>>>>> ensure the scheduler_lock is working well and test intensively on running
>>>>> multiple schedulers. Then we should be good to let users know that it’s
>>>>> safe to run multiple schedulers.
>>>>> 
>>>>> Please share your thoughts on this and correct me if I’m wrong in any
>>>>> point above. Thanks.
>>>>> 
>>>>> 
>>>>> XD
>>>>> 
>>>>> 
>>>>> Reference: https://en.wikipedia.org/wiki/Birthday_problem
>>>>> 
>>>>> 
>>>>>> On 2 Mar 2019, at 3:39 PM, Tao Feng <fengta...@gmail.com> wrote:
>>>>>> 
>>>>>> Does the proposal use a master-slave architecture (leader scheduler vs
>>>>>> slave scheduler)?
>>>>>> 
>>>>>> On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yrql...@gmail.com> wrote:
>>>>>> 
>>>>>>> Preventing double-triggering by separating the DAG files that different
>>>>>>> schedulers parse sounds easier and more intuitive. I actually removed one
>>>>>>> piece of the double-triggering prevention logic here
>>>>>>> <https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127>
>>>>>>> (expensive) and was relying on this lock
>>>>>>> <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233>
>>>>>>> to prevent double-firing and safeguard our non-idempotent tasks (btw the
>>>>>>> insert can be an insert overwrite to be idempotent).
>>>>>>> 
>>>>>>> Also, though at Airbnb we requeue tasks a lot, we haven't seen
>>>>>>> double-firing recently.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Kevin Y
>>>>>>> 
>>>>>>> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <
>>>>>>> maximebeauche...@gmail.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Forgot to mention: the intention was to use the lock, but I never
>>>>>>>> personally got to do the second phase, which would consist of skipping
>>>>>>>> the DAG if the lock is on, and expiring the lock eventually based on a
>>>>>>>> config setting.
>>>>>>>> 
>>>>>>>> Max
>>>>>>>> 
>>>>>>>> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <
>>>>>>>> maximebeauche...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> My original intention with the lock was preventing "double-triggering"
>>>>>>>>> of tasks (triggering refers to the scheduler putting the message in the
>>>>>>>>> queue). Airflow now has good "double-firing-prevention" of tasks (firing
>>>>>>>>> happens when the worker receives the message and starts the task): even
>>>>>>>>> if the scheduler were to go rogue or restart and send multiple triggers
>>>>>>>>> for a task instance, the worker(s) should only start one task instance.
>>>>>>>>> That's done by running the database assertions behind the conditions
>>>>>>>>> being met as a read database transaction (no task can alter the rows
>>>>>>>>> that validate the assertion while it's getting asserted). In practice
>>>>>>>>> it's a little tricky and we've seen rogue double-firing in the past (I
>>>>>>>>> have no idea how often that happens).
>>>>>>>>> 
>>>>>>>>> If we do want to prevent double-triggering, we should make sure that 2
>>>>>>>>> schedulers aren't processing the same DAG or DagRun at the same time.
>>>>>>>>> That would mean the scheduler not starting to process locked DAGs, and
>>>>>>>>> providing a mechanism to expire the locks after some time.
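
One way to read the "skip locked DAGs, expire locks after some time" idea is sketched below. It assumes the lock is stored as a timestamp of when it was taken rather than as a plain boolean, and that the timeout comes from a config setting; both are assumptions for illustration and not how scheduler_lock is defined today.

```python
from datetime import datetime, timedelta


def should_skip_dag(locked_at, now, lock_timeout):
    """Return True when a DAG is locked and the lock has not yet expired.

    locked_at is None for an unlocked DAG. A lock older than lock_timeout
    is treated as stale (for example, its scheduler died) and is ignored.
    """
    if locked_at is None:
        return False
    return now - locked_at < lock_timeout


# Example: a lock taken 10 minutes ago with a 5 minute timeout is stale.
now = datetime(2019, 3, 1, 14, 0, 0)
stale = should_skip_dag(now - timedelta(minutes=10), now, timedelta(minutes=5))
fresh = should_skip_dag(now - timedelta(minutes=1), now, timedelta(minutes=5))
```

With expiry in place, a crashed scheduler cannot block a DAG forever; the trade-off is that a timeout set too short lets a second scheduler pick up a DAG that is still being processed.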
>>>>>>>>> 
>>>>>>>>> Has anyone experienced double firing lately? If that exists we should
>>>>>>>>> fix it, but also be careful around multiple-scheduler double-triggering,
>>>>>>>>> as it would make that problem potentially much worse.
>>>>>>>>> 
>>>>>>>>> Max
>>>>>>>>> 
>>>>>>>>> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>>> It’s exactly what my team is doing & what I shared here last year
>>>>>>>>>> (https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E).
>>>>>>>>>> 
>>>>>>>>>> It’s a somewhat “hacky” solution (and HA is not addressed), and now I’m
>>>>>>>>>> thinking about how we can make it more proper & robust.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> XD
>>>>>>>>>> 
>>>>>>>>>>> On 2 Mar 2019, at 12:04 AM, Mario Urquizo <mario.urqu...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> We have been running multiple schedulers for about 3 months. We created
>>>>>>>>>>> multiple services to run airflow schedulers. The only difference is
>>>>>>>>>>> that we have each of the schedulers pointed to a directory one level
>>>>>>>>>>> deeper than the DAG home directory that the workers and webapp use. We
>>>>>>>>>>> have seen much better scheduling performance but this does not yet help
>>>>>>>>>>> with HA.
>>>>>>>>>>> 
>>>>>>>>>>> DAGS_HOME:
>>>>>>>>>>> {airflow_home}/dags  (webapp & workers)
>>>>>>>>>>> {airflow_home}/dags/group-a/ (scheduler1)
>>>>>>>>>>> {airflow_home}/dags/group-b/ (scheduler2)
>>>>>>>>>>> {airflow_home}/dags/group-etc/ (scheduler3)
>>>>>>>>>>> 
>>>>>>>>>>> Not sure if this helps, just sharing in case it does.
>>>>>>>>>>> 
>>>>>>>>>>> Thank you,
>>>>>>>>>>> Mario
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bdbr...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> I have done quite some work on making it possible to run multiple
>>>>>>>>>>>> schedulers at the same time. At the moment I don’t think there are
>>>>>>>>>>>> real blockers to actually doing so. We just don’t actively test it.
>>>>>>>>>>>> 
>>>>>>>>>>>> Database locking is mostly in place (DagRuns and TaskInstances). And I
>>>>>>>>>>>> think the worst that can happen is that a task is scheduled twice. The
>>>>>>>>>>>> task will detect this most of the time and kill one off if concurrent;
>>>>>>>>>>>> if not concurrent but sequential, then it will run again on some
>>>>>>>>>>>> occasions. Everyone has idempotent tasks, right, so no harm done? ;-)
>>>>>>>>>>>> 
>>>>>>>>>>>> Have you encountered issues? Maybe work those out?
>>>>>>>>>>>> 
>>>>>>>>>>>> Cheers
>>>>>>>>>>>> Bolke.
>>>>>>>>>>>> 
>>>>>>>>>>>> Sent from my iPad
>>>>>>>>>>>> 
>>>>>>>>>>>>> On 1 Mar 2019, at 16:25, Deng Xiaodong <xd.den...@gmail.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Max,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Following
>>>>>>>>>>>>> <https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E>,
>>>>>>>>>>>>> I’m trying to prepare an AIP for supporting multiple schedulers in
>>>>>>>>>>>>> Airflow (mainly for HA and higher scheduling performance).
>>>>>>>>>>>>> 
>>>>>>>>>>>>> While checking the code, I found that there is one attribute of
>>>>>>>>>>>>> DagModel, “scheduler_lock”. It’s not used at all in the current
>>>>>>>>>>>>> implementation, but it was introduced a long time back (2015) to allow
>>>>>>>>>>>>> multiple schedulers to work together
>>>>>>>>>>>>> (https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620).
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Since you were the original author of it, it would be very helpful if
>>>>>>>>>>>>> you could kindly share why the multiple-schedulers implementation was
>>>>>>>>>>>>> eventually removed, and what challenges/complexity there were.
>>>>>>>>>>>>> (You already shared a few valuable inputs in the earlier discussion
>>>>>>>>>>>>> <https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E>,
>>>>>>>>>>>>> mainly relating to hiccups around concurrency, cross-DAG prioritisation
>>>>>>>>>>>>> & load on DB. Other than these, is there anything else you would like
>>>>>>>>>>>>> to advise?)
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I will also dive into the git history further to understand it
>>>>>>>>>>>>> better.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> XD
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>>> --
>>>> 
>>>> Jarek Potiuk
>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>> 
>>>> M: +48 660 796 129 <+48660796129>
>>>> E: jarek.pot...@polidea.com
>>> 
>>> 
> 
