Does anything change about your proposal if you do t assume that workers have 
“quick access” to the DAG files - i.e. what if we are on kube executors and the 
task spin up time plus git sync time is 30-60s?

(Perhaps this is an extreme case, but we are talking about extreme cases)

> On 18 Mar 2019, at 07:58, Bas Harenslak <basharens...@godatadriven.com> wrote:
> 
> Peter,
> 
> The numbers you mention seem to come out of the blue. I think you’re 
> oversimplifying it and cannot simply state 180/36 = 5 minutes. Throwing in 
> numbers without explanation creates confusion.
> 
> I have some questions when reading your AIP. I have to make lots of 
> assumptions and think explaining it more in depth would clarify a lot.
> 
> 
>  *   Should a DagScheduler and task run in the same slots? Should there be 
> any difference between running the two?
>  *   How does this work together with e.g. Celery? With the current Celery 
> setup you push tasks to the queue, which can be run on any worker. With your 
> setup it seems you push a DagScheduler and/or task. Does the DagScheduler 
> push tasks itself to the queue so that any task can run anywhere? And is it 
> correct to assume the DagScheduler polls the DB to check if a task is 
> finished?
>  *   “If a cycle is done the MainScheduler should schedule a new 
> DagScheduler" -> I assume the dagscheduler would set state of a dagrun to 
> finished? And the mainscheduler simply checks when the next interval is 
> finished and to start a new DagScheduler.
>  *   The naming is confusing to me. How about naming the DagScheduler 
> “DagRunManager”, because that’s what I believe it does?
>  *   I’m not convinced this is the way to go. Currently the scheduler process 
> does a lot more than just scheduling. I.e. also parsing of the DAGs, which I 
> believe can be optimised a lot. I think splitting up the responsibilities of 
> the scheduler would be more beneficial, instead of adding complexity by 
> creating more “schedulers”. Would you agree?
> 
> On a final note, to this whole thread: I’m very wary of doing HA/distributed 
> work ourselves. It adds a lot of complexity, locking is a tricky subject 
> (although well thought out by many others, so not impossible). Before going 
> there, I would suggest to put effort into optimising the (single) scheduler 
> first, e.g. by splitting responsibilities between DAG parsing and actual 
> scheduling.
> 
> Cheers,
> Bas
> 
> On 18 Mar 2019, at 07:18, Peter van t Hof 
> <pjrvant...@gmail.com<mailto:pjrvant...@gmail.com>> wrote:
> 
> Hi,
> 
> My proposal is focusing mainly on scalability and indeed not so much on HA. 
> This mainly because that is also the main issue from the original author. 
> Have a form of HA on this MainScheduler would still be nice to have.
> 
> The problem with is that have a fixed number of scheduler does not scale on 
> the load. On my current client they try to execute 5000+ DAG’s at the same 
> time. A single scheduler cycle to touch all DAG’s takes 2-3 hour. So to do 
> this within 5 min 36 of those schedulers with locking should be there at all 
> time. After 2 hours 2 schedulers would be enough, this means in this 
> situation 34 scheduler processes are wasted and only producing overhead.
> 
> This DagScheduler is a short living task, so this is not a persistent worker 
> process. The MainScheduler should resubmit when it is required.
> 
> Gr,
> Peter
> 
> 
> On 18 Mar 2019, at 05:32, Maxime Beauchemin 
> <maximebeauche...@gmail.com<mailto:maximebeauche...@gmail.com>> wrote:
> 
> The proposal reads "Looking at the original AIP-15 the author proposes to
> use locking to enable the use of multiple schedulers, this might introduce
> unnecessary complexity"
> 
> To me introducing multiple roles (master scheduler + scheduler minions),
> may be actually more complex than just having "shared nothing" schedulers
> with locking. The former is also less scalable (whatever workload is done
> on that master [say serialization] can hit scale issues) and is less HA (as
> it relies on the orchestrator [k8s] for HA).
> 
> My personal incline has always been going towards renaming the scheduler to
> "supervisor" (as it already does significantly more than just triggering
> tasks) and allowing many instances of that role, and using locks where
> necessary. That way there are just 2 roles in the cluster: supervisor and
> worker processes. Depending on the executor (say for k8s) you don't even
> need actual persistent worker processes.
> 
> Max
> 
> On Sun, Mar 17, 2019 at 1:52 AM Peter van t Hof 
> <pjrvant...@gmail.com<mailto:pjrvant...@gmail.com>>
> wrote:
> 
> Hi all,
> 
> I think that scheduling locking is maybe not the best way in solving this
> issue. Still I’m in support of taking a good look at the scheduler because
> it has some real scaling issues.
> 
> I did wrote an alternative proposal to solve the scalability of the
> scheduler:
> 
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-15+Scalable+Scheduler
> <
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-15+Scalable+Scheduler
> 
> 
> Any input on this is welcome.
> 
> Gr,
> Peter
> 
> 
> On 3 Mar 2019, at 03:26, Deng Xiaodong <xd.den...@gmail.com> wrote:
> 
> Thanks Max.
> 
> I have documented all the discussions around this topic & useful inputs
> into AIP-15 (Support Multiple-Schedulers for HA & Better Scheduling
> Performance)
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651
> <
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651>.
> 
> 
> More inputs from folks are welcomed.
> 
> Thanks.
> 
> 
> XD
> 
> On 3 Mar 2019, at 6:18 AM, Maxime Beauchemin <
> maximebeauche...@gmail.com> wrote:
> 
> Personally I'd vote against the idea of having certain scheduler
> handling a
> subset of the DAGs, that's just not HA.
> 
> Also if you are in an env where you have a small number of large DAGs,
> the
> odds of having wasted work and double-firing get pretty high.
> 
> With the lock in place, it's just a matter of the scheduler loop to
> select
> (in a db transaction) the dag that's not been processed for the longest
> time that is not locked. Flipping the lock flag to true should be part
> of
> the db transaction. We probably need a btree index on lock and last
> processed time.
> 
> This way adding scheduler processes increases the scheduling pace, and
> provides an HA solution. No leader / master / slave or election process,
> just equal workers that work together.
> 
> Max
> 
> On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd.den...@gmail.com>
> wrote:
> 
> Get your point and agree. And the suggestion you gave lastly to random
> sort DAGs is a great idea to address it. Thanks!
> 
> XD
> 
> On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <jarek.pot...@polidea.com>
> wrote:
> 
> I think that the probability calculation holds only if there is no
> correlation between different schedulers. I think however there might
> be
> an
> accidental correlation if you think about typical deployments.
> 
> Some details why I think accidental correlation is possible and even
> likely. Assume that:
> 
> - we have similar and similarly busy machines running schedulers
> (likely)
> - time is synchronised between the machines (likely)
> - the machines have the same DAG folders mounted (or copied) and the
> same filesystem is used (this is exactly what multiple schedulers
> deployment is all about)
> - the schedulers start scanning at exactly the same time (crossing
> 0:00
> second every full five minutes for example)  - this I am not sure but
> I
> imagine this might be "typical" behaviour.
> - they process list of DAGs in exactly the same sequence (it looks
> like
> this is the case dag_processing
> <
> 
> https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300
> 
> and models/__init__
> <
> 
> https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567
> :
> we use os.walk which uses os.listdir for which sequence of processing
> depends on the filesystem implementation
> <
> 
> https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>
> and
> then we append files to the list)
> 
> Then it's rather likely that the schedulers will be competing about
> the
> very same DAGs at the very beginning. Locking will change how quickly
> they
> process each DAG of course, but If the DAGs are of similar sizes it's
> also
> likely that the speed of scanning (DAGS/s) is similar for all
> schedulers.
> The schedulers will then catch-up with each other and might pretty
> much
> continuously compete for the same DAGs almost all the time.
> 
> It can be mitigated super-easily by random sorting of the DAGs folder
> list
> after it is prepared (it's file-system dependent now so we do not
> rely on
> particular order) . Then the probability numbers will hold perfectly I
> think :)
> 
> J.
> 
> 
> On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd.den...@gmail.com>
> wrote:
> 
> I’m thinking of which architecture would be ideal.
> 
> 
> # Option-1:
> The master-slave architecture would be one option. But
> leader-selection
> will be very essential to consider, otherwise we have issue in terms
> of
> HA
> again.
> 
> 
> # Option-2:
> Another option we may consider is to simply start multiple scheduler
> instances (just using the current implementation, after modify &
> validate
> the scheduler_lock on DagModel).
> 
> - In this case, given we handle everything properly using locking, we
> don’t need to worry too much about double-scheduling/triggering.
> 
> - Another potential concern I had earlier is that different
> schedulers
> may
> compete with each other and cause “waste” of scheduler resource.
> After further thinking, I realise this is a typical Birthday Problem.
> Given we have m DAGs, and n schedulers, at any moment, the
> probability
> that all schedulers are working on different DAGs is m!/((m-n)! *
> (m^n)),
> and the probability that there are schedulers competing on the same
> DAG
> will be 1-m!/((m-n)! * (m^n)).
> 
> Let’s say we have 200 DAGs and we start 2 schedulers. At any moment,
> the
> probability that there is schedulers competing on the same DAG is
> only
> 0.5%. If we run 2 schedulers against 300 DAGs, this probability is
> only
> 0.33%.
> (This probability will be higher if m/n is low. But users should not
> start
> too many schedulers if they don’t have that many DAGs).
> 
> Given the probability of schedulers competing is so low, my concern
> on
> scheduler resource waste is not really valid.
> 
> 
> 
> Based on these calculations/assessment, I think we can go for
> option-2,
> i.e. we don’t make big change in the current implementation.
> Instead, we
> ensure the scheduler_lock is working well and test intensively on
> running
> multiple schedulers. Then we should be good to let users know that
> it’s
> safe to run multiple schedulers.
> 
> Please share your thoughts on this and correct me if I’m wrong in any
> point above. Thanks.
> 
> 
> XD
> 
> 
> Reference: https://en.wikipedia.org/wiki/Birthday_problem <
> https://en.wikipedia.org/wiki/Birthday_problem>
> 
> 
> On 2 Mar 2019, at 3:39 PM, Tao Feng <fengta...@gmail.com> wrote:
> 
> Does the proposal use master-slave architecture(leader scheduler vs
> slave
> scheduler)?
> 
> On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yrql...@gmail.com>
> wrote:
> 
> Preventing double-triggering by separating DAG files different
> schedulers
> parse sounds easier and more intuitive. I actually removed one of
> the
> double-triggering prevention logic here
> <
> 
> 
> 
> https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127
> (expensive)
> and
> was relying on this lock
> <
> 
> 
> 
> https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233
> 
> to
> prevent double-firing and safe-guard our non-idempotent tasks( btw
> the
> insert can be insert overwrite to be idempotent).
> 
> Also tho in Airbnb we requeue tasks a lot, we haven't see
> double-firing
> recently.
> 
> Cheers,
> Kevin Y
> 
> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <
> maximebeauche...@gmail.com>
> wrote:
> 
> Forgot to mention: the intention was to use the lock, but I never
> personally got to do the second phase which would consist of
> skipping
> the
> DAG if the lock is on, and expire the lock eventually based on a
> config
> setting.
> 
> Max
> 
> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <
> maximebeauche...@gmail.com>
> wrote:
> 
> My original intention with the lock was preventing
> "double-triggering"
> of
> task (triggering refers to the scheduler putting the message in
> the
> queue).
> Airflow now has good "double-firing-prevention" of tasks (firing
> happens
> when the worker receives the message and starts the task), even
> if
> the
> scheduler was to go rogue or restart and send multiple triggers
> for
> a
> task
> instance, the worker(s) should only start one task instance.
> That's
> done
> by
> running the database assertions behind the conditions being met
> as
> read
> database transaction (no task can alter the rows that validate
> the
> assertion while it's getting asserted). In practice it's a little
> tricky
> and we've seen rogue double-firing in the past (I have no idea
> how
> often
> that happens).
> 
> If we do want to prevent double-triggerring, we should make sure
> that
> 2
> schedulers aren't processing the same DAG or DagRun at the same
> time.
> That
> would mean for the scheduler to not start the process of locked
> DAGs,
> and
> by providing a mechanism to expire the locks after some time.
> 
> Has anyone experienced double firing lately? If that exist we
> should
> fix
> it, but also be careful around multiple scheduler
> double-triggering
> as
> it
> would make that problem potentially much worse.
> 
> Max
> 
> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <
> xd.den...@gmail.com>
> wrote:
> 
> It’s exactly what my team is doing & what I shared here earlier
> last
> year
> (
> 
> 
> 
> 
> 
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> <
> 
> 
> 
> 
> 
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> 
> )
> 
> It’s somehow a “hacky” solution (and HA is not addressed), and
> now
> I’m
> thinking how we can have it more proper & robust.
> 
> 
> XD
> 
> On 2 Mar 2019, at 12:04 AM, Mario Urquizo <
> mario.urqu...@gmail.com>
> wrote:
> 
> We have been running multiple schedulers for about 3 months.
> We
> created
> multiple services to run airflow schedulers.  The only
> difference
> is
> that
> we have each of the schedulers pointed to a directory one level
> deeper
> than
> the DAG home directory that the workers and webapp use. We have
> seen
> much
> better scheduling performance but this does not yet help with
> HA.
> 
> DAGS_HOME:
> {airflow_home}/dags  (webapp & workers)
> {airflow_home}/dags/group-a/ (scheduler1)
> {airflow_home}/dags/group-b/ (scheduler2)
> {airflow_home}/dags/group-etc/ (scheduler3)
> 
> Not sure if this helps, just sharing in case it does.
> 
> Thank you,
> Mario
> 
> 
> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <
> bdbr...@gmail.com>
> wrote:
> 
> I have done quite some work on making it possible to run
> multiple
> schedulers at the same time.  At the moment I don’t think
> there
> are
> real
> blockers actually to do so. We just don’t actively test it.
> 
> Database locking is mostly in place (DagRuns and
> TaskInstances).
> And
> I
> think the worst that can happen is that a task is scheduled
> twice.
> The
> task
> will detect this most of the time and kill one off if
> concurrent
> if
> not
> sequential then I will run again in some occasions. Everyone
> is
> having
> idempotent tasks right so no harm done? ;-)
> 
> Have you encountered issues? Maybe work those out?
> 
> Cheers
> Bolke.
> 
> Verstuurd vanaf mijn iPad
> 
> Op 1 mrt. 2019 om 16:25 heeft Deng Xiaodong <
> xd.den...@gmail.com>
> het
> volgende geschreven:
> 
> Hi Max,
> 
> Following
> 
> 
> 
> 
> 
> 
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> <
> 
> 
> 
> 
> 
> 
> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E
> ,
> I’m trying to prepare an AIP for supporting
> multiple-scheduler in
> Airflow
> (mainly for HA and Higher scheduling performance).
> 
> Along the process of code checking, I found that there is one
> attribute
> of DagModel, “scheduler_lock”. It’s not used at all in current
> implementation, but it was introduced long time back (2015) to
> allow
> multiple schedulers to work together (
> 
> 
> 
> 
> 
> 
> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
> <
> 
> 
> 
> 
> 
> 
> https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620
> 
> ).
> 
> Since you were the original author of it, it would be very
> helpful
> if
> you can kindly share why the multiple-schedulers
> implementation
> was
> removed
> eventually, and what challenges/complexity there were.
> (You already shared a few valuable inputs in the earlier
> discussion
> 
> 
> 
> 
> 
> 
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
> <
> 
> 
> 
> 
> 
> 
> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E
> 
> , mainly relating to hiccups around concurrency, cross DAG
> prioritisation &
> load on DB. Other than these, anything else you would like to
> advise?)
> 
> I will also dive into the git history further to understand
> it
> better.
> 
> Thanks.
> 
> 
> XD
> 
> 
> 
> 
> 
> 
> 
> 
> --
> 
> Jarek Potiuk
> Polidea <https://www.polidea.com/> | Principal Software Engineer
> 
> M: +48 660 796 129 <+48660796129>
> E: jarek.pot...@polidea.com
> 
> 
> 
> 
> 
> 
> 

Reply via email to