Love this idea, Max - this fits exactly with how we'd want to use it. Almost like an Airflow "wire protocol". I think there'd be two specifications: one for how the running container communicates a DAG (or other state) back via a REST API, and another for a container metadata specification (such as communicating labels/versioning, used for surfacing the container in the Airflow UI).
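To make that second spec concrete, here's a rough sketch of what reading such metadata could look like. The label names are invented for illustration, not part of any real spec, and the function just parses `docker inspect`-style JSON rather than talking to a daemon:

```python
import json

# Hypothetical label names -- a real metadata spec would standardize these.
DAG_ID_LABEL = "io.airflow.dag-id"
DAG_VERSION_LABEL = "io.airflow.dag-version"

def read_dag_metadata(inspect_json: str) -> dict:
    """Pull DAG metadata out of `docker inspect <image>` output.

    `inspect_json` is the JSON string docker prints: a list with one
    entry per image, each carrying labels under Config.Labels.
    """
    (image,) = json.loads(inspect_json)
    labels = image.get("Config", {}).get("Labels") or {}
    return {
        "dag_id": labels.get(DAG_ID_LABEL),
        "version": labels.get(DAG_VERSION_LABEL),
    }

# Example with a canned inspect payload (no Docker daemon needed):
payload = json.dumps([{"Config": {"Labels": {
    "io.airflow.dag-id": "my_dag",
    "io.airflow.dag-version": "2019.02.27",
}}}])
print(read_dag_metadata(payload))
# -> {'dag_id': 'my_dag', 'version': '2019.02.27'}
```

The UI could then show the version/label info without ever importing the DAG's code.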
For our biggest and most important Airflow pipelines, they're already >50% containerized (using the KubernetesOperator) - but often with several different containers for different types of tasks in the same pipeline. I could envision a world where Airflow is aware of a "DAG container", which becomes the default task container for that DAG and is capable of the parsing/API calls; and then if you need a different execution-time container you can provide that as a task arg (or in default_args).

On Wed, Feb 27, 2019 at 2:24 PM Maxime Beauchemin <maximebeauche...@gmail.com> wrote:

> Oh, a few more things I wanted to bring up.
>
> While playing with pex (years ago!), I added a feature to open up a full-featured, DAG-centric CLI from a DAG file. That feature could become the interface of a containerized DAG approach.
>
> As far as I know the feature is not well documented, and it is important for containerization, so let me explain it here. It goes something like this:
>
> dag = DAG()
> {...dag is defined here...}
> if __name__ == "__main__":
>     dag.cli()
>
> Here's an example:
> https://github.com/apache/airflow/blob/master/airflow/example_dags/example_bash_operator.py#L72
>
> Now you can do things like `python mydagmodule.py list_tasks` or `python mydagmodule.py run my_task_id 2019-01-01`. All the DAG-centric Airflow CLI commands are made available here, and you don't have to pass the DAG argument to them in this context. In fact it uses the same calls/code as the main CLI.
>
> This means that you can bake a Docker image around that DAG module and set that DAG module as your container's entry point. You can imagine that the worker now just needs to fetch the image from the Docker registry and `docker run` the right run command. That's fully containerized task execution. I think it would take something like 10 lines of code to change Airflow to operate this way.
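The worker-side change Max describes really is small: with the DAG module as the image entrypoint (ending in `dag.cli()`), running a task reduces to composing a `docker run` line. A minimal sketch; the image name and flags here are illustrative, not a proposed interface:

```python
def docker_run_command(image: str, task_id: str, execution_date: str) -> list:
    """Build the `docker run` argv a worker could exec to run one task.

    Assumes the image's entrypoint is the DAG module itself (i.e. it
    ends in `dag.cli()`), so the container args are just the same args
    you'd pass to `python mydagmodule.py ...` locally.
    """
    return ["docker", "run", "--rm", image, "run", task_id, execution_date]

print(docker_run_command(
    "registry.example.com/dags/mydag:2019-02-27", "my_task_id", "2019-01-01"))
```

The worker never imports the DAG's code; it only needs the registry coordinates and the task coordinates.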
> (Without taking into account change management, or supporting different Docker registries, or the web UI, ...)
>
> Now, about the problem of losing control over the core that I mentioned in my previous email (where the core is baked into the image and we lose centralized control over that logic): I think we need some sort of lightweight Airflow SDK that works over the REST API. The DAGs, instead of importing the whole Airflow Python package, would only import that SDK, and the server-side implementation of the calls can be evolved at a faster pace than the SDK.
>
> Max
>
> On Wed, Feb 27, 2019 at 10:53 AM James Meickle <jmeic...@quantopian.com.invalid> wrote:
>
> > On the topic of using Docker, I highly recommend looking at Argo Workflows and some of their sample code: https://github.com/argoproj/argo
> >
> > tl;dr is that it's a workflow management tool where DAGs are expressed as YAML manifests, and tasks are just containers run on Kubernetes.
> >
> > I think that there's a lot of value in Airflow's use of Python rather than a YAML-based DSL. But I do think that containers are the future, and I'm hopeful that Airflow develops in the direction of focusing on being a principled Python framework for managing tasks/data executed in containers, and the resulting execution state.
> >
> > On Tue, Feb 26, 2019 at 8:55 PM Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
> >
> > > Related thoughts:
> > >
> > > * On the topic of serialization, let's be clear whether we're talking about unidirectional serialization and *not* deserialization back to the object. This works for making the web server stateless, but isn't a solution for how DAG definitions get shipped around the cluster (which would be nice to have from a system standpoint, but we'd have to break lots of dynamic features, things like callbacks and attaching complex objects to DAGs, ...)
> > > * Docker as "serialization" is interesting; I looked into the "pex" format in the past. It's pretty cool to think of DAGs as micro Docker applications that get shipped around and executed. The challenge with this is that it makes it hard to control Airflow's core. Upgrading Airflow becomes [also] about upgrading the DAG Docker images. We had similar concerns with "pex". The data platform team loses their handle on the core, or has to get into the Docker-building business, which is atypical. For an upgrade, you'd have to ask/force the people who own the DAG Docker images to upgrade them, or else they won't run, or something. The contract could be like "we'll only run your Airflow-docker-dag container if it's in a certain version range" or something like that. I think it's a cool idea. It gets intricate for the stateless web server though; it's a bit of a mind bender :) You could ask the container to render the page (isn't that crazy?!), or ask the container for a serialized version of the DAG that allows you to render the page (similar to point 1).
> > >
> > > * About storing in the db: for efficiency, the pk should be the SHA of the deterministic serialized DAG. Only store a new entry if the DAG has changed, and stamp the DagRun with an FK to that serialized DAG table.
> > > If people have shapeshifting DAGs within DagRuns we just do best effort, show them the last one or something like that.
> > >
> > > * Everyone hates pickles (including me), but it really almost works; it might be worth revisiting, or at least I think it's good for me to list out the blockers:
> > >   * Jinja Template objects are not serializable for some odd, obscure reason. I think the community can solve that easily; if someone wants a full brain dump on this I can share what I know.
> > >   * Size: as you pickle something, someone might have attached things that recurse into a hundreds-of-GBs-size pickle. Like some on_failure_callback may bring in the whole Slack API library. That can be solved or mitigated in different ways. At some point I thought I'd have a DAG.validate() method that makes sure the DAG can be pickled, and serialized to a reasonable-size pickle. I also think we'd have to make sure operators are defined as more "abstract", otherwise the pickle includes things like the whole PyHive lib and all sorts of other deps. It could be possible to limit what gets attached to the pickle (whitelist classes), and dehydrate objects during serialization / rehydrate them on the other side (assuming classes are on the worker too). If that sounds crazy to you, it's because it is.
> > >
> > > * The other crazy idea is thinking of the git repo (the code itself) as the serialized DAG. There are git filesystems in userspace (FUSE) that allow dynamically accessing the git history like it's just a folder, as in `REPO/{ANY_GIT_REF}/dags/mydag.py`. Beautifully hacky. A company with a blue logo with a big F on it that I used to work at did that. Talk about embracing config-as-code! The DagRun can just stamp the git SHA it's running with.
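Max's content-addressed storage point ("the pk should be the SHA of the deterministic serialized DAG") could work roughly like this, assuming a JSON-serializable DAG representation. The dict shape and table stand-in are invented for the sketch:

```python
import hashlib
import json

def serialized_dag_key(dag_repr: dict) -> str:
    """SHA-256 of a deterministic serialization: sorted keys and fixed
    separators mean the same DAG always hashes to the same pk."""
    blob = json.dumps(dag_repr, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()

store = {}  # stand-in for the serialized-DAG table: pk -> blob

def upsert(dag_repr: dict) -> str:
    """Insert only if this exact DAG shape is unseen; return the pk the
    DagRun would stamp as its FK."""
    key = serialized_dag_key(dag_repr)
    store.setdefault(key, dag_repr)
    return key

a = upsert({"dag_id": "d", "tasks": ["t1", "t2"]})
b = upsert({"tasks": ["t1", "t2"], "dag_id": "d"})  # same DAG, reordered keys
assert a == b and len(store) == 1  # deduplicated: one row, one pk
```

An unchanged DAG costs nothing per DagRun beyond stamping the FK; any structural change produces a new row automatically.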
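The DAG.validate() suggestion in the pickle bullet can also be shown in miniature. The function name and size threshold are illustrative, not an actual Airflow API:

```python
import pickle

MAX_PICKLE_BYTES = 1 << 20  # 1 MiB -- arbitrary threshold for the sketch

def validate_picklable(obj, max_bytes=MAX_PICKLE_BYTES):
    """Fail fast at definition time if the object can't be pickled, or
    pickles to something huge (e.g. a callback dragging in a whole
    client library). Returns the pickle size on success."""
    try:
        blob = pickle.dumps(obj)
    except Exception as exc:
        raise ValueError("not picklable: %s" % exc)
    if len(blob) > max_bytes:
        raise ValueError("pickle too big: %d bytes" % len(blob))
    return len(blob)

print(validate_picklable({"dag_id": "d", "tasks": ["t1", "t2"]}))
```

Running this in CI against every DAG module would surface both blockers (unpicklable attachments and runaway size) before deploy rather than at execution time.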
> > >
> > > Sorry about the confusion; config as code gets tricky around the corners. But it's all worth it, right? Right!? :)
> > >
> > > On Tue, Feb 26, 2019 at 3:09 AM Kevin Yang <yrql...@gmail.com> wrote:
> > >
> > > > My bad, I was misunderstanding a bit and mixing up two issues. I was thinking about the multiple-runs-for-one-DagRun issue (e.g. after we clear the DagRun).
> > > >
> > > > This is an orthogonal issue, so the current implementation can work in the long-term plan.
> > > >
> > > > Cheers,
> > > > Kevin Y
> > > >
> > > > On Tue, Feb 26, 2019 at 2:34 AM Ash Berlin-Taylor <a...@apache.org> wrote:
> > > >
> > > > > > On 26 Feb 2019, at 09:37, Kevin Yang <yrql...@gmail.com> wrote:
> > > > > >
> > > > > > Now since we're already trying to have multiple graphs for one execution_date, maybe we should just have multiple DagRuns.
> > > > >
> > > > > I thought that there is exactly 1 graph for a DAG run - dag_run has a "graph_id" column
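Circling back to Max's lightweight-SDK idea from earlier in the thread: the DAG-side import could be as thin as a small REST client. A sketch only; the endpoint paths and method names are invented, since defining that REST surface is exactly what the proposed specification would have to do:

```python
import json
from urllib import request

class AirflowSDK:
    """Thin client sketch for the SDK-over-REST idea: DAG code imports
    only this, never the full airflow package, so the server side can
    evolve faster than the SDK."""

    def __init__(self, base_url: str):
        self.base_url = base_url.rstrip("/")

    def _url(self, *parts: str) -> str:
        return "/".join((self.base_url,) + parts)

    def report_dag(self, dag_repr: dict) -> request.Request:
        """Build (not send) the request a DAG container would use to
        push its parsed structure back to the scheduler."""
        return request.Request(
            self._url("api", "v1", "dags", dag_repr["dag_id"]),
            data=json.dumps(dag_repr).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        )

req = AirflowSDK("http://scheduler:8080/").report_dag({"dag_id": "d", "tasks": []})
print(req.get_method(), req.full_url)
# -> POST http://scheduler:8080/api/v1/dags/d
```

Because the container only builds and sends HTTP requests, upgrading the core never requires rebuilding DAG images, which addresses the loss-of-control concern raised above.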