I'll preface this with the fact that I'm relatively new to Airflow, and
haven't played around with a lot of the internals.

I find the idea of a DagFetcher interesting, but would we worry about
slowing down the scheduler significantly? If the scheduler has to
"fetch" multiple different DAG versions, be they git refs or artifacts from
Artifactory, we are talking about adding significant time to each scheduler
run. Also, how would the scheduler know which DAGs to fetch, and from where,
if there aren't local files on disk listing those DAGs? Maybe I'm missing
something in the implementation.

It seems to me that the fetching of the different versions should be
delegated to the Task (or TaskInstance) itself. That ensures we only spend
the time to "fetch" the version that is needed, when it is needed. One
downside might be that each TaskInstance running for the same version of the
DAG might end up doing the "fetch" independently (duplicating that work).

I think this could be done by adding some version attribute to the DagRun
that gets set at creation, and having the scheduler pass that version to the
TaskInstances when they are created. You could even extend this so that you
could have an arbitrary set of "executor_parameters" that get set on a
DagRun and are passed to TaskInstances. Then the specific Executor class
that is running that TaskInstance could handle the "executor_parameters" as
it sees fit.
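
To make that a bit more concrete, here is a minimal sketch of what I'm
picturing. Everything here is hypothetical (the "version" and
"executor_parameters" fields and the fetch hook don't exist in Airflow
today); only dag.get_task() and task.run() are existing APIs.

    def create_dag_run(dag, execution_date, current_version, executor_parameters=None):
        # Scheduler side: stamp the active version (e.g. a git sha) and any
        # arbitrary executor parameters on the DagRun at creation time.
        return {
            "dag_id": dag.dag_id,
            "execution_date": execution_date,
            "version": current_version,
            "executor_parameters": executor_parameters or {},
        }

    def make_task_instance(dag_run, task_id):
        # Scheduler side: each TaskInstance inherits the DagRun's stamped values.
        return {
            "dag_id": dag_run["dag_id"],
            "task_id": task_id,
            "execution_date": dag_run["execution_date"],
            "version": dag_run["version"],
            "executor_parameters": dag_run["executor_parameters"],
        }

    def run_task_instance(ti, fetch_dag_version):
        # Worker side: fetch exactly the DAG version this TaskInstance was
        # created for, then execute the task from that snapshot. The
        # duplicated-fetch downside I mentioned lives here: every
        # TaskInstance performs its own fetch.
        dag = fetch_dag_version(ti["dag_id"], ti["version"])
        task = dag.get_task(ti["task_id"])
        task.run(start_date=ti["execution_date"], end_date=ti["execution_date"])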

One thing I'm not clear on is how and when TaskInstances are created. When
the scheduler first sees a specific DagRun, do all the TaskInstances get
created immediately, but only some of them get queued? Or does the
scheduler only create those TaskInstances which can be queued right now?

In particular, if a DagRun gets created and, while it is running, the DAG is
updated and a new Task is added, will the scheduler pick up that new Task
for the running DagRun? If the answer is yes, then my suggestion above
would run the risk of scheduling a Task for a DAG version where that Task
didn't exist. I'm sure you could handle that somewhat gracefully but it's a
bit ugly.

Chris

On Wed, Feb 28, 2018 at 2:05 AM, Maxime Beauchemin <
maximebeauche...@gmail.com> wrote:

> At a higher level I want to say a few things about the idea of enforcing
> version consistency within a DagRun.
>
> One thing we've been talking about is the need for a "DagFetcher"
> abstraction, where its first implementation, which would replace and mimic
> the current behavior, would be "FileSystemDagFetcher". One specific DagFetcher
> implementation may or may not support version semantics, but if it does, it
> should be able to receive a version id and return the proper version of the
> DAG object. For instance that first "FileSystemDagFetcher" would not
> support version semantics, but perhaps a "GitRepoDagFetcher" would, or an
> "ArtifactoryDagFetcher", or a "TarballInS3DagFetcher" may as well.
>
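> To sketch what such an abstraction could look like (none of these classes
> exist in Airflow today; only DagBag and get_dag are existing APIs):
>
>     from abc import ABC, abstractmethod
>
>     from airflow.models import DagBag
>
>     class DagFetcher(ABC):
>         # Hypothetical interface; a fetcher may or may not honor `version`.
>         @abstractmethod
>         def fetch(self, dag_id, version=None):
>             """Return the DAG object for dag_id, at `version` if supported."""
>
>     class FileSystemDagFetcher(DagFetcher):
>         # Mimics today's behavior: ignores `version`, returns whatever is on disk.
>         def __init__(self, dag_folder):
>             self.dag_folder = dag_folder
>
>         def fetch(self, dag_id, version=None):
>             return DagBag(dag_folder=self.dag_folder).get_dag(dag_id)
>
>     class GitRepoDagFetcher(DagFetcher):
>         # Hypothetical: `version` is a git ref, checked out locally before the
>         # normal DAG load happens against that snapshot.
>         def __init__(self, repo_url):
>             self.repo_url = repo_url
>
>         def fetch(self, dag_id, version=None):
>             path = self._checkout(version or "HEAD")
>             return DagBag(dag_folder=path).get_dag(dag_id)
>
>         def _checkout(self, ref):
>             raise NotImplementedError("clone/checkout left out of this sketch")
>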
> Of course that assumes that the scheduler knows and stores the active
> version number when generating a new DagRun, and that this information is
> leveraged on subsequent scheduler cycles and on workers when tasks are
> executed.
>
> This could also enable things like "remote" backfills (non local,
> parallelized) of a DAG definition that's on an arbitrary git ref (assuming
> a "GitRepoDagFetcher").
>
> There are [perhaps] unintuitive implications where clearing a single task
> would then re-run the old DAG definition on that task (since the version
> was stamped in the DagRun and hasn't changed), but deleting/recreating a
> DagRun would run the latest version (or any other version that may be
> specified for that matter).
>
> I'm unclear on how much work that represents exactly, but it's certainly
> doable and may only require to change part of the DagBag class and a few
> other places.
>
> Max
>
> On Tue, Feb 27, 2018 at 6:48 PM, David Capwell <dcapw...@gmail.com> wrote:
>
> > Thanks for your feedback!
> >
> > Option 1 is a non-starter for us. The reason is we have DAGs that take 9+
> > hours to run.
> >
> > Option 2 is more where my mind was going, but it's rather large. How I
> > see it, you need an MVCC DagBag that's aware of multiple versions (what
> > provides the version?). Assuming you can track which versions active dag
> > runs point to, you know how to clean up (fine with that being external).
> > The pro here is you have snapshot isolation for the dag_run; the con is
> > more bookkeeping and requiring deploys to work with this (that last part
> > may be a good thing though).
> >
> > The only other option I can think of is to lock deploys so the system only
> > picks up new versions when no dag_run holds the lock. This is flawed for
> > many reasons, but breaks horribly for dag_runs that take minutes (which I
> > assume 99% do).
> >
> >
> >
> > On Tue, Feb 27, 2018, 4:50 PM Joy Gao <j...@wepay.com> wrote:
> >
> > > Hi David!
> > >
> > > Thank you for clarifying, I think I understand your concern now. We
> > > currently also work around this by making sure a dag is turned off
> > > when we deploy a new version. We also make sure our jobs are
> > > idempotent and retry-enabled in case we forget to turn off
> > > the job, so the issue hasn't caused us too much headache.
> > >
> > > I do agree that it would be nice for Airflow to have the option to
> > > guarantee a single version of a dag per dag run. I see two approaches:
> > >
> > > (1) If a dag is updated, the current dagrun fails and/or retries.
> > > (2) If a dag is updated, the current dagrun continues but uses the
> > > version from before the update.
> > >
> > > (1) requires some mechanism to compare dag generations. One option is
> > > to hash the dag file, store that value in the dagrun table, and
> > > compare against it each time a task runs. If the hash value is
> > > different, update the hash value, then fail/retry the dag. I think
> > > this is a fairly safe approach.
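> > >
> > > A minimal sketch of (1), assuming a hypothetical "dag_hash" column on
> > > the dagrun table (dag.fileloc is a real DAG property; the column is not):
> > >
> > >     import hashlib
> > >
> > >     def dag_file_hash(fileloc):
> > >         # Any edit to the dag file changes this value.
> > >         with open(fileloc, "rb") as f:
> > >             return hashlib.sha256(f.read()).hexdigest()
> > >
> > >     def dag_unchanged(dag_run, dag):
> > >         # Compare against the hash stamped on the dagrun; if it differs,
> > >         # the caller updates the stored hash and fails/retries the run.
> > >         return dag_run.dag_hash == dag_file_hash(dag.fileloc)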
> > >
> > > (2) is trickier. A dag only has a property "fileloc" which tracks the
> > > location of the dag file, but the actual content of the dag file is
> > > never versioned. When a task instance starts running, it dynamically
> > > re-processes the dag file specified by the fileloc, generates all the
> > > task objects from the dag file, and fetches the task object by task_id
> > > in order to execute it. So in order to guarantee that each dagrun runs
> > > a specific version, previous versions must be maintained on disk somehow
> > > (maintaining this information in memory is difficult, since if the
> > > scheduler/worker shuts down, that information is lost). This makes it
> > > a pretty big change, and I haven't thought much about how to implement
> > > it.
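> > >
> > > For context, the load path at execution time looks roughly like this
> > > today (simplified; DagBag, get_dag and get_task are real APIs):
> > >
> > >     from airflow.models import DagBag
> > >
> > >     def load_task_for_execution(fileloc, dag_id, task_id):
> > >         # The dag file at `fileloc` is re-parsed at run time and the task
> > >         # object is looked up by id -- whatever is on disk at that moment
> > >         # is what gets executed, which is why (2) needs old versions kept
> > >         # on disk somewhere.
> > >         dagbag = DagBag(dag_folder=fileloc)
> > >         dag = dagbag.get_dag(dag_id)
> > >         return dag.get_task(task_id)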
> > >
> > > I'm personally leaning towards (1) for the sake of simplicity. Note that
> > > some users may not want the dag to fail/retry even when the dag is updated, so
> > > this should be an optional feature, not required.
> > >
> > > My scheduler-foo isn't that great, so curious what others have to say
> > > about this.
> > >
> > > On Fri, Feb 23, 2018 at 3:12 PM, David Capwell <dcapw...@gmail.com>
> > > wrote:
> > > > Thanks for the reply Joy, let me walk you through things as they are
> > > > today:
> > > >
> > > > 1) we don't stop airflow or disable DAGs while deploying updates to
> > > > logic; this is done live once it's released
> > > > 2) the python script in the DAG folder doesn't actually have DAGs in it
> > > > but is a shim layer to allow us to deploy in an atomic way for a single
> > > > host
> > > >   2.1) this script reads a file on local disk (less than a disk page
> > > > size) to find the latest git commit deployed
> > > >   2.2) re-does the airflow DAG load process but pointing to the git
> > > > commit path (a rough sketch of the shim is below)
> > > >
> > > > Example directory structure
> > > >
> > > > /airflow/dags/shim.py
> > > > /airflow/real_dags/
> > > >     /latest        # pointer to latest commit
> > > >     /[git commit]/
> > > >
> > > > This is how we make sure deploys are consistent within a single task.
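> > > >
> > > > A stripped-down sketch of what such a shim could look like (the paths
> > > > and file layout here are illustrative; DagBag is the real Airflow class):
> > > >
> > > >     # /airflow/dags/shim.py
> > > >     import os
> > > >     from airflow.models import DagBag
> > > >
> > > >     REAL_DAGS_ROOT = "/airflow/real_dags"
> > > >
> > > >     # 2.1) read the small pointer file to find the latest deployed commit
> > > >     with open(os.path.join(REAL_DAGS_ROOT, "latest")) as f:
> > > >         commit = f.read().strip()
> > > >
> > > >     # 2.2) re-do the airflow DAG load, pointed at that commit's directory
> > > >     dagbag = DagBag(dag_folder=os.path.join(REAL_DAGS_ROOT, commit))
> > > >     for dag_id, dag in dagbag.dags.items():
> > > >         # re-export so the scheduler sees these DAGs as defined in this file
> > > >         globals()[dag_id] = dag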
> > > >
> > > >
> > > > Now, let's assume we have a fully atomic commit process and are able
> > > > to upgrade DAGs at the exact same moment.
> > > >
> > > > At time T0 the scheduler knows of DAG V1 and schedules two tasks,
> > > > Task1 and Task2
> > > > At time T1 Task1 is picked up by Worker1, which starts executing the
> > > > task (V1 logic)
> > > > At time T2 the deploy commit happens; the current DAG version is now V2
> > > > At time T3 Task2 is picked up by Worker2, which starts executing the
> > > > task (V2 logic)
> > > >
> > > > In many cases this isn't really a problem (a tuning config change to a
> > > > hadoop job), but as we have more people using Airflow this is causing a
> > > > lot of time spent debugging why production acted differently than
> > > > expected (the problem was already fixed... why is it still here?). We
> > > > also see that some tasks expect a given behavior from other tasks, and
> > > > since they live in the same git repo they can modify both tasks at the
> > > > same time if a breaking change is needed; but when this rolls out to
> > > > prod there isn't a way to do this other than turning off the DAG and
> > > > logging in to all hosts to verify it is fully deployed.
> > > >
> > > > We would like to remove this confusion, expose generations/versions
> > > > (same thing really) to users, and make sure that for a single dag_run
> > > > only one version is used.
> > > >
> > > > I hope this is more clear.
> > > >
> > > > On Fri, Feb 23, 2018 at 1:37 PM, Joy Gao <j...@wepay.com> wrote:
> > > >
> > > >> Hi David,
> > > >>
> > > >> Do you mind providing a concrete example of the scenario in which
> > > >> scheduler/workers see different states? (I'm not 100% sure if I
> > > >> understood the issue at hand.)
> > > >>
> > > >> And by same dag generation, are you referring to the dag version?
> (DAG
> > > >> version is currently not supported at all, but I can see it being a
> > > >> building block for future use cases).
> > > >>
> > > >> Joy
> > > >>
> > > >> On Fri, Feb 23, 2018 at 1:00 PM, David Capwell <dcapw...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > My current thinking is to add a field to the dag table that is
> > > >> > optional and provided by the dag. We currently intercept the load
> > > >> > path, so we could use this field to make sure we load the same
> > > >> > generation. My concern here is the interaction with the scheduler;
> > > >> > I'm not familiar enough with that logic to predict corner cases
> > > >> > where this would fail.
> > > >> >
> > > >> > Any other recommendations for how this could be done?
> > > >> >
> > > >> > On Mon, Feb 19, 2018, 10:33 PM David Capwell <dcapw...@gmail.com>
> > > >> > wrote:
> > > >> >
> > > >> > > We have been using airflow for logic that delegates to other
> > > >> > > systems, so we inject a task that all tasks depend on to make sure
> > > >> > > all resources used are the same for all tasks in the dag. This
> > > >> > > works well for tasks that delegate to external systems, but people
> > > >> > > are starting to need to run logic in airflow, and the fact that the
> > > >> > > scheduler and all workers can see different states is causing
> > > >> > > issues.
> > > >> > >
> > > >> > > We can make sure that all the code is deployed in a consistent way
> > > >> > > but need help from the scheduler to tell the workers the current
> > > >> > > generation for a DAG.
> > > >> > >
> > > >> > > My question is, what would be the best way to modify airflow to
> > > >> > > allow DAGs to define a generation value that the scheduler could
> > > >> > > send to workers?
> > > >> > >
> > > >> > > Thanks
> > > >> > >
> > > >> >
> > > >>
> > >
> > >
> >
>
