Hey,

Wow, many responses... Trying to keep things simple:


The easiest approach is to reduce scope: stop processing files that are not
currently active, i.e. files that don't have a currently active dagrun.
For a file without an active dagrun, the processor manager would put a
(datetime, filename) tuple on a sorted deque or priority queue type of
structure, such that it wouldn't start processing the file again until the
datetime passes (each loop removes all entries with datetime < now).
queue.PriorityQueue in Python has no peek method, so something needs to be
done there.
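
A rough sketch of what I mean, using heapq from the standard library
instead of a PriorityQueue, because a heap keeps its smallest entry at
index 0 and that gives us the peek for free. The class name
DeferredFileQueue, its method names and the file paths are all made up for
illustration; none of this is existing Airflow code.

```python
import heapq
from datetime import datetime, timedelta


class DeferredFileQueue:
    """Min-heap of (next_schedule_datetime, file_path) tuples (hypothetical)."""

    def __init__(self):
        self._heap = []

    def defer(self, file_path, next_schedule_datetime):
        # Tuples sort by datetime first, so the earliest entry stays on top.
        heapq.heappush(self._heap, (next_schedule_datetime, file_path))

    def peek(self):
        # heap[0] is always the smallest entry; nothing is removed here.
        return self._heap[0] if self._heap else None

    def pop_due(self, now):
        """Remove and return the paths whose datetime is at or before now."""
        due = []
        while self._heap and self._heap[0][0] <= now:
            due.append(heapq.heappop(self._heap)[1])
        return due


now = datetime(2017, 4, 25, 12, 0)
queue = DeferredFileQueue()
queue.defer("dags/daily.py", now + timedelta(hours=12))
queue.defer("dags/hourly.py", now - timedelta(minutes=1))

# Only the file whose next schedule has already passed is handed back.
due_files = queue.pop_due(now)
```

Files returned by pop_due would go back into the normal processing list;
everything still on the heap is skipped for this loop.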

This breaks in two situations:

- When a file is modified to add dags or to change a dag-id or interval, it
  would need to be reprocessed anyway, so in that case remove its entry.
  Dependencies are not considered here, unless they are able to put dags
  into the globals as well.

  inotify is the mechanism on Linux, but OSX and Windows have different
  mechanisms. A Python package called "watchdog" could be useful here for
  cross-platform compatibility: https://pypi.python.org/pypi/watchdog

- The scheduler would no longer terminate on "max_runs", because some dags
  may be daily. run_limit would no longer be met for their files, so the
  scheduler would stay around a lot longer than usual. Not sure if that's
  considered a problem. You could look at the union of the files that met
  run_limit and the files on the deque and compare that against all known
  files, but this establishes a weird dependency on the frequency at which
  dags are run: if there are frequently run dags, the scheduler would run
  for longer than in cases where you only have daily dags.
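
To make both points a bit more concrete, here is a minimal sketch. It
assumes the deferred files are also tracked in a plain dict (path ->
datetime) and that the manager keeps a per-file run count; the function
names, arguments and file names are hypothetical and not taken from the
scheduler code.

```python
from datetime import datetime


def drop_changed_file(deferred, changed_path):
    """Forget a deferred file so that the next loop reprocesses it."""
    deferred.pop(changed_path, None)


def all_files_accounted_for(known_files, run_counts, run_limit, deferred):
    """True when every known file has either met run_limit or is deferred."""
    finished = {path for path, count in run_counts.items() if count >= run_limit}
    return set(known_files) <= (finished | set(deferred))


known_files = ["a.py", "b.py", "c.py"]
deferred = {"b.py": datetime(2017, 4, 26)}  # daily dag, not due yet
run_counts = {"a.py": 3, "c.py": 1}

before = all_files_accounted_for(known_files, run_counts, 3, deferred)

run_counts["c.py"] = 3
after = all_files_accounted_for(known_files, run_counts, 3, deferred)

# A change notification for b.py removes it from the deferred set again.
drop_changed_file(deferred, "b.py")
after_change = all_files_accounted_for(known_files, run_counts, 3, deferred)
```

The second function is the "union" check: the scheduler could exit once it
returns True, with the caveat about run frequency mentioned above.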


Is this worth a shot at a PR to understand the changes?


Rgds,

Gerard


On Tue, Apr 25, 2017 at 1:13 AM, Maxime Beauchemin <
maximebeauche...@gmail.com> wrote:

> [I wrote this while offline without having received the full conversation,
> sorry if it's a bit off and looks like it's disregarding previous comments]
>
> On Mon, Apr 24, 2017 at 4:09 PM, Maxime Beauchemin <
> maximebeauche...@gmail.com> wrote:
>
> > With configuration as code, you can't really know whether the DAG
> > definition has changed based on whether the module was altered. This
> python
> > module could be importing other modules that have been changed, could
> have
> > read a config file somewhere on the drive that might have changed, or
> read
> > from a DB that is constantly getting mutated.
> >
> > There are also issues around the fact that Python caches modules in
> > `sys.modules`, so even though the crawler is re-interpreting modules,
> > imported modules wouldn't get re-interpreted [as our DAG authors
> expected]
> >
> > For these reasons [and others I won't get into here], we decided that the
> > scheduler would use a subprocess pool and re-interpret the DAGs from
> > scratch at every cycle, insulating the different DAGs and guaranteeing no
> > interpreter caching.
> >
> > Side note: yaml parsing is much more expensive than other markup
> languages
> > and would recommend working around it to store DAG configuration. Our
> > longest-to-parse DAGs at Airbnb were reading yaml to build a DAG,
> and
> > I believe someone wrote custom logic to avoid reparsing the yaml at every
> > cycle. Parsing equivalent json or hocon was an order of magnitude faster.
> >
> > Max
> >
> > On Mon, Apr 24, 2017 at 2:55 PM, Bolke de Bruin <bdbr...@gmail.com>
> wrote:
> >
> >> Inotify can work without a daemon. Just fire a call to the API when a
> >> file changes. Just a few lines in bash.
> >>
> >> If you bundle your dependencies in a zip you should be fine with the
> >> above. Or if we start using manifests that list the files that are
> needed
> >> in a dag...
> >>
> >>
> >> Sent from my iPhone
> >>
> >> > On 24 Apr 2017, at 22:46, Dan Davydov <dan.davy...@airbnb.com.
> INVALID>
> >> wrote:
> >> >
> >> > One idea to solve this is to use a daemon that uses inotify to watch
> for
> >> > changes in files and then reprocesses just those files. The hard part
> is
> >> > without any kind of dependency/build system for DAGs it can be hard to
> >> tell
> >> > which DAGs depend on which files.
> >> >
> >> > On Mon, Apr 24, 2017 at 1:21 PM, Gerard Toonstra <gtoons...@gmail.com
> >
> >> > wrote:
> >> >
> >> >> Hey,
> >> >>
> >> >> I've seen some people complain about DAG file processing times. An
> >> issue
> >> >> was raised about this today:
> >> >>
> >> >> https://issues.apache.org/jira/browse/AIRFLOW-1139
> >> >>
> >> >> I attempted to provide a good explanation of what's going on. Feel free
> to
> >> >> validate and comment.
> >> >>
> >> >>
> >> >> I'm noticing that the file processor is a bit naive in the way it
> >> >> reprocesses DAGs. It doesn't look at the DAG interval for example, so
> >> it
> >> >> looks like it reprocesses all files continuously in one big batch,
> >> even if
> >> >> we can determine that the next "schedule"  for all its dags are in
> the
> >> >> future?
> >> >>
> >> >>
> >> >> Wondering if a change in the DagFileProcessingManager could optimize
> >> things
> >> >> a bit here.
> >> >>
> >> >> In the part where it gets the simple_dags from a file it's currently
> >> >> processing:
> >> >>
> >> >>                for simple_dag in processor.result:
> >> >>                    simple_dags.append(simple_dag)
> >> >>
> >> >> the file_path is in the context and the simple_dags should be able to
> >> >> provide the next interval date for each dag in the file.
> >> >>
> >> >> The idea is to add files to a sorted deque by
> "next_schedule_datetime"
> >> (the
> >> >> minimum next interval date), so that when we build the list
> >> >> "files_paths_to_queue", it can remove files that have dags that we
> know
> >> >> won't have a new dagrun for a while.
> >> >>
> >> >> One gotcha to resolve after that is to deal with files getting
> updated
> >> with
> >> >> new dags or changed dag definitions and renames and different
> interval
> >> >> schedules.
> >> >>
> >> >> Worth a PR to glance over?
> >> >>
> >> >> Rgds,
> >> >>
> >> >> Gerard
> >> >>
> >>
> >
> >
>
