I agree with Fokko: I feel this AIP is a decent stepping stone, and it doesn't significantly change the Airflow execution model. My major concern about _requiring_ Kubernetes is that it is a barrier to entry that might put off smaller users: operating a Kubernetes cluster is hard to impossible if you are the only data engineer in a company!
(Even requiring Docker is, to some extent, an operational step up.) That said, the "dag CLI" approach outlined earlier could mean we still have the ability to have a LocalExecutor kind of thing.

-ash

On 27 February 2019 21:00:38 GMT, "Driesprong, Fokko" <fo...@driesprong.frl> wrote:
>I feel we're going a bit off topic here, although it is good to discuss the possibilities.
>
>From my perspective the AIP tries to kill two birds with one stone:
>
> 1. Decoupling the web-server from the actual Python DAG files
> 2. Versioning the DAGs so we can have a historical view of the DAGs as they are executed. For example, if you now deploy a new DAG and rename an operator, the old name will disappear from the tree view, and you will get a new row which has no status (until you trigger a run).
>
>They are related but require different solutions. Personally, I'm most interested in the first, so we can make the webserver (finally) stateless. The PR tries to remove the need for having the actual Python for the different views. Some of them are trivial, such as the graph and tree view. Some are more tricky, such as the task instance details and the rendered view, because they pull a lot of information from the DAG object and involve Jinja templating.
>
>The main goal of the PR is to store this kind of metadata when the scheduler kicks off the DAG. So when a new DagRun is being created, the latest version of the DAG is loaded from the actual Python file on disk by the scheduler. The file is executed and the DAG is persisted into the database in a structured way. By moving this into the database, we can eventually decouple the webserver from the actual DAG files, which greatly simplifies deployment and removes the state, since this is now in the (ACID compliant) database.
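A minimal sketch of what persisting the DAG "in a structured way", rather than as a pickled blob, could look like. `TaskSpec`, `DagSpec`, `serialize_dag` and `graph_edges` are hypothetical names for illustration, not the PR's code or an Airflow API; the sketch stores only the topology fields a graph view would need and reads them back as plain data, never as operator objects.

```python
import json
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class TaskSpec:
    # Hypothetical stand-in for an operator: only what the UI needs.
    task_id: str
    operator: str  # class name only, e.g. "BashOperator"; no code is stored
    upstream: List[str] = field(default_factory=list)


@dataclass
class DagSpec:
    # Hypothetical stand-in for a DAG: an id plus its tasks.
    dag_id: str
    tasks: List[TaskSpec]


def serialize_dag(dag: DagSpec) -> str:
    """Write an explicit, known set of fields as JSON.

    Sorting tasks, upstream ids and keys makes the output deterministic,
    so the same topology always yields the same string.
    """
    payload = {
        "dag_id": dag.dag_id,
        "tasks": [
            {"task_id": t.task_id, "operator": t.operator, "upstream": sorted(t.upstream)}
            for t in sorted(dag.tasks, key=lambda t: t.task_id)
        ],
    }
    return json.dumps(payload, sort_keys=True, separators=(",", ":"))


def graph_edges(serialized: str) -> List[Tuple[str, str]]:
    """Rebuild (upstream, downstream) edges from the JSON alone,
    without importing the DAG file."""
    data = json.loads(serialized)
    return [(up, t["task_id"]) for t in data["tasks"] for up in t["upstream"]]


dag = DagSpec("example", [
    TaskSpec("load", "PythonOperator", upstream=["extract"]),
    TaskSpec("extract", "BashOperator"),
])
print(graph_edges(serialize_dag(dag)))  # [('extract', 'load')]
```

Because only named fields are written, the output stays small and inspectable; the trade-off is that it is one-directional, so dynamic features such as callbacks are not carried along.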
>
>Furthermore, instead of whitelisting certain classes, we control the serialization ourselves by knowing what we push to the database, instead of having to push a pickled blob. Besides that, from a performance perspective, as Max already pointed out, pickling can potentially be expensive in terms of memory and CPU.
>
>> Since it's pretty clear we need SimpleDAG serialization, and we can see through the requirements, people can pretty much get started on this.
>
>I'm afraid that if we go this route, the SimpleDag will end up being the actual DAG (just a slightly smaller and serializable version of it). My preference would be to simplify the DAG object and get rid of BaseDag and SimpleDag to simplify the object hierarchy.
>
>Cheers, Fokko
>
>On Wed 27 Feb 2019 at 21:23, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>
>> I fully agree on all your points Dan.
>>
>> First, about PEX vs Docker: Docker is a clear choice here. It's a superset of what PEX can do (PEX is limited to the Python environment), and a great standard that has awesome tooling around it and works natively in k8s, which is becoming the preferred executor. PEX is a bit of a hack and has failed to become a standard. I don't think anyone would argue for PEX over Docker in almost any context where this question would show up nowadays (beyond Airflow too).
>>
>> And pickles have a lot of disadvantages compared to Docker:
>> * some objects are not picklable (JinjaTemplates!)
>> * lack of visibility / tooling on building them: you might make one call, get a 500MB pickle, and have no clue why it's so big
>> * unlike Docker, they are impossible to introspect (as far as I know); you have to intercept the __deepcopy__ method, good luck!
>> * pickles require a very similar environment to be rehydrated (same GCC version, same modules/versions available?)
>> * weird side effects and unknown boundaries.
>>   If I pickled a DAG on an older version of Airflow and code logic got included, and then restored it on a newer one, could the new host be oddly downgraded as a result? When restored, would it affect the whole environment / other DAGs?
>>
>> My vote goes towards something like SimpleDAG serialization (for the web server and similar use cases) + Docker images built on top of a lightweight SDK as the way to go.
>>
>> Since it's pretty clear we need SimpleDAG serialization, and we can see through the requirements, people can pretty much get started on this.
>>
>> The question of how to bring native Docker support to Airflow is a bit more complex. I think the k8s executor has support for custom DAG containers, but idk how it works as I haven't looked deeply into this recently. Docker support in Airflow is a tricky and important question, and it offers an opportunity to provide solutions to long-standing issues around versioning, test/dev user workflows, and more.
>>
>> Related Docker questions:
>> * What's the Docker interface contract? What entry points have to exist in the image?
>> * Does Airflow provide tooling to help bake images that enforce that contract?
>> * Do we need Docker tag semantics in the DSL? Does it look something like `DAG(id='mydag', docker_tag='hub.docker.com://org/repo/tag', ...)`?
>> * Is Docker optional or required? (probably optional at first)
>> * Should Docker support be k8s-executor centric, or work the same across executors? Are we running docker-in-docker as a result / penalty in k8s?
>>
>> Max
>>
>> On Wed, Feb 27, 2019 at 11:38 AM Dan Davydov <ddavy...@twitter.com.invalid> wrote:
>>
>> > > * on the topic of serialization, let's be clear whether we're talking about unidirectional serialization and *not* deserialization back to the object.
>> > > This works for making the web server stateless, but isn't a solution for how DAG definitions get shipped around the cluster (which would be nice to have from a system standpoint, but we'd have to break lots of dynamic features, things like callbacks and attaching complex objects to DAGs, ...)
>> >
>> > I feel these dynamic features are not worth the tradeoffs, and in most cases they have alternatives, e.g. on_failure_callback can be replaced by a task with a ONE_FAILED trigger rule, which gives the additional advantages that first-class Airflow tasks have, like retries. That being said, we should definitely do our due diligence, weighing the trade-offs and coming up with alternatives for any feature we disable (Jinja templating related to webserver rendering, callbacks, etc.). I remember speaking to Alex about this and he agreed that the consistency/auditing/isolation guarantees were worth losing some features; I think Paul did as well. Certainly we will need to have a discussion/vote with the rest of the committers.
>> >
>> > My initial thinking is that both the DAG topology serialization (i.e. generating and storing SimpleDag in the DB for each DAG) and linking each DAG with a pex/docker image/etc. as well as authentication tokens should happen in the same place. Probably the client runs some command that generates the SimpleDag as well as a container, and then sends them to some Airflow service that stores all of this information appropriately. Then the Scheduler/Webserver/Worker consume the stored SimpleDags, and Workers consume the containers in addition.
>> >
>> > > * docker as "serialization" is interesting, I looked into "pex" format in the past. It's pretty cool to think of DAGs as micro Docker applications that get shipped around and executed.
>> > > The challenge with this is that it makes it hard to control Airflow's core. Upgrading Airflow becomes [also] about upgrading the DAG Docker images. We had similar concerns with "pex". The data platform team loses its handle on the core, or has to get into the Docker-building business, which is atypical. For an upgrade, you'd have to ask/force the people who own the DAG Docker images to upgrade their images,
>> >
>> > I believe the container vs Airflow versioning problem is just an API versioning problem, i.e. you don't necessarily have to rebuild all containers when you bump the Airflow version, as long as the API is backwards compatible. I think this is reasonable for a platform like Airflow, and I'm not sure there is a great way to avoid it if we want other nice system guarantees (e.g. reproducibility).
>> >
>> > > Contract could be like "we'll only run your Airflow-docker-dag container if it's in a certain version range" or something like that. I think it's a cool idea. It gets intricate for the stateless web server though; it's a bit of a mind bender :) You could ask the Docker container to render the page (isn't that crazy?!) or ask it for a serialized version of the DAG that allows you to render the page (similar to point 1).
>> >
>> > If the webserver uses the SimpleDag representation that is generated at the time of DAG creation, then Docker doesn't need to provide this serialized version, i.e. you push the responsibility to the client to have the right dependencies in order to build the DAG, which I feel is good. One tricky thing I can think of: if you have special UI elements related to the operator type of a task (I saw a PR out for this recently), you would need to solve the API versioning problem separately for this as well (i.e.
>> > make sure the serialized DAG representation works with the version of the newest operator UI).
>> >
>> > > * About storing in the DB: for efficiency, the pk should be the SHA of the deterministic serialized DAG. Only store a new entry if the DAG has changed, and stamp the DagRun with an FK to that serialized DAG table. If people have shapeshifting DAGs within DagRuns, we just do best effort and show them the last one or something like that
>> >
>> > If we link each DagRun to its "container" and "serialized representation", then the web UI can actually iterate through each DagRun and even render changes in topology. I think at least for v1 we can just use the current solution as you mentioned (best effort using the latest version).
>> >
>> > > * everyone hates pickles (including me), but it really almost works, might be worth revisiting, or at least I think it's good for me to list out the blockers:
>> > >   * JinjaTemplate objects are not serializable for some odd obscure reason. I think the community can solve that easily; if someone wants a full brain dump on this I can share what I know
>> >
>> > What was the reason for preferring pickle over Docker/PEX for serialization? I think we discussed this a long time ago with Paul, but I forget the rationale and it would be good to have the information shared publicly too. One big problem is you don't get isolation at the binary dependency level, i.e. .so/.dll dependencies, along with all of the other problems you listed.
>> >
>> > On Tue, Feb 26, 2019 at 8:55 PM Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>> >
>> > > Related thoughts:
>> > >
>> > > * on the topic of serialization, let's be clear whether we're talking about unidirectional serialization and *not* deserialization back to the object.
>> > > This works for making the web server stateless, but isn't a solution for how DAG definitions get shipped around the cluster (which would be nice to have from a system standpoint, but we'd have to break lots of dynamic features, things like callbacks and attaching complex objects to DAGs, ...)
>> > >
>> > > * docker as "serialization" is interesting, I looked into "pex" format in the past. It's pretty cool to think of DAGs as micro Docker applications that get shipped around and executed. The challenge with this is that it makes it hard to control Airflow's core. Upgrading Airflow becomes [also] about upgrading the DAG Docker images. We had similar concerns with "pex". The data platform team loses its handle on the core, or has to get into the Docker-building business, which is atypical. For an upgrade, you'd have to ask/force the people who own the DAG Docker images to upgrade their images, else they won't run or something. Contract could be like "we'll only run your Airflow-docker-dag container if it's in a certain version range" or something like that. I think it's a cool idea. It gets intricate for the stateless web server though; it's a bit of a mind bender :) You could ask the Docker container to render the page (isn't that crazy?!) or ask it for a serialized version of the DAG that allows you to render the page (similar to point 1).
>> > >
>> > > * About storing in the DB: for efficiency, the pk should be the SHA of the deterministic serialized DAG. Only store a new entry if the DAG has changed, and stamp the DagRun with an FK to that serialized DAG table.
>> > > If people have shapeshifting DAGs within DagRuns, we just do best effort and show them the last one or something like that
>> > >
>> > > * everyone hates pickles (including me), but it really almost works, might be worth revisiting, or at least I think it's good for me to list out the blockers:
>> > >   * JinjaTemplate objects are not serializable for some odd obscure reason. I think the community can solve that easily; if someone wants a full brain dump on this I can share what I know
>> > >   * Size: as you pickle something, someone might have attached things that recurse into a pickle hundreds of GBs in size. Like some on_failure_callback may bring in the whole Slack API library. That can be solved or mitigated in different ways. At some point I thought I'd have a DAG.validate() method that makes sure that the DAG can be pickled, and serialized to a reasonably sized pickle. I also think we'd have to make sure operators are defined as more "abstract", otherwise the pickle includes things like the whole pyhive lib and all sorts of other deps. It could be possible to limit what gets attached to the pickle (whitelist classes), and dehydrate objects during serialization and rehydrate them on the other side (assuming the classes are on the worker too). If that sounds crazy to you, it's because it is.
>> > >
>> > > * the other crazy idea is thinking of the git repo (the code itself) as the serialized DAG. There are git filesystems in userspace [FUSE] that allow dynamically accessing the git history like it's just a folder, as in `REPO/{ANY_GIT_REF}/dags/mydag.py`. Beautifully hacky. A company with a blue logo with a big F on it that I used to work at did that. Talking about embracing config-as-code! The DagRun can just stamp the git SHA it's running with.
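Max's size and whitelisting points can be sketched with the standard library alone. `validate_picklable` is a hypothetical stand-in for the DAG.validate() idea (the size budget is an arbitrary assumption), and `AllowlistUnpickler` follows the "Restricting Globals" pattern from the Python `pickle` documentation: it only rehydrates globals on an explicit allowlist. Neither is an existing Airflow API.

```python
import io
import pickle

# Hypothetical size budget for a pickled DAG; pick whatever is sensible.
MAX_PICKLE_BYTES = 1_000_000

# Hypothetical allowlist of (module, name) globals that may be rehydrated.
ALLOWED_GLOBALS = {
    ("collections", "OrderedDict"),
    ("datetime", "datetime"),
}


class DagValidationError(Exception):
    """Raised when an object cannot be pickled within the budget."""


def validate_picklable(obj, max_bytes: int = MAX_PICKLE_BYTES) -> int:
    """Check that obj pickles at all and that the pickle stays small.

    Returns the pickle size in bytes.
    """
    try:
        data = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    except (pickle.PicklingError, TypeError, AttributeError) as exc:
        # e.g. lambdas, locks, open sockets
        raise DagValidationError(f"not picklable: {exc}") from exc
    if len(data) > max_bytes:
        raise DagValidationError(f"pickle is {len(data)} bytes, limit is {max_bytes}")
    return len(data)


class AllowlistUnpickler(pickle.Unpickler):
    """Refuse to load any global (class/function) not on the allowlist."""

    def find_class(self, module, name):
        if (module, name) in ALLOWED_GLOBALS:
            return super().find_class(module, name)
        raise pickle.UnpicklingError(f"global '{module}.{name}' is forbidden")


def restricted_loads(data: bytes):
    return AllowlistUnpickler(io.BytesIO(data)).load()


print(restricted_loads(pickle.dumps({"a": [1, 2]})))  # {'a': [1, 2]}
```

Plain containers of built-in scalars need no globals and load fine; anything that references a class outside the allowlist is rejected before it is instantiated. This limits what gets rehydrated but does nothing about the environment-matching problems listed earlier in the thread.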
>> > >
>> > > Sorry about the confusion, config as code gets tricky around the corners. But it's all worth it, right? Right!? :)
>> > >
>> > > On Tue, Feb 26, 2019 at 3:09 AM Kevin Yang <yrql...@gmail.com> wrote:
>> > >
>> > > > My bad, I was misunderstanding a bit and mixing up two issues. I was thinking about the multiple runs for one DagRun issue (e.g. after we clear the DagRun).
>> > > >
>> > > > This is an orthogonal issue. So the current implementation can work in the long-term plan.
>> > > >
>> > > > Cheers,
>> > > > Kevin Y
>> > > >
>> > > > On Tue, Feb 26, 2019 at 2:34 AM Ash Berlin-Taylor <a...@apache.org> wrote:
>> > > >
>> > > > > > On 26 Feb 2019, at 09:37, Kevin Yang <yrql...@gmail.com> wrote:
>> > > > > >
>> > > > > > Now since we're already trying to have multiple graphs for one execution_date, maybe we should just have multiple DagRuns.
>> > > > >
>> > > > > I thought that there is exactly 1 graph for a DAG run: dag_run has a "graph_id" column
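Max's "pk = SHA of the deterministic serialized DAG" suggestion and the graph_id column Ash mentions fit together as content-addressed storage. A minimal sketch with SQLite follows; the `serialized_dag` and `dag_run` schemas here are hypothetical simplifications, not Airflow's actual tables. A new row is written only when the hash is new, and each run is stamped with the hash of the DAG version it ran.

```python
import hashlib
import json
import sqlite3


def canonical_json(dag: dict) -> str:
    # Deterministic serialization: sorted keys, no incidental whitespace.
    return json.dumps(dag, sort_keys=True, separators=(",", ":"))


def dag_sha(dag: dict) -> str:
    return hashlib.sha256(canonical_json(dag).encode("utf-8")).hexdigest()


def init_db(conn: sqlite3.Connection) -> None:
    # Hypothetical, simplified schema for illustration only.
    conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript(
        """
        CREATE TABLE serialized_dag (
            sha  TEXT PRIMARY KEY,
            data TEXT NOT NULL
        );
        CREATE TABLE dag_run (
            dag_id         TEXT NOT NULL,
            execution_date TEXT NOT NULL,
            graph_id       TEXT NOT NULL REFERENCES serialized_dag (sha),
            PRIMARY KEY (dag_id, execution_date)
        );
        """
    )


def record_run(conn: sqlite3.Connection, dag: dict, execution_date: str) -> str:
    sha = dag_sha(dag)
    # INSERT OR IGNORE: a new serialized_dag row only when this version is new.
    conn.execute(
        "INSERT OR IGNORE INTO serialized_dag (sha, data) VALUES (?, ?)",
        (sha, canonical_json(dag)),
    )
    conn.execute(
        "INSERT INTO dag_run (dag_id, execution_date, graph_id) VALUES (?, ?, ?)",
        (dag["dag_id"], execution_date, sha),
    )
    return sha


conn = sqlite3.connect(":memory:")
init_db(conn)
record_run(conn, {"dag_id": "etl", "tasks": ["extract", "load"]}, "2019-02-25")
# Same DAG with keys in a different order: hashes identically, no new row.
record_run(conn, {"tasks": ["extract", "load"], "dag_id": "etl"}, "2019-02-26")
record_run(conn, {"dag_id": "etl", "tasks": ["extract", "transform", "load"]}, "2019-02-27")
print(conn.execute("SELECT COUNT(*) FROM serialized_dag").fetchone())  # (2,)
```

Because each dag_run row points at the exact serialized version it ran with, a UI could join on graph_id and render topology changes across runs instead of always showing the latest DAG.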