Love this idea, Max - this exactly fits with how we'd want to use it.
Almost like an Airflow "wire protocol". I think there'd be two
specifications: one for how the running container communicates a DAG (or
other state) back via a REST API, and another for container metadata
(such as labels/versioning, used for surfacing the container in the
Airflow UI).
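
The metadata half of such a "wire protocol" could ride on ordinary image
labels that the scheduler/UI reads via `docker inspect`. A hypothetical
sketch of what that contract might look like - every label key and the
compatibility rule here are invented, not an existing Airflow convention:

```python
# Hypothetical container-metadata contract; all key names are invented.
DAG_CONTAINER_LABELS = {
    "io.airflow.dag-id": "my_pipeline",
    "io.airflow.dag-version": "1.4.2",
    "io.airflow.protocol-version": "1",  # revision of the "wire protocol"
}

def is_compatible(labels, supported=("1",)):
    """Scheduler-side check: only run images that speak a supported
    protocol revision (the 'version range' contract idea)."""
    return labels.get("io.airflow.protocol-version") in supported
```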

Our biggest and most important Airflow pipelines are already >50%
containerized (using the KubernetesOperator) - but often with several
different containers for different task types in the same pipeline. I
could envision a world where Airflow is aware of a "DAG container", which
becomes the default task container for that DAG and is capable of the
parsing/API calls; and then if you need a different execution-time
container you can provide that as a task arg (or in default_args).
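
The precedence just described (task arg wins, then default_args, then the
DAG-level container) could be resolved with a helper along these lines.
This is a sketch only - the `image` key and a DAG-level image attribute are
hypothetical, not existing Airflow API:

```python
def resolve_task_image(task_args, default_args, dag_image):
    """Pick the container image for a task: an explicit task arg wins,
    then default_args, then the DAG-level "DAG container" image.
    (Hypothetical sketch; 'image'/dag_image are not real Airflow fields.)"""
    return task_args.get("image") or default_args.get("image") or dag_image
```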

On Wed, Feb 27, 2019 at 2:24 PM Maxime Beauchemin <
maximebeauche...@gmail.com> wrote:

> Oh a few more things I wanted to bring up.
>
> While playing with pex (years ago!), I added a feature to open up a
> full-featured, DAG-centric CLI from a DAG file. That feature could become
> the interface of a containerized DAG approach.
>
> As far as I know the feature is not well documented, but it is important
> for containerization, so let me explain it here. It goes something like
> this:
>
> dag = DAG()
> # ... the dag's tasks are defined here ...
> if __name__ == "__main__":
>     dag.cli()
>
> Here's an example:
>
> https://github.com/apache/airflow/blob/master/airflow/example_dags/example_bash_operator.py#L72
>
>
> Now you can do things like `python mydagmodule.py list_tasks` or `python
> mydagmodule.py run my_task_id 2019-01-01`. All the dag-centric Airflow CLI
> commands are made available here, and you don't have to pass the DAG
> argument to them in this context. In fact it uses the same calls/code as
> the main CLI.
>
> This means that you can bake a docker image around that DAG module and
> set that module as your container's entry point. You can imagine that the
> worker then just needs to fetch the image from the docker registry and
> `docker run` the right run command. That's fully containerized task
> execution. I think it would take something like 10 lines of code to
> change Airflow to operate this way (without taking into account change
> management, supporting different docker registries, the web UI, ...).
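
A worker operating this way would essentially assemble a `docker run`
invocation against the image's DAG-centric CLI entry point. A minimal
sketch - the image name and the exact argument order are assumptions,
mirroring the `python mydagmodule.py run ...` form above:

```python
import subprocess

def build_run_command(image, task_id, execution_date):
    """Build the `docker run` command for one task. The DAG module is the
    image's entry point, so the image accepts the same dag-centric CLI
    subcommands as running the module directly."""
    return ["docker", "run", "--rm", image, "run", task_id, execution_date]

def run_task(image, task_id, execution_date):
    # Hypothetical worker step; `docker run` pulls the image if needed.
    return subprocess.run(build_run_command(image, task_id, execution_date),
                          check=True)
```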
>
> Now about the problem of losing control over the core I mentioned in my
> previous email (where the core is baked into the image and we lose
> centralized control over that logic), I think we need some sort of
> lightweight Airflow SDK that works over the REST API. The DAGs, instead
> of importing the whole Airflow python package, would only import that
> SDK, and the server-side implementation of the calls can be evolved at a
> faster pace than the SDK.
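
Such an SDK could be little more than a thin REST client, so the
server-side behavior can evolve independently of what's baked into images.
A hypothetical sketch - the endpoint path and payload shape are invented,
not a real Airflow API:

```python
import json
import urllib.request

class AirflowSDK:
    """Minimal REST-backed client a containerized DAG could import
    instead of the full airflow package. Endpoints are hypothetical."""

    def __init__(self, base_url):
        self.base_url = base_url.rstrip("/")

    def _request(self, method, path, payload=None):
        # Build (not send) the HTTP call; sending it would be
        # urllib.request.urlopen(req).
        data = json.dumps(payload).encode() if payload is not None else None
        return urllib.request.Request(
            self.base_url + path,
            data=data,
            method=method,
            headers={"Content-Type": "application/json"},
        )

    def report_task_state(self, dag_id, task_id, execution_date, state):
        path = f"/api/dags/{dag_id}/tasks/{task_id}/{execution_date}/state"
        return self._request("PATCH", path, {"state": state})
```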
>
> Max
>
> On Wed, Feb 27, 2019 at 10:53 AM James Meickle
> <jmeic...@quantopian.com.invalid> wrote:
>
> > On the topic of using Docker, I highly recommend looking at Argo
> > Workflows and some of their sample code:
> > https://github.com/argoproj/argo
> >
> > tl;dr is that it's a workflow management tool where DAGs are expressed
> > as YAML manifests, and tasks are just containers run on Kubernetes.
> >
> > I think that there's a lot of value in Airflow's use of Python rather
> > than a YAML-based DSL. But I do think that containers are the future,
> > and I'm hopeful that Airflow develops in the direction of focusing on
> > being a principled Python framework for managing tasks/data executed in
> > containers, and the resulting execution state.
> >
> >
> > On Tue, Feb 26, 2019 at 8:55 PM Maxime Beauchemin <
> > maximebeauche...@gmail.com> wrote:
> >
> > > Related thoughts:
> > >
> > > * on the topic of serialization, let's be clear whether we're talking
> > > about unidirectional serialization and *not* deserialization back to
> > > the object. This works for making the web server stateless, but isn't
> > > a solution for how DAG definitions get shipped around the cluster
> > > (which would be nice to have from a system standpoint, but we'd have
> > > to break lots of dynamic features, things like callbacks and attaching
> > > complex objects to DAGs, ...)
> > >
> > > * docker as "serialization" is interesting; I looked into the "pex"
> > > format in the past. It's pretty cool to think of DAGs as micro docker
> > > applications that get shipped around and executed. The challenge with
> > > this is that it makes it hard to control Airflow's core. Upgrading
> > > Airflow becomes [also] about upgrading the DAG docker images. We had
> > > similar concerns with "pex". The data platform team loses their handle
> > > on the core, or has to get into the docker-building business, which is
> > > atypical. For an upgrade, you'd have to ask/force the people who own
> > > the DAG dockers to upgrade their images, else they won't run or
> > > something. The contract could be like "we'll only run your
> > > Airflow-docker-dag container if it's in a certain version range" or
> > > something like that. I think it's a cool idea. It gets intricate for
> > > the stateless web server though, it's a bit of a mind bender :) You
> > > could ask the docker to render the page (isn't that crazy?!) or ask
> > > the docker for a serialized version of the DAG that allows you to
> > > render the page (similar to point 1).
> > >
> > > * About storing in the db, for efficiency, the pk should be the SHA
> > > of the deterministic serialized DAG. Only store a new entry if the DAG
> > > has changed, and stamp the DagRun with an FK to that serialized-DAG
> > > table. If people have shape-shifting DAGs within DagRuns we just do
> > > best effort, show them the last one or something like that.
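
The content-addressed scheme above amounts to hashing a deterministic
serialization, so identical DAGs map to the same key and a new row is only
written when the DAG actually changes. A sketch, assuming the serialized
DAG is a JSON-able dict (the function name is mine, not Airflow's):

```python
import hashlib
import json

def dag_fingerprint(serialized_dag):
    """SHA-256 of a deterministic serialization: sort_keys and fixed
    separators make the byte stream stable regardless of dict order."""
    blob = json.dumps(serialized_dag, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()
```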
> > >
> > > * everyone hates pickles (including me), but it really almost works;
> > > might be worth revisiting, or at least I think it's good for me to
> > > list out the blockers:
> > >     * JinjaTemplate objects are not serializable for some odd,
> > > obscure reason. I think the community can solve that easily; if
> > > someone wants a full brain dump on this I can share what I know
> > >     * Size: as you pickle something, someone might have attached
> > > things that recurse into a hundreds-of-GBs-sized pickle. Like some
> > > on_failure_callback may bring in the whole Slack API library. That can
> > > be solved or mitigated in different ways. At some point I thought I'd
> > > have a DAG.validate() method that makes sure that the DAG can be
> > > pickled, and serialized to a reasonably sized pickle. I also think
> > > we'd have to make sure operators are defined as more "abstract",
> > > otherwise the pickle includes things like the whole pyhive lib and all
> > > sorts of other deps. It could be possible to limit what gets attached
> > > to the pickle (whitelist classes), and dehydrate objects during
> > > serialization / rehydrate them on the other side (assuming the classes
> > > are on the worker too). If that sounds crazy to you, it's because it
> > > is.
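
The DAG.validate() idea could start as a simple guard that the DAG both
pickles at all and stays under a size budget; the class-whitelist and
dehydrate/rehydrate parts are much harder and not shown. A sketch -
DAG.validate() is not an existing API, and the budget is an arbitrary
placeholder:

```python
import pickle

def validate_dag_pickle(dag, max_bytes=1_000_000):
    """Return the pickled bytes if `dag` serializes under the size budget;
    raise ValueError otherwise. A sketch of what DAG.validate() might check."""
    try:
        data = pickle.dumps(dag)
    except Exception as exc:
        raise ValueError(f"DAG is not picklable: {exc}") from exc
    if len(data) > max_bytes:
        raise ValueError(f"pickled DAG is {len(data)} bytes, over {max_bytes}")
    return data
```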
> > >
> > > * the other crazy idea is thinking of the git repo (the code itself)
> > > as the serialized DAG. There are git filesystems in userspace (FUSE)
> > > that allow dynamically accessing the git history like it's just a
> > > folder, as in `REPO/{ANY_GIT_REF}/dags/mydag.py`. Beautifully hacky.
> > > A company with a blue logo with a big F on it that I used to work at
> > > did that. Talking about embracing config-as-code! The DagRun can just
> > > stamp the git SHA it's running with.
> > >
> > > Sorry about the confusion, config as code gets tricky around the
> > > corners. But it's all worth it, right? Right!? :)
> > >
> > > On Tue, Feb 26, 2019 at 3:09 AM Kevin Yang <yrql...@gmail.com> wrote:
> > >
> > > > My bad, I was misunderstanding a bit and mixing up two issues. I
> > > > was thinking about the multiple-runs-for-one-DagRun issue (e.g.
> > > > after we clear the DagRun).
> > > >
> > > > This is an orthogonal issue, so the current implementation can work
> > > > in the long-term plan.
> > > >
> > > > Cheers,
> > > > Kevin Y
> > > >
> > > > On Tue, Feb 26, 2019 at 2:34 AM Ash Berlin-Taylor <a...@apache.org>
> > > > wrote:
> > > >
> > > > >
> > > > > > On 26 Feb 2019, at 09:37, Kevin Yang <yrql...@gmail.com> wrote:
> > > > > >
> > > > > > Now since we're already trying to have multiple graphs for one
> > > > > > execution_date, maybe we should just have multiple DagRun.
> > > > >
> > > > > I thought that there is exactly 1 graph for a DAG run - dag_run
> > > > > has a "graph_id" column
> > > >
> > >
> >
>
