Quick comment (as I'm still mostly on paternity leave): storing wheels in the DB sounds like a bad idea to me, especially if we need to store deps in there too (and if we don't store deps, then the wheels are incomplete) - they could get very large, and I've stored blobs of ~10 MB in Postgres before: I don't recommend it. It "works", but operating it is tricky.
> the API could simply accept "Wheel file + the Dag id"

This sounds like a huge security risk. My main concern with this idea is that it puts a lot of complexity on users. Doubly so if they are already using Docker, where there already exists an ideal packaging and distribution mechanism that could contain the DAG + needed code.

(Sorry for the brevity) -ash

On 2 August 2020 08:47:39 BST, Jarek Potiuk <jarek.pot...@polidea.com> wrote:

> Few points from my side (and a proposal!):
>
> 1) Agree with Max - with a rather strong NO for pickles (however, cloudpickle does indeed solve some of the problems). Pickles came up in our discussion at Polidea recently and the overall message was "no". I agree with Max here - if we can ship Python code, turning it into a pickle for transit makes little sense to me and brings a plethora of problems.
>
> 2) I think the versioning solution should indeed treat the "DagRun" structure atomically. While I see why we would like to go with the UI/scheduler only first, rather than implementing this in the workers, adding the "mixed version" is where it breaks down IMHO. Reasoning about such a "mixed version" DAG is next to impossible. The current behaviour is not well defined and is non-deterministic (it depends on scheduler delays, syncing, type of deployment, restarts of the workers etc.) - we are moving the problem up to the UI (and thus the users) rather than solving it. So I am not a big fan of this and would rather solve it "well" with atomicity.
>
> 3) I see Dan's point as well - we have had many discussions, and many times the idea of "submitting" the DAG for execution via the API came up - and it makes sense IMHO.
>
> Proposal: implement full versioning, with code shipped via wheel BLOBs in the DB (akin to serialized DAGs).
>
> I understand that the big issue is how to actually "ship" the code to the worker. And - maybe a wild idea - we can kill several birds with the same stone.
>
> There were plenty of discussions on how we could do that, but one option was never truly explored - using wheel packages.
>
> For those who do not know them, there is the PEP: https://www.python.org/dev/peps/pep-0427/
>
> Wheels allow us to "package" Python code in a standard way. They are portable ("purelib" + they contain .py rather than .pyc code), they have metadata and versioning information, they can be signed for security, and they can contain other packages or Python code. Why don't we let the scheduler pack the fingerprinted version of the DAG into a .whl and store it as a blob in the DB next to the serialized form?
>
> There were concerns about the size of the code kept in the DB - but we already use the DB for serialized DAGs and it works fine (I believe we only need to add compression of the JSON serialized form, as we learned from Airbnb during their talk at the Airflow Summit - wheels are already compressed). Also, each task will only need one particular "version" of one DAG, so even if we keep many of them in the DB, the old versions will pretty soon go "cold" and never be retrieved (and most DBs handle that well with caching/indexes).
>
> And if we want to add "callables" from other files - there is nothing to stop the person who defines the DAG from adding a list of files that should be packaged together with the main DAG file (additional_python_files = ["common/my_fantastic_library.py"] in the DAG constructor - see the sketch below). Or we could auto-add all files after the DAG gets imported (i.e. automatically package all files that are imported for that particular DAG from the "dags" folder).
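For concreteness, a rough sketch of what the packing step could look like - a wheel is just a zip archive with metadata, so the scheduler could build one in memory and store the resulting bytes as a blob. Everything here (build_dag_wheel, the additional_python_files handling, the package name) is illustrative, not an existing Airflow API:

    import base64
    import hashlib
    import io
    import zipfile
    from pathlib import Path

    def _record_line(arcname: str, data: bytes) -> str:
        # RECORD entries are "path,sha256=<urlsafe b64, no padding>,<size>"
        digest = base64.urlsafe_b64encode(
            hashlib.sha256(data).digest()).rstrip(b"=").decode()
        return f"{arcname},sha256={digest},{len(data)}"

    def build_dag_wheel(dag_file, extra_files=(), name="dag_pkg", version="1.0.0"):
        """Pack a DAG file (plus any additional_python_files) into a
        minimal purelib wheel, returned as bytes ready to store in the DB."""
        dist_info = f"{name}-{version}.dist-info"
        metadata = f"Metadata-Version: 2.1\nName: {name}\nVersion: {version}\n"
        wheel_meta = ("Wheel-Version: 1.0\nGenerator: sketch\n"
                      "Root-Is-Purelib: true\nTag: py3-none-any\n")
        buf, records = io.BytesIO(), []
        with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as whl:
            for src in (dag_file, *extra_files):
                data = Path(src).read_bytes()
                whl.writestr(str(src), data)   # keeps the "common/..." layout
                records.append(_record_line(str(src), data))
            whl.writestr(f"{dist_info}/METADATA", metadata)
            whl.writestr(f"{dist_info}/WHEEL", wheel_meta)
            records.append(_record_line(f"{dist_info}/METADATA", metadata.encode()))
            records.append(_record_line(f"{dist_info}/WHEEL", wheel_meta.encode()))
            records.append(f"{dist_info}/RECORD,,")
            whl.writestr(f"{dist_info}/RECORD", "\n".join(records) + "\n")
        return buf.getvalue()

A worker could then write the blob to disk under the proper filename (e.g. dag_pkg-1.0.0-py3-none-any.whl) and install it with plain pip into a throwaway virtualenv, which is what the per-task isolation idea below relies on.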
> That should be rather easy.
>
> This way we could ship the code to the workers for the exact version that the DagRun uses. The wheels can be cached and unpacked/installed into a virtualenv for the execution of that single task. That should be super quick, and such a virtualenv can be wiped after execution.
>
> Then we get what Max wants (atomicity of DagRuns) and what Dan wants (the API could simply accept "wheel file + the DAG id"). We have isolation between tasks running on the same worker (based on virtualenvs), so each process on the same worker can run a different version of the same DAG. We have much less confusion in the UI.
>
> Extra bonus 1: we can expand this to package different dependencies in the wheels as well - so that if an operator requires a different (newer) version of a Python library, it could be packaged together with the DAG in the same .whl file. This is also a highly requested feature.
>
> Extra bonus 2: workers will stop depending on the DAG file mount (!), which was our long-term goal and, as Dan mentioned, a great step towards multi-tenancy.
>
> J.
>
> On Fri, Jul 31, 2020 at 6:41 AM Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>>
>> Having tried it early on, I'd advocate pretty strongly against pickles and would rather not get too deep into the why here. The short story is that they can pull in the entire memory space, or much more than you want, and it's impossible to reason about where they end. For that reason and others, they're a security issue. Oh, and some objects are not picklable (Jinja templates, to name a problematic one...). I've also seen secret-related classes that raise when pickled (thank god!).
>>
>> About callbacks and other things like that, it's quite a puzzle in Python. One solution would be to point to a Python namespace - callback="preset.airflow_utils.slack_callback" - and assume the function has to exist in the remote interpreter. Personally I like the DagFetcher idea (it would be great to get a pointer to that mailing list thread here), specifically the GitDagFetcher. I don't know how [un]reasonable it is, but I hate pickles so much that shipping source code around seems much more reasonable to me. I think there's a talk out there from Mike Star about Dataswarm at FB where he may mention how their workers do a git shallow clone of the pipeline repo. Or maybe they use that "beautifully ugly" hack of a gitfs FUSE [file system in user space] on the worker [I could get deeper into that; I'm not sure how reasonable it is either].
>>
>> About fingerprints, a simple `start_date = datetime.now() - timedelta(1)` may lead to a never-repeating fingerprint. From memory, the spec doesn't list out the properties considered to build the hash. It'd be helpful to specify and review that list.
>>
>> Max
>>
>> On Wed, Jul 29, 2020 at 5:20 AM Kaxil Naik <kaxiln...@gmail.com> wrote:
>>
>> > Thanks, both Max and Dan, for your comments; please check my reply below:
>> >
>> > > Personally I vote for a DAG version to be pinned and consistent for the duration of the DAG run. Some of the reasons why:
>> > > - it's easier to reason about, and therefore visualize and troubleshoot
>> > > - it prevents some cases where dependencies are never met
>> > > - it prevents the explosion of artifacts/metadata (one serialization per DagRun as opposed to one per scheduler cycle) in the case of a dynamic DAG whose fingerprint is never the same.
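To make the fingerprint worry concrete: if the fingerprint is (roughly) a hash over the serialized DAG attributes, any attribute recomputed at parse time produces a new "version" on every scheduler cycle. A toy illustration - the hashing details here are made up, not the AIP's actual scheme:

    import hashlib
    import json
    from datetime import datetime, timedelta

    def toy_fingerprint(dag_attrs: dict) -> str:
        # stand-in for hashing the real serialized representation
        payload = json.dumps(dag_attrs, sort_keys=True, default=str)
        return hashlib.sha256(payload.encode()).hexdigest()[:12]

    # A "dynamic" DAG: start_date is recomputed on every parse...
    parse_1 = {"dag_id": "hourly", "start_date": datetime.now() - timedelta(1)}
    parse_2 = {"dag_id": "hourly", "start_date": datetime.now() - timedelta(1)}

    # ...so two consecutive parses almost certainly hash differently,
    # i.e. every parse would mint a brand-new version.
    print(toy_fingerprint(parse_1), toy_fingerprint(parse_2))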
>> > In this AIP, we were only looking to fix the current "viewing behaviour", and we were intentionally not changing the execution behaviour. The change you are suggesting means we would need to introduce DAG versioning for the workers too. That will need more work, as we can't use the serialized representation to run the task: users can use custom modules in other parts of the code - for example, the PythonOperator has python_callable, which allows running any arbitrary code. A similar case is the *on_*_callbacks* defined on the DAG.
>> >
>> > Based on the current scope of the AIP, we still plan to use the actual DAG files for execution and not use serialized DAGs on the workers.
>> >
>> > To account for all the custom modules, we would have to start looking at pickle (cloudpickle).
>> >
>> > > I'm certain that there are lots of those DAGs out there, and that it will overwhelm the metadata database and confuse the users. For an hourly DAG it would mean 24 artifacts per day instead of 1000+.
>> >
>> > What kind of dynamic DAGs are we talking about here? I would think the DAG signature won't change, but I might be wrong - can you give an example, please?
>> >
>> > > If backwards compatibility in behavior is a concern, I'd recommend adding a flag to the DAG class and/or config and make sure we're doing the right thing by default. People who want backward compatibility would have to change that default. But again, that's a lot of extra and confusing complexity that will likely be the source of bugs and user confusion. Having a clear, easy-to-reason-about execution model is super important.
>> > >
>> > > Think about visualizing a DAG that shapeshifted 5 times during its execution - how does anyone make sense of that?
>> >
>> > Wouldn't that be an edge case? How often would someone change the DAG structure in the middle of a DAG execution? And if they do change it, the Graph View should show all the tasks that were run; if it just shows based on the latest version, the behaviour would be the same as now.
>> >
>> > --------
>> >
>> > > Strongly agree with Max's points; also I feel the right way to go about this is, instead of Airflow schedulers/webservers/workers reading DAG Python files, they would instead read from serialized representations of the DAGs (e.g. a JSON representation in the Airflow DB). Instead of DAG owners pushing their DAG files to the Airflow components via varying mechanisms (e.g. git), they would instead call an Airflow CLI to push the serialized DAG representations to the DB, and for things like dynamic DAGs you could populate them from a DAG or another service.
>> >
>> > The Airflow webserver and the scheduler will definitely read from the serialized representation, as they don't need all the code from the DAG files.
>> >
>> > The workers, however, definitely need access to the DAG files, as the tasks/operators use code from custom modules and classes that are required to run the tasks.
>> >
>> > If we do want to go down that route, we would have to use something like cloudpickle, which serializes the entire DAG file and its dependencies.
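For reference, the mechanism being discussed looks roughly like this - cloudpickle is a third-party library (pip install cloudpickle), and this sketch only shows why it comes up at all, not an endorsement; the security caveat below still applies:

    import cloudpickle

    def make_callable(table):
        def python_callable():
            # A closure over 'table': stdlib pickle cannot serialize this,
            # but cloudpickle serializes the function body and its state.
            return f"processing {table}"
        return python_callable

    blob = cloudpickle.dumps(make_callable("events"))  # bytes: could be shipped/stored
    restored = cloudpickle.loads(blob)                 # e.g. on a worker
    print(restored())                                  # -> processing events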
>> > We would also have to ensure that someone is not able to change the pickled source when it is sent from the executor to the worker, as that poses a big security risk.
>> >
>> > - Kaxil
>> >
>> > On Wed, Jul 29, 2020 at 12:43 PM Jacob Ward <jw...@brandwatch.com> wrote:
>> >
>> > > I came here to say what Max has said, only less eloquently.
>> > >
>> > > I do have one concern with locking the version for a single run. Currently it is possible for a user to create a DAG which intentionally changes as it executes, i.e. dynamically creating a task for the DAG during a run by modifying external data, but this change would prevent that. I'm of the opinion that this situation is bad practice anyway, so it doesn't matter if we make it impossible to do, but others may disagree.
>> > >
>> > > On Tue, 28 Jul 2020 at 17:08, Dan Davydov <ddavy...@twitter.com.invalid> wrote:
>> > >
>> > > > Strongly agree with Max's points; also I feel the right way to go about this is, instead of Airflow schedulers/webservers/workers reading DAG Python files, they would instead read from serialized representations of the DAGs (e.g. a JSON representation in the Airflow DB). Instead of DAG owners pushing their DAG files to the Airflow components via varying mechanisms (e.g. git), they would instead call an Airflow CLI to push the serialized DAG representations to the DB, and for things like dynamic DAGs you could populate them from a DAG or another service.
>> > > >
>> > > > This would also enable other features like stronger security/multi-tenancy.
>> > > >
>> > > > On Tue, Jul 28, 2020 at 6:44 PM Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>> > > >
>> > > > > > "mixed version"
>> > > > >
>> > > > > Personally I vote for a DAG version to be pinned and consistent for the duration of the DAG run. Some of the reasons why:
>> > > > > - it's easier to reason about, and therefore visualize and troubleshoot
>> > > > > - it prevents some cases where dependencies are never met
>> > > > > - it prevents the explosion of artifacts/metadata (one serialization per DagRun as opposed to one per scheduler cycle) in the case of a dynamic DAG whose fingerprint is never the same. I'm certain that there are lots of those DAGs out there, and that it will overwhelm the metadata database and confuse the users. For an hourly DAG it would mean 24 artifacts per day instead of 1000+.
>> > > > >
>> > > > > If backwards compatibility in behavior is a concern, I'd recommend adding a flag to the DAG class and/or config and make sure we're doing the right thing by default. People who want backward compatibility would have to change that default. But again, that's a lot of extra and confusing complexity that will likely be the source of bugs and user confusion. Having a clear, easy-to-reason-about execution model is super important.
>> > > > >
>> > > > > Think about visualizing a DAG that shapeshifted 5 times during its execution - how does anyone make sense of that?
>> > > > >
>> > > > > Max
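One runnable way to read "pinned and consistent for the duration of the DAG run": evaluate the fingerprint only when a run is created, store that serialization once, and reuse it until the run finishes. The dict, class and function names below are stand-ins, not Airflow internals:

    import hashlib
    import json

    SERIALIZED_DAGS = {}   # version_hash -> serialized DAG (stand-in for a DB table)

    def fingerprint(serialized_dag: dict) -> str:
        payload = json.dumps(serialized_dag, sort_keys=True, default=str)
        return hashlib.sha256(payload.encode()).hexdigest()

    class DagRun:
        pinned_version_hash = None   # stand-in for a column on dag_run

    def pin_version_for_run(dag_run: DagRun, serialized_dag: dict) -> str:
        """Fingerprint once, at run creation; reuse for the run's lifetime."""
        if dag_run.pinned_version_hash is None:
            version_hash = fingerprint(serialized_dag)
            SERIALIZED_DAGS.setdefault(version_hash, serialized_dag)  # <= 1 new row per run
            dag_run.pinned_version_hash = version_hash
        return dag_run.pinned_version_hash

This is what bounds the artifact count for an hourly DAG at 24 per day: re-parses between runs never mint a new version.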
>> > > > > On Tue, Jul 28, 2020 at 3:14 AM Kaxil Naik <kaxiln...@gmail.com> wrote:
>> > > > >
>> > > > > > Thanks, Max, for your comments.
>> > > > > >
>> > > > > > > *DAG Fingerprinting:* this can be tricky, especially in regard to dynamic DAGs, where in some cases each parsing of the DAG can result in a different fingerprint. I think some DAG and task attributes that should be considered as part of the fingerprint are left out of the proposal, like trigger rules or task start/end datetimes. We should do a full pass of all DAG arguments and make sure we're not forgetting anything that can change scheduling logic. Also, let's be careful that something as simple as a dynamic start or end date on a task could lead to a different version each time you parse.
>> > > > > >
>> > > > > > The short version of DAG fingerprinting would be: just a hash of the serialized DAG.
>> > > > > >
>> > > > > > *Example DAG*: https://imgur.com/TVuoN3p
>> > > > > > *Example Serialized DAG*: https://imgur.com/LmA2Bpr
>> > > > > >
>> > > > > > The serialized form contains all the task & DAG parameters. When they change, the scheduler writes a new version of the serialized DAG to the DB. The webserver then reads the DAGs from the DB.
>> > > > > >
>> > > > > > > I'd recommend limiting serialization/storage to one version per DAG run, as opposed to potentially every time the DAG is parsed - once the version for a DAG run is pinned, fingerprinting is not re-evaluated until the next DAG run is ready to be created.
>> > > > > >
>> > > > > > This is to handle Scenario 3, where a DAG's structure is changed mid-way through a run. Since we don't intend to change the execution behaviour, if we limit storage to one version per DAG run, it won't actually show what was run.
>> > > > > >
>> > > > > > Example DAG v1: Task A -> Task B -> Task C.
>> > > > > > The worker has completed the execution of Task B and is just about to start the execution of Task C.
>> > > > > >
>> > > > > > The 2nd version of the DAG is deployed: Task A -> Task D.
>> > > > > > Now the scheduler queues Task D and it will run to completion. (Task C won't run.)
>> > > > > >
>> > > > > > In this case, "the actual representation of the DAG" that ran is neither v1 nor v2 but a "mixed version" (Task A -> Task B -> Task D). The plan is that the scheduler will create this "mixed version" based on what ran, and the Graph View would show this "mixed version".
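One possible reading of how such a "mixed version" record could be assembled (the AIP may define it differently; this just makes the example concrete - keep every edge, from either version, whose endpoints both actually ran):

    v1 = {"A": ["B"], "B": ["C"], "C": []}   # task -> downstream tasks
    v2 = {"A": ["D"], "D": []}
    ran = {"A", "B", "D"}                    # what actually executed

    def mixed_version(ran, *versions):
        graph = {task: set() for task in ran}
        for version in versions:
            for task, downstream in version.items():
                if task in ran:
                    graph[task] |= {d for d in downstream if d in ran}
        return {task: sorted(deps) for task, deps in sorted(graph.items())}

    print(mixed_version(ran, v1, v2))   # {'A': ['B', 'D'], 'B': [], 'D': []}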
>> > > > > > There would also be a toggle button on the Graph View to select v1 or v2, where the tasks would be highlighted to show whether a particular task was in v1 or v2, as shown in https://cwiki.apache.org/confluence/download/attachments/158868919/Picture%201.png?version=2&modificationDate=1595612863000&api=v2
>> > > > > >
>> > > > > > > *Visualizing change in the tree view:* I think this is very complex, and many things can make this view impossible to render (task dependency reversal, cycles across versions, ...). Maybe a better visual approach would be to render independent, individual tree views for each DAG version (side by side), doing best-effort aligning of the tasks across blocks and "linking" tasks with lines across blocks when necessary.
>> > > > > >
>> > > > > > Agreed - the plan is to do best-effort aligning. At this point in time, task additions at the end of the DAG are expected to be compatible, but changes to the task structure within the DAG may cause the tree view not to incorporate "old" and "new" in the same view, hence that won't be shown.
>> > > > > >
>> > > > > > Regards,
>> > > > > > Kaxil
>> > > > > >
>> > > > > > On Mon, Jul 27, 2020 at 6:02 PM Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>> > > > > >
>> > > > > > > Some notes and ideas:
>> > > > > > >
>> > > > > > > *DAG Fingerprinting:* this can be tricky, especially in regard to dynamic DAGs, where in some cases each parsing of the DAG can result in a different fingerprint. I think some DAG and task attributes that should be considered as part of the fingerprint are left out of the proposal, like trigger rules or task start/end datetimes. We should do a full pass of all DAG arguments and make sure we're not forgetting anything that can change scheduling logic. Also, let's be careful that something as simple as a dynamic start or end date on a task could lead to a different version each time you parse. I'd recommend limiting serialization/storage to one version per DAG Run, as opposed to potentially every time the DAG is parsed - once the version for a DAG run is pinned, fingerprinting is not re-evaluated until the next DAG run is ready to be created.
>> > > > > > >
>> > > > > > > *Visualizing change in the tree view:* I think this is very complex, and many things can make this view impossible to render (task dependency reversal, cycles across versions, ...). Maybe a better visual approach would be to render independent, individual tree views for each DAG version (side by side), doing best-effort aligning of the tasks across blocks and "linking" tasks with lines across blocks when necessary.
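"Best-effort aligning" of two versions' task lists is essentially a sequence diff. A small sketch of how the side-by-side blocks could be paired up for linking (the task names are invented for the example):

    import difflib

    v1_tasks = ["extract", "transform", "load"]
    v2_tasks = ["extract", "validate", "transform", "load_v2"]

    # Pair up matching tasks across the two versions; "equal" runs get
    # linking lines, "insert"/"delete"/"replace" runs render unlinked.
    matcher = difflib.SequenceMatcher(a=v1_tasks, b=v2_tasks)
    for op, i1, i2, j1, j2 in matcher.get_opcodes():
        print(f"{op:8} {str(v1_tasks[i1:i2]):24} {v2_tasks[j1:j2]}")
    # equal    ['extract']              ['extract']
    # insert   []                       ['validate']
    # equal    ['transform']            ['transform']
    # replace  ['load']                 ['load_v2']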
>> > > > > > > On Fri, Jul 24, 2020 at 12:46 PM Vikram Koka <vik...@astronomer.io> wrote:
>> > > > > > >
>> > > > > > > > Team,
>> > > > > > > >
>> > > > > > > > We just created 'AIP-36 DAG Versioning' on Confluence and would very much appreciate feedback and suggestions from the community.
>> > > > > > > >
>> > > > > > > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-36+DAG+Versioning
>> > > > > > > >
>> > > > > > > > The DAG versioning concept has been discussed on multiple occasions in the past and has been highlighted as a topic for Airflow 2.0 as well. We at Astronomer have heard data engineers at several enterprises ask about this feature too, for easier debugging when changes are made to DAGs as a result of evolving business needs.
>> > > > > > > >
>> > > > > > > > As described in the AIP, we have a proposal focused on ensuring that the visibility behaviour of Airflow is correct, without changing the execution behaviour. We considered changing the execution behaviour as well, but decided that the risks in changing execution behaviour were too high compared to the benefits, and therefore decided to limit the scope to only making sure that the visibility was correct.
>> > > > > > > >
>> > > > > > > > We would like to attempt this based on our experience running Airflow as a service. We believe that this benefits Airflow as a project and the development experience of data engineers using Airflow across the world.
>> > > > > > > >
>> > > > > > > > Any feedback, suggestions, and comments would be greatly appreciated.
>> > > > > > > >
>> > > > > > > > Best Regards,
>> > > > > > > >
>> > > > > > > > Kaxil Naik, Ryan Hamilton, Ash Berlin-Taylor, and Vikram Koka
>> > >
>> > > --
>> > > Jacob Ward | Graduate Data Infrastructure Engineer
>> > > jw...@brandwatch.com
>
> --
> Jarek Potiuk
> Polidea | Principal Software Engineer
> M: +48 660 796 129