Just my two cents.

I agree with Fokko that the discussion is about a much bigger topic than just
serialising DAGs. But I think it might be a very good direction
nevertheless, as it potentially addresses several other AIPs (I believe at
least three other AIPs can benefit from better container support for
Airflow: AIP-7, AIP-8, AIP-10).

Maybe it should be a different discussion and a potential direction even
beyond 2.0?
Or maybe it's worth thinking about a better containerisation approach for
Airflow as a foundation for several other improvements, and worth doing for
2.0.

I personally believe using Docker as "serialisation" for DAGs + all
dependencies (including binaries and execution environment) is a great idea
that has far-reaching consequences.

Especially if we have a Docker registry as an intermediary (either external
or local) and introduce a multi-layered Docker image (AIP-10
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-10+Multi-layered+official+Airflow+image>
- where
I discussed performance benefits), it might be quite a powerful way of moving
Airflow into the container world - simplifying Airflow architecture and
deployment complexity as a result.

*How this might work:*

I imagine task binary distribution to be done completely via a Docker
registry (an image built by the Airflow scheduler or webserver and pushed to
the registry). The Airflow scheduler/webserver, having its own base Docker
image with all the binaries and Airflow's source code, could use it to build
a new image on top of it - adding the needed DAG whenever a task is scheduled.
There is nothing wrong with having even thousands of Docker images if they
have common layers and thin "few files" DAG layers on top. The code to build
such an image could be part of Airflow itself (no manual/devops packaging
needed). This should be super-fast - just adding a few files to an existing
image is pretty much instant. Pushing such a layer to the registry is also
fast. Similarly, a worker that has already run any Airflow task in the past
could pull such an image very quickly: it will only pull the extra DAG layer
because the Airflow layer is already pulled. In effect, the worker has the
whole image ready - with all the dependencies and the DAG - and can run it
immediately without any extra actions.
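
To make this a bit more concrete, here is a rough sketch of what the "thin
DAG layer" build could look like. This is not existing Airflow code: the
base image name, the DAG folder inside the image and all function names are
made up for illustration. It only renders the Dockerfile and the docker CLI
calls, it does not run them.

```python
import hashlib

BASE_IMAGE = "example.org/airflow-base:1.10.2"  # hypothetical base image
DAGS_DIR = "/opt/airflow/dags"  # assumed DAG folder inside the image


def dag_layer_tag(dag_files):
    """Derive a stable tag from the DAG file names and their contents."""
    digest = hashlib.sha256()
    for name in sorted(dag_files):
        digest.update(name.encode("utf-8"))
        digest.update(b"\0")
        digest.update(dag_files[name])
        digest.update(b"\0")
    return digest.hexdigest()[:12]


def dag_layer_dockerfile(dag_files):
    """Render a Dockerfile that only adds the DAG files on top of the base."""
    lines = ["FROM " + BASE_IMAGE]
    for name in sorted(dag_files):
        lines.append("COPY {0} {1}/{0}".format(name, DAGS_DIR))
    return "\n".join(lines) + "\n"


def build_and_push_commands(repository, dag_files):
    """Return the docker CLI calls that would build and push the layer."""
    image = "{}:{}".format(repository, dag_layer_tag(dag_files))
    return [
        ["docker", "build", "-t", image, "."],
        ["docker", "push", image],
    ]
```

Because the tag depends only on the DAG files, an unchanged DAG maps to the
same image and nothing new has to be pushed or pulled.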

*Migrations:*

Such an approach can also support migrations efficiently (with multiple
layers it will be super-fast to migrate between minor versions where only the
Airflow sources change and a few extra dependencies are added). Each Docker
image has a unique SHA256 digest that can be used as a label to identify it,
so I even imagine workers running different images based on different Airflow
version images at the same time. In this approach, all dependencies
(basically the whole execution environment) are in the image itself. There is
no problem with having even a live migration (some tasks still running on
older images while others already use new ones). This is what, for example,
Istio on top of Kubernetes can use to perform A/B testing or staged rollouts
(5% of traffic, then 10%, etc.).
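
A staged rollout of this kind could be as simple as the sketch below. It is
only an illustration under my own assumptions (the function name and the
idea of bucketing by a task key are hypothetical, nothing like this exists in
Airflow today): each task key is hashed into one of 100 buckets and the
lowest buckets get the new image.

```python
import hashlib


def pick_image(task_key, old_image, new_image, new_percent):
    """Deterministically route a share of tasks to the new image.

    task_key is any stable string identifying the task (hypothetical,
    e.g. "dag_id.task_id"); new_percent is an integer from 0 to 100.
    """
    digest = hashlib.sha256(task_key.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % 100  # stable bucket in the range 0..99
    return new_image if bucket < new_percent else old_image
```

Raising new_percent from 5 to 10 and so on only ever moves tasks from the
old image to the new one, never back, because the bucket of a given key does
not change.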

*Better Kubernetes support:*

Which brings us to Kubernetes - going that route could vastly simplify the
current Kubernetes executor, because this is what the Kubernetes executor
actually does - it runs a Pod based on certain image(s). Currently the
Kubernetes image also requires DAGs to be distributed across the cluster
(using shared volumes or git-sync), but if we bake the DAGs into the Airflow
image itself, then the Kubernetes executor would simply boil down to
preparing a Pod YAML file with the right image link (SHA of the right image
containing the DAG to run) and submitting it for execution, with no extra
shared volumes needed to distribute DAGs across the cluster.
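
As an illustration, the Pod definition could be as small as the sketch
below. The function name and the Pod naming scheme are my assumptions, the
naming is simplified (real Pod names have stricter rules and length limits),
and I assume the container runs the task via the 1.10-style `airflow run`
command.

```python
import json


def pod_manifest(image_ref, dag_id, task_id, execution_date):
    """Build a minimal Pod manifest that runs one task from a baked image.

    image_ref is expected to pin the image by digest, for example
    "registry/repo@sha256:<digest>" (hypothetical reference).
    """
    name = "{}-{}".format(dag_id, task_id).lower().replace("_", "-")
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": name},
        "spec": {
            "restartPolicy": "Never",
            "containers": [
                {
                    "name": "task",
                    "image": image_ref,
                    "command": [
                        "airflow", "run", dag_id, task_id, execution_date,
                    ],
                }
            ],
        },
    }


def pod_manifest_json(image_ref, dag_id, task_id, execution_date):
    """Serialise the manifest; kubectl accepts JSON as well as YAML."""
    manifest = pod_manifest(image_ref, dag_id, task_id, execution_date)
    return json.dumps(manifest, indent=2, sort_keys=True)
```

Note that there are no volumes in the spec at all: the DAG is already in the
image.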

*Celery support:*

You do not have to use Kubernetes at all - a Celery worker or LocalExecutor
can easily create and start a container based on a Docker image stored in a
registry (local or remote) - there is nothing difficult or strange about
that. No Kubernetes knowledge is needed, just basic Docker/container
knowledge (and not even that, as it will all be handled transparently by
Airflow).
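
On a plain worker this could boil down to two docker CLI calls, roughly as
sketched below. The function name is hypothetical and I again assume the
1.10-style `airflow run` command inside the container; the commands are only
built here, a worker would pass each of them to subprocess.check_call.

```python
def worker_commands(image_ref, dag_id, task_id, execution_date):
    """Return the docker CLI calls a worker would issue for one task."""
    return [
        # Only layers missing locally (typically the thin DAG layer)
        # are actually downloaded.
        ["docker", "pull", image_ref],
        # --rm removes the container once the task has finished.
        ["docker", "run", "--rm", image_ref,
         "airflow", "run", dag_id, task_id, execution_date],
    ]
```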

*Better development workflow:*

It might also be great for development support - with a multi-layered Docker
image you could efficiently build your image locally and map a local volume
with DAGs to the same place where they would be baked in. Thus when you start
the image locally you get the same experience as if the DAGs were baked in,
but mounted from local development sources - so you can edit them in place
and see the changes immediately reflected in your container. It makes for a
super-fast and efficient development workflow where you only keep sources on
your host machine and everything executes in Docker. It goes hand-in-hand with
"Simplified Development Workflow" - (AIP-7
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-7+Simplified+development+workflow>
).
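
For example, the local run could just bind-mount the DAG sources over the
baked-in folder, as in the sketch below. The folder inside the image and the
function name are assumptions of mine, not an agreed layout.

```python
import os

DAGS_DIR_IN_IMAGE = "/opt/airflow/dags"  # assumed location of baked-in DAGs


def dev_run_command(image_ref, local_dags_dir, command):
    """Build a `docker run` call that mounts local DAGs over the baked-in ones."""
    # docker needs an absolute host path for a bind mount.
    mount = "{}:{}".format(os.path.abspath(local_dags_dir), DAGS_DIR_IN_IMAGE)
    return ["docker", "run", "--rm", "-it", "-v", mount, image_ref] + list(command)
```

Calling dev_run_command("example.org/airflow-dev:latest", "dags",
["airflow", "webserver"]) gives a command that starts the webserver against
whatever is currently in your local dags folder.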

*Easier Airflow modularisation:*

One last thing (even more far-fetched) - it could make "Split
Hooks/Operators into Separate Packages" (AIP-8
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=100827303>)
much easier to implement - there is a big problem with dependencies if you
split Airflow into independently developed modules, but if we can come up
with a way for different operators to run in different images (via
docker compose or inside the same Kubernetes Pod), that might become much
more feasible to implement and maintain.

J.

On Wed, Feb 27, 2019 at 10:45 PM Maxime Beauchemin <
maximebeauche...@gmail.com> wrote:

> Agreed, I wasn't trying to inflate the scope of the AIP, just raising
> related topics to see how it all fits together.
>
> Max
>
> On Wed, Feb 27, 2019 at 1:17 PM Bolke de Bruin <bdbr...@gmail.com> wrote:
>
> > I agree with Fokko here. We have been discussing serialisation for a
> > veerrryyy long time and nothing has come of it :-). Probably because we
> are
> > making it too big.
> >
> > As Fokko states, there are several goals and each of them probably
> warrants
> > an AIP:
> >
> > 1) Make the webserver stateless: needs the graph of the *current* dag
> > 2) Version dags: for consistency mainly and not requiring parsing of the
> > dag on every loop
> > 3) Make the scheduler not require DAG files. This could be done if the
> > edges contain all information when to trigger the next task. We can then
> > have event driven dag parsing outside of the scheduler loop, ie. by the
> > cli. Storage can also be somewhere else (git, artifactory, filesystem,
> > whatever).
> > 4) Fully serialise the dag so it becomes transferable to workers
> >
> > 1-3 are related, 4 isn’t.
> >
> > It would be awesome if we could tackle 1 first, which I think the PR is
> > doing as a first iteration. It would take us a long way to the others.
> >
> > B.
> >
> >
> > On 27 February 2019 at 22:00:59, Driesprong, Fokko (fo...@driesprong.frl
> )
> > wrote:
> >
> > I feel we're going a bit off topic here. Although it is good to discuss
> the
> > possibilities.
> >
> > From my perspective the AIP tries to kill two birds with one stone:
> >
> > 1. Decoupling the web-server from the actual Python DAG files
> > 2. Versioning the DAGs so we can have a historical view of the dags as
> > they are executed. For example, if you now deploy a new dag, and you
> rename
> > an operator, the old name will disappear from the tree view, and you will
> > get a new row which has no status (until you trigger a run).
> >
> > They are related but require different solutions. Personally, I'm
> most
> > interested in the first so we can make the webserver (finally) stateless.
> > The PR tries to remove the need for having the actual python for the
> > different views. Some of them are trivial, such as the graph and tree
> view.
> > Some of them are more tricky, such as the task instance details and the
> > rendered view because they pull a lot of information from the DAG object,
> > and it involves jinja templating.
> > The main goal of the PR is to store this kind of metadata when the
> > scheduler kicks off the dag. So when a new DagRun is being created, the
> > latest version of the dag is loaded from the actual python file on disk
> by
> > the scheduler. The file is executed and the DAG is being persisted into
> the
> > database in a structured way. By moving this into the database, we can
> > eventually decouple the webserver from the actual dag files, which
> greatly
> > simplifies deployment and removes the state since this is now in the
> (ACID
> > compliant) database. Furthermore, instead of whitelisting certain
> classes,
> > we control the serialization ourselves by knowing what we push to the
> > database instead of having to push a pickled blob. Besides that, from a
> > performance perspective, as Max already pointed out, pickling can
> > potentially be expensive in terms of memory and CPU.
> >
> > > Since it's pretty clear we need SimpleDAG serialization, and we can see
> > > through the requirements, people can pretty much get started on this.
> >
> > I'm afraid if we go this route, then the SimpleDag will be the actual Dag
> > in the end (but then a slightly smaller and serializable version of it).
> My
> > preference would be to simplify the DAG object and get rid of the BaseDag
> > and SimpleDag to simplify the object hierarchy.
> >
> > Cheers, Fokko
> >
> > Op wo 27 feb. 2019 om 21:23 schreef Maxime Beauchemin <
> > maximebeauche...@gmail.com>:
> >
> > > I fully agree on all your points Dan.
> > >
> > > First about PEX vs Docker, Docker is a clear choice here. It's a
> superset
> > > of what PEX can do (PEX is limited to python env) and a great standard
> > that
> > > has awesome tooling around it, works natively in k8s, which is becoming
> > the
> > > preferred executor. PEX is a bit of a hack and has failed to become a
> > > standard. I don't think anyone would argue for PEX over Docker in
> almost
> > > any context where this question would show up nowadays (beyond Airflow
> > > too).
> > >
> > > And pickles have a lot of disadvantages over docker:
> > > * some objects are not picklable (JinjaTemplates!)
> > > * lack of visibility / tooling on building them, you might make one
> call
> > > and get a 500mb pickle, and have no clue why it's so big
> > > * unlike docker, they are impossible to introspect (as far as I know),
> > you
> > > have to intercept the __deepcopy__ method, good luck!
> > > * pickles require a very similar environment to be rehydrated (same GCC
> > > version, same modules/version available?)
> > > * weird side effects and unknown boundaries. If I pickled a DAG on an
> > older
> > > version of airflow and code logic got included, and restore it on a
> > newer,
> > > could the new host be oddly downgraded as a result? when restored, did
> it
> > > affect the whole environment / other DAGs?
> > >
> > > My vote goes towards something like SimpleDAG serialization (for web
> > server
> > > and similar use cases) + docker images built on top of a lightweight
> > SDK
> > > as the way to go.
> > >
> > > Since it's pretty clear we need SimpleDAG serialization, and we can see
> > > through the requirements, people can pretty much get started on this.
> > >
> > > The question of how to bring native Docker support to Airflow is a bit
> > more
> > > complex. I think the k8s executor has support for custom DAG containers
> > but
> > > idk how it works as I haven't looked deeply into this recently. Docker
> > > support in Airflow is a tricky and important question, and it offers an
> > > opportunity to provide solutions to long standing issues around
> > versioning,
> > > test/dev user workflows, and more.
> > >
> > > Related docker questions:
> > > * what's the docker interface contract? what entry point have to exist
> in
> > > the image?
> > > * does airflow provide tooling to help bake images that enforce that
> > > contract?
> > > * do we need docker tag semantics in the DSL? does it look something
> like
> > > `DAG(id='mydag', docker_tag='hub.docker.com://org/repo/tag', ...)`
> > > * is docker optional or required? (probably optional at first)
> > > * should docker support be k8s-executor centric? work the same across
> > > executor? are we running docker-in-docker as a result / penalty in k8s?
> > >
> > > Max
> > >
> > > On Wed, Feb 27, 2019 at 11:38 AM Dan Davydov
> > <ddavy...@twitter.com.invalid
> > > >
> > > wrote:
> > >
> > > > >
> > > > > * on the topic of serialization, let's be clear whether we're
> talking
> > > > about
> > > > > unidirectional serialization and *not* deserialization back to the
> > > > object.
> > > > > This works for making the web server stateless, but isn't a
> solution
> > > > around
> > > > > how DAG definition get shipped around on the cluster (which would
> be
> > > nice
> > > > > to have from a system standpoint, but we'd have to break lots of
> > > dynamic
> > > > > features, things like callbacks and attaching complex objects to
> > DAGs,
> > > > ...)
> > > >
> > > > I feel these dynamic features are not worth the tradeoffs, and in
> most
> > > > cases have alternatives, e.g. on_failure_callback can be replaced by
> a
> > > task
> > > > with a ONE_FAILURE trigger rule, which gives additional advantages
> that
> > > > first-class Airflow tasks have like retries. That being said, we
> should
> > > > definitely do our due diligence weighing the trade-offs and coming up
> > > with
> > > > alternatives for any feature we disable (jinja templating related to
> > > > webserver rendering, callbacks, etc). I remember speaking to Alex
> about
> > > > this and he agreed that the consistency/auditing/isolation guarantees
> > > were
> > > > worth losing some features, I think Paul did as well. Certainly we
> will
> > > > need to have a discussion/vote with the rest of the committers.
> > > >
> > > > My initial thinking is that both the DAG topology serialization (i.e.
> > > > generating and storing SimpleDag in the DB for each DAG), and linking
> > > each
> > > > DAG with a pex/docker image/etc as well as authentication tokens
> should
> > > > happen at the same place, probably the client runs some command that
> > will
> > > > generate SimpleDag as well as a container, and then sends it to some
> > > > Airflow Service that stores all of this information appropriately.
> Then
> > > > Scheduler/Webserver/Worker consume the stored SimpleDags, and Workers
> > > > consume the containers in addition.
> > > >
> > > > * docker as "serialization" is interesting, I looked into "pex"
> format
> > in
> > > > > the past. It's pretty cool to think of DAGs as micro docker
> > application
> > > > > that get shipped around and executed. The challenge with this is
> that
> > > it
> > > > > makes it hard to control Airflow's core. Upgrading Airflow becomes
> > > [also]
> > > > > about upgrading the DAG docker images. We had similar concerns with
> > > > "pex".
> > > > > The data platform team loses their handle on the core, or has to
> get
> > > in
> > > > > the docker building business, which is atypical. For an upgrade,
> > you'd
> > > > have
> > > > > to ask/force the people who own the DAG dockers to upgrade their
> > > images,
> > > >
> > > > The container vs Airflow versioning problem I believe is just an API
> > > > versioning problem. I.e. you don't necessarily have to rebuild all
> > > > containers when you bump version of airflow as long as the API is
> > > backwards
> > > > compatible). I think this is reasonable for a platform like Airflow,
> > and
> > > > not sure there is a great way to avoid it if we want other nice
> system
> > > > guarantees (e.g. reproducibility).
> > > >
> > > > Contract could be like "we'll only run
> > > > > your Airflow-docker-dag container if it's in a certain version
> range"
> > > or
> > > > > something like that. I think it's a cool idea. It gets intricate
> for
> > > the
> > > > > stateless web server though, it's a bit of a mind bender :) You
> could
> > > ask
> > > > > the docker to render the page (isn't that crazy?!) or ask the
> docker
> > > for
> > > > a
> > > > > serialized version of the DAG that allows you to render the page
> > > (similar
> > > > > to point 1).
> > > >
> > > > If the webserver uses the SimpleDag representation that is generated
> at
> > > the
> > > > time of DAG creation, then you can avoid having Docker needing to
> > provide
> > > > this serialized version, i.e. you push the responsibility to the
> client
> > > to
> > > > have the right dependencies in order to build the DAG which I feel is
> > > good.
> > > > One tricky thing I can think of is if you have special UI elements
> > > related
> > > > to the operator type of a task (I saw a PR out for this recently),
> you
> > > > would need to solve the API versioning problem separately for this as
> > > well
> > > > (i.e. make sure the serialized DAG representation works with the
> > version
> > > of
> > > > the newest operator UI).
> > > >
> > > > * About storing in the db, for efficiency, the pk should be the SHA
> of
> > > the
> > > > > deterministic serialized DAG. Only store a new entry if the DAG has
> > > > > changed, and stamp the DagRun to a FK of that serialized DAG table.
> > If
> > > > > people have shapeshifting DAG within DagRuns we just do best
> effort,
> > > show
> > > > > them the last one or something like that
> > > >
> > > > If we link each dagrun to its "container" and "serialized
> > > representation"
> > > > then the web UI can actually iterate through each dagrun and even
> > render
> > > > changes in topology. I think at least for v1 we can just use the
> > current
> > > > solution as you mentioned (best effort using the latest version).
> > > >
> > > > * everyone hates pickles (including me), but it really almost works,
> > > might
> > > > > be worth revisiting, or at least I think it's good for me to list
> out
> > > the
> > > > > blockers:
> > > > > * JinjaTemplate objects are not serializable for some odd obscure
> > > > > reason, I think the community can solve that easily, if someone
> wants
> > a
> > > > > full brain dump on this I can share what I know
> > > >
> > > > What was the preference for using Pickle over Docker/PEX for
> > > serialization?
> > > > I think we discussed this a long time ago with Paul but I forget the
> > > > rationale and it would be good to have the information shared
> publicly
> > > too.
> > > > One big problem is you don't get isolation at the binary dependency
> > > level,
> > > > i.e. .so/.dll dependencies, along with all of the other problems you
> > > > listed.
> > > >
> > > > On Tue, Feb 26, 2019 at 8:55 PM Maxime Beauchemin <
> > > > maximebeauche...@gmail.com> wrote:
> > > >
> > > > > Related thoughts:
> > > > >
> > > > > * on the topic of serialization, let's be clear whether we're
> talking
> > > > about
> > > > > unidirectional serialization and *not* deserialization back to the
> > > > object.
> > > > > This works for making the web server stateless, but isn't a
> solution
> > > > around
> > > > > how DAG definition get shipped around on the cluster (which would
> be
> > > nice
> > > > > to have from a system standpoint, but we'd have to break lots of
> > > dynamic
> > > > > features, things like callbacks and attaching complex objects to
> > DAGs,
> > > > ...)
> > > > >
> > > > > * docker as "serialization" is interesting, I looked into "pex"
> > format
> > > in
> > > > > the past. It's pretty cool to think of DAGs as micro docker
> > application
> > > > > that get shipped around and executed. The challenge with this is
> that
> > > it
> > > > > makes it hard to control Airflow's core. Upgrading Airflow becomes
> > > [also]
> > > > > about upgrading the DAG docker images. We had similar concerns with
> > > > "pex".
> > > > > > The data platform team loses their handle on the core, or has to
> get
> > > in
> > > > > the docker building business, which is atypical. For an upgrade,
> > you'd
> > > > have
> > > > > to ask/force the people who own the DAG dockers to upgrade their
> > > images,
> > > > > else they won't run or something. Contract could be like "we'll
> only
> > > run
> > > > > your Airflow-docker-dag container if it's in a certain version
> range"
> > > or
> > > > > something like that. I think it's a cool idea. It gets intricate
> for
> > > the
> > > > > stateless web server though, it's a bit of a mind bender :) You
> could
> > > ask
> > > > > the docker to render the page (isn't that crazy?!) or ask the
> docker
> > > for
> > > > a
> > > > > serialized version of the DAG that allows you to render the page
> > > (similar
> > > > > to point 1).
> > > > >
> > > > > * About storing in the db, for efficiency, the pk should be the SHA
> > of
> > > > the
> > > > > deterministic serialized DAG. Only store a new entry if the DAG has
> > > > > changed, and stamp the DagRun to a FK of that serialized DAG table.
> > If
> > > > > people have shapeshifting DAG within DagRuns we just do best
> effort,
> > > show
> > > > > them the last one or something like that
> > > > >
> > > > > * everyone hates pickles (including me), but it really almost
> works,
> > > > might
> > > > > be worth revisiting, or at least I think it's good for me to list
> out
> > > the
> > > > > blockers:
> > > > > * JinjaTemplate objects are not serializable for some odd obscure
> > > > > reason, I think the community can solve that easily, if someone
> wants
> > a
> > > > > full brain dump on this I can share what I know
> > > > > * Size: as you pickle something, someone might have attached things
> > > > > that recurse into hundreds of GBs-size pickle. Like some
> > > > > on_failure_callback may bring in the whole Slack api library. That
> > can
> > > be
> > > > > solved or mitigated in different ways. At some point I thought I'd
> > > have a
> > > > > DAG.validate() method that makes sure that the DAG can be pickled,
> > and
> > > > > serialized to a reasonable size pickle. I also think we'd have to
> > make
> > > > sure
> > > > > operators are defined as more "abstract" otherwise the pickle
> > includes
> > > > > things like the whole pyhive lib and all sorts of other deps. It
> > could
> > > be
> > > > > possible to limit what gets attached to the pickle (whitelist
> > classes),
> > > > and
> > > > > dehydrate objects during serialization / and rehydrate them on the
> > > other
> > > > > > side (assuming classes are on the worker too). If that sounds crazy
> > to
> > > > you,
> > > > > it's because it is.
> > > > >
> > > > > * the other crazy idea is thinking of git repo (the code itself) as
> > the
> > > > > serialized DAG. There are git filesystem in userspace [fuse] that
> > allow
> > > > > dynamically accessing the git history like it's just a folder, as
> in
> > > > > `REPO/{ANY_GIT_REF}/dags/mydag.py` . Beautifully hacky. A company
> > with
> > > a
> > > > > blue logo with a big F on it that I used to work at did that.
> Talking
> > > > about
> > > > > embracing config-as-code! The DagRun can just stamp the git SHA
> it's
> > > > > running with.
> > > > >
> > > > > Sorry about the confusion, config as code gets tricky around the
> > > corners.
> > > > > But it's all worth it, right? Right!? :)
> > > > >
> > > > > On Tue, Feb 26, 2019 at 3:09 AM Kevin Yang <yrql...@gmail.com>
> > wrote:
> > > > >
> > > > > > My bad, I was misunderstanding a bit and mixing up two issues. I
> > was
> > > > > > thinking about the multiple runs for one DagRun issue( e.g. after
> > we
> > > > > clear
> > > > > > the DagRun).
> > > > > >
> > > > > > This is an orthogonal issue. So the current implementation can
> work
> > > in
> > > > > the
> > > > > > long term plan.
> > > > > >
> > > > > > Cheers,
> > > > > > Kevin Y
> > > > > >
> > > > > > On Tue, Feb 26, 2019 at 2:34 AM Ash Berlin-Taylor <
> a...@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > >
> > > > > > > > On 26 Feb 2019, at 09:37, Kevin Yang <yrql...@gmail.com>
> > wrote:
> > > > > > > >
> > > > > > > > Now since we're already trying to have multiple graphs for
> one
> > > > > > > > execution_date, maybe we should just have multiple DagRun.
> > > > > > >
> > > > > > > I thought that there is exactly 1 graph for a DAG run - dag_run
> > > has a
> > > > > > > "graph_id" column
> > > > > >
> > > > >
> > > >
> > >
> >
>


-- 

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129 <+48660796129>
E: jarek.pot...@polidea.com
