We haven't reached a conclusion on this yet, but I agree, and this is the big task that we at Astronomer are going to work on next for Airflow.
I've started chatting to a few of the other committers about this to get an idea of people's priorities, and have had a chat with Alex at Uber about their experiences of making their internal fork of Airflow, Piper: https://eng.uber.com/managing-data-workflows-at-scale/

I'll create something in the wiki (probably not an AIP to start with) to collect the possible approaches and their downsides/limitations. Watch this space.

-ash

> On 18 Jul 2019, at 07:05, Tao Feng <fengta...@gmail.com> wrote:
>
> Have we reached any consensus on this topic/AIP? I think persisting DAGs is actually pretty important.
>
> -Tao
>
> On Tue, Mar 12, 2019 at 3:01 AM Kevin Yang <yrql...@gmail.com> wrote:
>
>> Hi Fokko,
>>
>> As a large cluster maintainer, I'm not a big fan of large DAG files either. But I'm not sure I'd consider this bad practice. We have some large frameworks, e.g. experimentation and machine learning, that are complex by nature and generate a large number of DAGs from their customers' configs to get better flexibility. I consider them advanced use cases of Airflow that open up a lot of potential for Airflow, unless we've previously set some boundaries around how complex DAG code can be that I'm not aware of. As for resulting in an unworkable situation: yes, we are experiencing pain from having such large DAG files, mainly on the webserver side, but the system overall is running stably. We are actually hoping to improve the situation by applying solutions like making the webserver stateless. It is OK if the owners of large DAG files need to pay a price, but we should try to minimize it: a longer refresh interval, extra task running time, but nothing too crazy.
>>
>> I think we're aligned on storing info in the DB as long as we can meet the requirements Dan mentioned earlier; we just need that balance decided, so I'm gonna skip this part (out of all the requirements, No. 1 seems to be the least clear, maybe we can expand on that).
>> One thing about the proposed idea is that we implicitly couple DagRun with DAG version, which at first glance makes sense but imo is not ideal. I feel full versioning should track all changes instead of tracking changes only when we create a DagRun. E.g. my task failed, I merged new code to fix it, and I want to rerun it with the current code; if we serialize the DAG at DagRun creation time we won't have the up-to-date snapshot. Sure, we can work around it by, say, always keeping a current snapshot of the DAG, but this is kinda messy and confusing. This is what popped up off the top of my head, and w/o full versioning we might have some other tricky cases, e.g. your backfill case. But I've only given this a little thought, and you might already have a complete story that will void my concerns.
>>
>> Cheers,
>> Kevin Y
>>
>> On Sun, Mar 10, 2019 at 11:29 AM Driesprong, Fokko <fo...@driesprong.frl> wrote:
>>
>>> Thanks Kevin for opening the discussion. I think it is important to have a clear overview of how to approach the AIP.
>>>
>>> First of all, how many DAGs do we have that take 30s to parse? I consider this bad practice, and it would also result in an unworkable situation with the current setup of Airflow, since it takes a lot of resources on the webserver/scheduler and the whole system will become unresponsive. It will be hard to cope with such DAGs in general.
>>>
>>> The idea of the AIP is to have the versioned DAG in the DB, so in the end you won't need to parse the whole thing every time; only when you trigger a DAG, or when you want to see the current status of the DAG.
>>>
>>> As stated earlier, I strongly feel we shouldn't serialize the DAGs as JSON(5) or pickles in general. For me, this is deferring the pain of setting up a structure for the DAG object itself.
>>> Having the DAG denormalized in the database will give us cleaner storage of our DAG. We can, for example, enforce fields by making them NOT NULL, so we know that something is off at write time instead of at read time. Furthermore, we're missing logical types such as dates, which we can efficiently query using the indices of the database.
>>> Also, with all serialization formats, evolution isn't trivial. Consider the situations when:
>>> - We're introducing a new field, and it might be null, so we need to bake all kinds of logic into the Airflow code, which you don't want. With proper migration scripts, you could prefill these fields and make them NOT NULL.
>>> - We're changing the models: for example, you still can't change a string type into an integer without adding custom logic. In this case, the reviewer needs to be extra careful that no breaking changes are introduced. Right now we're doing minimal forward- and backward-compatibility testing.
>>>
>>> If we get too many migrations, we could also squash (some of) them when preparing the release.
>>>
>>> Personally, I don't think the serialization is the issue here. As Max already mentioned, it is about finding the optimal balance of (de)normalization. From the user perspective, the serialization won't change much of the behaviour of Airflow.
>>>
>>> For me, having `DAG.serialize()` and `DAG.deser(version)` is not the ideal approach. But it might be that we're on the same page :-) I believe it should be something like `DagRun.find('fokkos_dag', datetime(2018, 3, 1))`, which constructs the correct version of the DAG. Since there is a uniqueness constraint on (dag_id, datetime), this will always return the same DAG: you will get the versioned DagRun as it ran at that time. Serializing the fields and storing them in the database should happen transparently when you update the DAG object.
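A minimal sketch of the lookup Fokko describes, in which each run is pinned to an immutable snapshot of the DAG and a lookup by (dag_id, execution_date) returns that snapshot. It also follows the reuse-or-create behaviour he describes for run creation: an unchanged DAG reuses the stored version, a changed one gets a new version. The table layout, the SHA-256 content hash, the JSON payload and the function names (`store_version`, `create_run`, `find_run_dag`) are all hypothetical, not Airflow's API; an in-memory SQLite database stands in for the metadata DB.

```python
import hashlib
import json
import sqlite3
from datetime import datetime

# Hypothetical schema: one row per distinct DAG version, one row per run.
SCHEMA = """
CREATE TABLE dag_version (
    dag_id       TEXT NOT NULL,
    version_hash TEXT NOT NULL,
    data         TEXT NOT NULL,           -- serialized DAG structure
    PRIMARY KEY (dag_id, version_hash)
);
CREATE TABLE dag_run (
    dag_id         TEXT NOT NULL,
    execution_date TEXT NOT NULL,
    version_hash   TEXT NOT NULL,
    UNIQUE (dag_id, execution_date)       -- one run per DAG and date
);
"""


def store_version(conn, dag_id, dag_dict):
    """Insert this DAG structure unless an identical version already exists."""
    data = json.dumps(dag_dict, sort_keys=True)
    version_hash = hashlib.sha256(data.encode()).hexdigest()
    conn.execute("INSERT OR IGNORE INTO dag_version VALUES (?, ?, ?)",
                 (dag_id, version_hash, data))
    return version_hash


def create_run(conn, dag_id, dag_dict, execution_date):
    """Reuse the stored version if the DAG is unchanged, else store a new one."""
    version_hash = store_version(conn, dag_id, dag_dict)
    conn.execute("INSERT INTO dag_run VALUES (?, ?, ?)",
                 (dag_id, execution_date.isoformat(), version_hash))


def find_run_dag(conn, dag_id, execution_date):
    """Return the DAG structure exactly as it was when this run was created."""
    row = conn.execute(
        "SELECT v.data FROM dag_run r JOIN dag_version v "
        "ON r.dag_id = v.dag_id AND r.version_hash = v.version_hash "
        "WHERE r.dag_id = ? AND r.execution_date = ?",
        (dag_id, execution_date.isoformat()),
    ).fetchone()
    return json.loads(row[0]) if row else None


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
v1 = {"tasks": ["extract", "load"], "edges": [["extract", "load"]]}
v2 = {"tasks": ["extract", "transform", "load"],
      "edges": [["extract", "transform"], ["transform", "load"]]}
create_run(conn, "fokkos_dag", v1, datetime(2018, 3, 1))
create_run(conn, "fokkos_dag", v1, datetime(2018, 3, 2))  # unchanged: reuses v1
create_run(conn, "fokkos_dag", v2, datetime(2018, 3, 3))  # changed: new version
print(find_run_dag(conn, "fokkos_dag", datetime(2018, 3, 1))["tasks"])
# → ['extract', 'load']
```

Note that this pins every run to the version that was current when the run was created, which is exactly the coupling Kevin questions in his reply: rerunning the 2018-03-01 run with fixed code would need an explicit "use latest version" path.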
>>> When you run a DAG, you'll parse the DAG and then run it. `Dag().create_dagrun(...)` will create a DagRun, as the name suggests; if this version of the DAG still exists in the database it will reuse it, otherwise it will create a new version of the DAG (with all the operators etc.). In this sense, the versioning of the DAGs should be done within the Dag(Run).
>>>
>>> The versioning will change the behaviour from a user perspective. Right now we store only a single version. For example, poor man's backfilling won't work anymore: clearing the state from past & future, up- and downstream, and letting it catch up again.
>>> In this case, the old version of the DAG won't exist anymore, and potentially there are tasks that aren't in the code anymore. So we need to clear that version of the DAG and rerun it with the latest version: `DagRun.find('fokkos_dag', datetime(2018, 3, 1)).clear()`. How we are going to do clears downstream in the middle of the DAG is something I still have to figure out, because potentially there are tasks that can't be rerun because the underlying Python code has changed, both at user level and at Airflow level. It will be impossible to get these features pure in that sense.
>>> I would not suggest adding a new status here indicating that the task can't be rerun since it isn't part of the DAG anymore. We have to find the balance here between adding complexity (also to the scheduler) and features that we need to introduce to help the user.
>>>
>>> Cheers, Fokko
>>>
>>> PS. Jarek, interesting idea. It shouldn't be too hard to make Airflow more k8s-native. You could package your DAGs within your container and do a rolling update: add the DAGs as the last layer, and then point the DAGs folder to the same location. The hard part here is that you need to gracefully restart the workers.
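A minimal sketch of the graceful worker restart Fokko's PS asks for: on SIGTERM the worker stops picking up new jobs, lets the ones already started finish, and then returns so the process can exit before Kubernetes escalates to SIGKILL. The `Worker` class, its in-memory job queue and the thread pool are hypothetical stand-ins, not how Airflow's workers are implemented, and the signal is simulated by the process sending SIGTERM to itself (POSIX only).

```python
import os
import queue
import signal
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class Worker:
    """Hypothetical worker that drains in-flight jobs on SIGTERM."""

    def __init__(self, max_parallel=2):
        self.jobs = queue.Queue()
        self.pool = ThreadPoolExecutor(max_workers=max_parallel)
        self.stopping = threading.Event()
        self.completed = []
        # Must be installed from the main thread.
        signal.signal(signal.SIGTERM, self._on_sigterm)

    def _on_sigterm(self, signum, frame):
        # Do not exit here: only stop accepting new work.
        self.stopping.set()

    def _run(self, job):
        time.sleep(0.05)  # stand-in for a long-running task
        self.completed.append(job)

    def serve(self):
        while not self.stopping.is_set():
            try:
                job = self.jobs.get(timeout=0.01)
            except queue.Empty:
                continue
            self.pool.submit(self._run, job)
        # Wait for every job that was already handed to the pool, then
        # return so the process exits cleanly before SIGKILL arrives.
        self.pool.shutdown(wait=True)


worker = Worker()
for n in range(3):
    worker.jobs.put(f"job-{n}")
# Simulate Kubernetes sending SIGTERM shortly after start-up.
threading.Timer(0.1, os.kill, args=(os.getpid(), signal.SIGTERM)).start()
worker.serve()
print(sorted(worker.completed))
```

As Fokko points out, this only helps if the pod's kill timeout (the grace period before SIGKILL) is longer than the longest-running job.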
>>> Currently, AFAIK, the signals given to the pod aren't respected. When the scheduler/webserver/worker receives a SIGTERM, it should stop its jobs nicely and then exit the container before k8s kills it with a SIGKILL. This will be challenging with the workers, which are potentially long-running. Maybe it will be good enough to stop kicking off new jobs and let the old ones finish, but then we need to increase the standard kill timeout substantially. Having this would also enable autoscaling of the workers.
>>>
>>> On Sat, 9 Mar 2019 at 19:07, Maxime Beauchemin <maximebeauche...@gmail.com> wrote:
>>>
>>>> I want to raise the question of the amount of normalization we want to use here, as it seems to be an area that needs more attention.
>>>>
>>>> The SIP suggests having DAG blobs, task blobs and edges (call it "fairly-normalized"). I also like the idea of all-encompassing (call it "very-denormalized") DAG blobs, as it seems easier to manage in terms of versioning. The question here is whether we go with one of these methods exclusively, something in between, or even a hybrid approach (redundant blobs that use different levels of normalization).
>>>>
>>>> It's nice and simple to just push or pull atomic DAG objects with a version stamp on them. It's clearly simpler than dealing with 3 versioned tables (dag, tasks, edges). There are a lot of pros/cons, and they become more apparent from the perspective of very large DAGs. If the web server is building a "task details page" using the "fairly-normalized" model, it can just pull what it needs instead of pulling the large DAG blob. Similarly, if building a sub-tree view (a subset of the DAG), perhaps it can retrieve only what it needs.
>>>> But if you need the whole DAG (say for the scheduler use case), then you're dealing with more complex SQL/ORM operations (joins hopefully, or multiple DB round trips).
>>>>
>>>> Now maybe the right approach is something more like 2 tables, DAG and task_details, where edge keys are denormalized into DAG (arguably that's a few KBs at most, even for large DAGs), and maybe the DAG object has most of the high-level task metadata (operator, name, BaseOperator key attrs), and task_details has the big blobs (SQL code). This is probably a nice compromise; the question becomes "how much task-level detail do we store in the DAG-centric blob?", probably not much, to keep the DAG objects as small as possible. The main downside here is that you cannot have the database do the join and have to do 2 round trips to reconstruct a DAG object (fetch the DAG, parse the object to get the list of tasks, and then run another DB query to get those task details).
>>>>
>>>> To sum up, I'd qualify the more normalized approach as the most proper, but also the more complex. It'll shine in specific cases around large DAGs. If we have the proper abstractions (methods like DAG.serialize(), DAG.deser(version)) then I guess that's not an issue.
>>>>
>>>> Max
>>>>
>>>> On Fri, Mar 8, 2019 at 5:21 PM Kevin Yang <yrql...@gmail.com> wrote:
>>>>
>>>>> Hi Julian, I'm definitely aligned with you guys on making the webserver independent of DAG parsing; it's just that the end goal to me would be to build a complete story around serializing DAGs, and move forward with that story in mind. I feel like you guys may already have a list of dynamic features we need to deprecate/change; if that is the case, feel free to open the discussion on what we do with them under DAG serialization.
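Max's two-table compromise (a compact `dag` row carrying the edge list and light task metadata, plus a `task_details` table for the big blobs) can be sketched as below. Table and column names are assumptions for illustration, and SQLite stands in for the metadata DB. Graph and tree views need only the first round trip; reconstructing the full DAG takes the second one Max mentions, done as a single batched query over the task ids parsed from the DAG row.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dag (
    dag_id TEXT PRIMARY KEY,
    data   TEXT NOT NULL       -- small JSON: tasks, operators, edges
);
CREATE TABLE task_details (
    dag_id  TEXT NOT NULL,
    task_id TEXT NOT NULL,
    details TEXT NOT NULL,     -- large JSON blob, e.g. the SQL text
    PRIMARY KEY (dag_id, task_id)
);
""")

dag = {
    "tasks": {"extract": "PostgresOperator", "report": "EmailOperator"},
    "edges": [["extract", "report"]],
}
conn.execute("INSERT INTO dag VALUES (?, ?)", ("sales", json.dumps(dag)))
conn.executemany(
    "INSERT INTO task_details VALUES (?, ?, ?)",
    [("sales", "extract", json.dumps({"sql": "SELECT * FROM orders"})),
     ("sales", "report", json.dumps({"to": "team@example.com"}))],
)


def load_dag_summary(dag_id):
    """Round trip 1: enough for graph/tree views, no big blobs."""
    row = conn.execute("SELECT data FROM dag WHERE dag_id = ?", (dag_id,)).fetchone()
    return json.loads(row[0])


def load_full_dag(dag_id):
    """Round trip 2: fetch all task details for the parsed task list at once."""
    summary = load_dag_summary(dag_id)
    task_ids = list(summary["tasks"])
    placeholders = ",".join("?" * len(task_ids))
    rows = conn.execute(
        f"SELECT task_id, details FROM task_details "
        f"WHERE dag_id = ? AND task_id IN ({placeholders})",
        (dag_id, *task_ids),
    ).fetchall()
    summary["details"] = {task_id: json.loads(d) for task_id, d in rows}
    return summary


print(load_full_dag("sales")["details"]["extract"]["sql"])
# → SELECT * FROM orders
```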
>>>>>
>>>>> Julian, Ash, Dan, on 2nd thought I do agree that if we can meet the requirements Dan mentioned, it would be nice to have them stored in the DB. Some combined solutions, like having a column for the serialized graph in the serialized DAG table, can potentially meet all the requirements. What format we end up using to represent DAGs between components is now less important IMO: it's fine to refactor the endpoints that only need DagModel to use only DagModel, and it's easy to do a batch replacement if we decide otherwise later. More important is to define this source of truth for the serialized DAG.
>>>>>
>>>>> Ash, ty for the email list, I'll tune my filters accordingly :D I'm leaning towards having a separate process for the parser, so there's no scheduler dependency etc. for the parser, but we can discuss this in another thread.
>>>>>
>>>>> On Fri, Mar 8, 2019 at 8:57 AM Dan Davydov <ddavy...@twitter.com.invalid> wrote:
>>>>>
>>>>>>>
>>>>>>> Personally I don't understand why people are pushing for a JSON-based DAG representation
>>>>>>
>>>>>> It sounds like you agree that DAGs should be serialized (just in the DB instead of JSON), so I will only address why JSON is better than MySQL (AKA serializing at the DAG level vs the task level), as far as I can see, and not why we need serialization. If you zoom out and look at all the use cases of serialized DAGs, e.g. having the scheduler use them instead of parsing DAGs directly, then it becomes clear that we need all appropriate metadata in these DAGs (operator params, DAG properties, etc.), in which case it's not clear how it will fit nicely into a DB table (unless you wanted to do something like (parent_task_id, task_id, task_params)). Also keep in mind that we will need to store different versions of each DAG in the future so that we can ensure consistency in a DagRun, i.e. each task in a DagRun uses the same version of a DAG.
>>>>>>
>>>>>> I think some of our requirements should be:
>>>>>> 1. The data model will lead to acceptable performance in all of its consumers (scheduler, webserver, workers), i.e. no n+1 access patterns (my biggest concern about serializing at the task level, as you propose, vs at the DAG level)
>>>>>> 2. We can have versioning of serialized DAGs
>>>>>> 3. The ability to separate DAGs into their own data store (e.g. no reliance on joins between the new table and the old one)
>>>>>> 4. One source of truth/serialized representation for DAGs (currently we have SimpleDAG)
>>>>>>
>>>>>> If we can fulfill all of these requirements and serialize at the task level rather than the DAG level in the DB, then I agree that probably makes more sense.
>>>>>>
>>>>>>> In the proposed PRs we (Peter, Bas and I) aim to avoid re-parsing DAG files by querying all the required information from the database. In one or two cases this may however not be possible, in which case we might either have to fall back on the DAG file or add the missing information to the database. We can tackle these problems as we encounter them.
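Dan's first requirement (no n+1 access patterns) is easiest to see by counting queries. The sketch below uses a hypothetical per-task table in SQLite and a small wrapper that counts calls to `execute`; it contrasts loading a 50-task DAG one task at a time (1 + n queries) with loading all of its tasks in one query. The schema and helper names are assumptions for illustration.

```python
import json
import sqlite3


class CountingConnection:
    """Wraps sqlite3 to count how many queries hit the database."""

    def __init__(self):
        self.conn = sqlite3.connect(":memory:")
        self.queries = 0

    def execute(self, sql, params=()):
        self.queries += 1
        return self.conn.execute(sql, params)


db = CountingConnection()
db.execute("CREATE TABLE serialized_task (dag_id TEXT, task_id TEXT, params TEXT)")
for i in range(50):
    db.execute("INSERT INTO serialized_task VALUES (?, ?, ?)",
               ("big_dag", f"task_{i}", json.dumps({"retries": i % 3})))


def load_n_plus_one(dag_id):
    # 1 query for the task list, then 1 query per task: n + 1 round trips.
    ids = [r[0] for r in db.execute(
        "SELECT task_id FROM serialized_task WHERE dag_id = ?", (dag_id,))]
    return {t: json.loads(db.execute(
        "SELECT params FROM serialized_task WHERE dag_id = ? AND task_id = ?",
        (dag_id, t)).fetchone()[0]) for t in ids}


def load_batched(dag_id):
    # One round trip regardless of DAG size.
    rows = db.execute(
        "SELECT task_id, params FROM serialized_task WHERE dag_id = ?", (dag_id,))
    return {t: json.loads(p) for t, p in rows}


db.queries = 0
slow = load_n_plus_one("big_dag")
n_plus_one_queries = db.queries
db.queries = 0
fast = load_batched("big_dag")
print(n_plus_one_queries, db.queries, slow == fast)
# → 51 1 True
```

Both functions return the same data; the difference is only in how many round trips each consumer makes, so a task-level schema can still meet the requirement as long as every code path loads a DAG's tasks in one query.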
>>>>>>
>>>>>> I think you would have the support of many of the committers in removing any use cases that stand in the way of full serialization. That being said, if we need to remove features we need to do this carefully and thoughtfully, and ideally with proposed alternatives/workarounds to cover the removals.
>>>>>>
>>>>>>> The counter argument: this PR removes the need for the confusing "Refresh" button from the UI, and in general you only pay the cost for the expensive DAGs when you ask about them. (I don't know what/when we call the /pickle_info endpoint off the top of my head)
>>>>>>
>>>>>> Probably worth splitting out into a separate thread, but I'm actually not sure the refresh button does anything; I think we should double check... I think about 2 years ago a commit was made that made the gunicorn webservers automatically rotate underneath Flask (each one would reparse the DagBag). Even if it works we should probably remove it, since the webserver refresh interval is pretty fast, and it just confuses users and implies that the DAGs are not refreshed automatically.
>>>>>>
>>>>>>> Do you mean https://json5.org/ or is this a typo? That might be okay for a nicer user front end, but the "canonical" version stored in the DB should be something "plainer" like just JSON.
>>>>>>
>>>>>> I think he got this from my reply, and it was just an example, but you are right: I agree JSON would be better than JSON5.
>>>>>>
>>>>>> On Fri, Mar 8, 2019 at 8:53 AM Ash Berlin-Taylor <a...@apache.org> wrote:
>>>>>>
>>>>>>> Comments inline.
>>>>>>>
>>>>>>>> On 8 Mar 2019, at 11:28, Kevin Yang <yrql...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi all,
>>>>>>>> When I was preparing some work related to this AIP I found something very concerning.
>>>>>>>> I noticed this JIRA ticket <https://issues.apache.org/jira/browse/AIRFLOW-3562> is trying to remove the dependency on the dagbag from the webserver, which is awesome; we wanted this badly but never got to start work on it. However, when I looked at some of its subtasks, which try to remove the dagbag dependency from each endpoint, I found that the way we remove the dependency is not ideal. For example, this PR <https://github.com/apache/airflow/pull/4867/files> will require us to parse the DAG file each time we hit the endpoint.
>>>>>>>
>>>>>>> The counter argument: this PR removes the need for the confusing "Refresh" button from the UI, and in general you only pay the cost for the expensive DAGs when you ask about them. (I don't know what/when we call the /pickle_info endpoint off the top of my head)
>>>>>>>
>>>>>>> This endpoint may be one to hold off on (as it can ask for multiple DAGs), but there are some that definitely don't need a full dagbag or to even parse the DAG file; the current DAG model has enough info.
>>>>>>>
>>>>>>>>
>>>>>>>> If we go down this path, we can indeed get rid of the dagbag dependency easily, but we will have to 1. increase the DB load (not too concerning at the moment), and 2. wait for the DAG file to be parsed before getting the page back, potentially multiple times. A DAG file can sometimes take quite a while to parse, e.g. we have some framework DAG files generating a large number of DAGs from static config files or even Jupyter notebooks, and they can take 30+ seconds to parse. Yes, we don't like large DAG files, but people do see the beauty of code as config and sometimes heavily ~~abuse~~ leverage it.
>>>>>>>> Assuming all users have the same nice small Python file that can be parsed fast, I'm still a bit worried about this approach. Continuing on this path means we've chosen DagModel to be the serialized representation of a DAG, with DB columns holding the different properties. It can be one candidate, but I don't know if we should settle on that now. I would personally prefer a more compact (e.g. JSON5) and easy-to-scale representation (such that serializing new fields != DB upgrade).
>>>>>>>
>>>>>>> Do you mean https://json5.org/ or is this a typo? That might be okay for a nicer user front end, but the "canonical" version stored in the DB should be something "plainer" like just JSON.
>>>>>>>
>>>>>>> I'm not sure that "serializing new fields != DB upgrade" is that big of a concern, as we don't add fields that often. One possible way of dealing with it if we do is to have a hybrid approach: a few distinct columns, and then a JSON blob. (And if we were only to support Postgres we could just use JSONB. But I think our friends at Google may object ;) )
>>>>>>>
>>>>>>> Adding a new column in a DB migration with a default of NULL shouldn't be an expensive operation, or difficult to achieve.
>>>>>>>
>>>>>>>>
>>>>>>>> In my imagination we would have to collect the list of dynamic features that depend on unserializable fields of a DAG and start a discussion/vote on dropping support for them (I'm working on this, but if anyone has already done so please take over), decide on the serialized representation of a DAG, and then replace the dagbag with it in the webserver.
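Ash's hybrid suggestion (a few distinct columns plus a JSON blob) can be sketched as follows. The `serialized_dag` table, its columns and the later-added `tags` field are hypothetical; the point is that a field introduced after some rows were written lives only in the JSON, so readers fill in a default for old rows instead of requiring a DB migration.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
# A few first-class, indexable columns plus a free-form JSON blob.
conn.execute("""
CREATE TABLE serialized_dag (
    dag_id       TEXT PRIMARY KEY,
    fileloc      TEXT NOT NULL,
    last_updated TEXT NOT NULL,
    data         TEXT NOT NULL
)""")


def write_dag(dag_id, fileloc, last_updated, attrs):
    conn.execute(
        "INSERT OR REPLACE INTO serialized_dag VALUES (?, ?, ?, ?)",
        (dag_id, fileloc, last_updated, json.dumps(attrs)),
    )


def read_dag(dag_id):
    fileloc, data = conn.execute(
        "SELECT fileloc, data FROM serialized_dag WHERE dag_id = ?", (dag_id,)
    ).fetchone()
    attrs = json.loads(data)
    # Hypothetical field added later: old rows get a default at read time,
    # so introducing it needs no schema migration.
    attrs.setdefault("tags", [])
    return {"dag_id": dag_id, "fileloc": fileloc, **attrs}


# Written by an older version that knew nothing about "tags".
write_dag("old_dag", "/dags/old.py", "2019-03-01T00:00:00", {"schedule": "@daily"})
# Written by a newer version.
write_dag("new_dag", "/dags/new.py", "2019-03-08T00:00:00",
          {"schedule": "@hourly", "tags": ["ml"]})
print(read_dag("old_dag")["tags"], read_dag("new_dag")["tags"])
# → [] ['ml']
```

The trade-off Fokko raises still applies: NOT NULL and type checks only cover the real columns, while anything inside the blob has to be validated by application code.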
>>>>>>>> Per previous discussion and some offline discussions with Dan, one future of DAG serialization that I like would look similar to this:
>>>>>>>>
>>>>>>>> https://imgur.com/ncqqQgc
>>>>>>>
>>>>>>> Something I've thought about before for other things was to embed an API server _into_ the scheduler. This would be useful for k8s healthchecks and native Prometheus metrics without needing a statsd bridge, and it could have endpoints to get information such as this directly.
>>>>>>>
>>>>>>> I was thinking it would be _in_ the scheduler process, using either threads (ick. Python's still got a GIL, doesn't it?) or async/twisted etc. (not a side-car process like we have with the logs webserver for `airflow worker`).
>>>>>>>
>>>>>>> (This is possibly an unrelated discussion, but might be worth talking about?)
>>>>>>>
>>>>>>>> We can still discuss/vote on which approach we want to take, but I don't want the door to the above design to be shut right now, or we'll have to spend a lot of effort switching paths later.
>>>>>>>>
>>>>>>>> Bas and Peter, I'm very sorry to extend the discussion, but I do think this is tightly related to the AIP and the PRs behind it. And my sincere apologies for bringing this up so late (I only pull the open PR list occasionally; if there's a way to subscribe to new PR events I'd love to know how).
>>>>>>>
>>>>>>> It's noisy, but you can subscribe to comm...@airflow.apache.org (but be warned, this also includes all Jira tickets, edits of every comment on GitHub etc.).
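Ash's idea of embedding a small API server in the scheduler process, using threads, could look roughly like this: a standard-library HTTP server runs in a daemon thread beside a stand-in scheduler loop and exposes a health endpoint for k8s probes. The `/health` path, the heartbeat field and the loop itself are assumptions for illustration. Since the handler mostly waits on socket I/O, during which the GIL is released, a thread is probably adequate for lightweight endpoints like this; CPU-heavy endpoints would compete with the scheduler loop.

```python
import json
import threading
import time
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

state = {"last_heartbeat": None}  # shared with the scheduler loop


class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path != "/health":
            self.send_error(404)
            return
        body = json.dumps(state).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        pass  # keep the scheduler's own logs clean


def scheduler_loop(iterations):
    # Stand-in for the real scheduling loop: it only records heartbeats.
    for _ in range(iterations):
        state["last_heartbeat"] = time.time()
        time.sleep(0.01)


# Port 0 lets the OS pick a free port for this demo.
server = ThreadingHTTPServer(("127.0.0.1", 0), HealthHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

scheduler_loop(iterations=3)
url = f"http://127.0.0.1:{server.server_address[1]}/health"
with urllib.request.urlopen(url) as resp:
    payload = json.loads(resp.read())
server.shutdown()
server.server_close()
print(resp.status, payload["last_heartbeat"] is not None)
# → 200 True
```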
>>>>>>>
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Kevin Y
>>>>>>>>
>>>>>>>> On Thu, Feb 28, 2019 at 1:36 PM Peter van t Hof <pjrvant...@gmail.com> wrote:
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> Just some comments on the points Bolke gave in relation to my PR.
>>>>>>>>
>>>>>>>> First of all, the main focus is making the webserver stateless.
>>>>>>>>
>>>>>>>>> 1) Make the webserver stateless: needs the graph of the *current* DAG
>>>>>>>>
>>>>>>>> This is the main goal, but for this a lot more PRs will be coming once my current one is merged. For edges and the graph view this is already covered in my PR.
>>>>>>>>
>>>>>>>>> 2) Version DAGs: mainly for consistency, and to avoid parsing the DAG on every loop
>>>>>>>>
>>>>>>>> In my PR the historical graphs will be stored for each DagRun. This means that you can see whether an older DagRun had the same graph structure, even if some tasks no longer exist in the current graph. Especially for dynamic DAGs this is very useful.
>>>>>>>>
>>>>>>>>> 3) Make the scheduler not require DAG files. This could be done if the edges contain all the information about when to trigger the next task. We can then have event-driven DAG parsing outside of the scheduler loop, i.e. by the CLI. Storage can also be somewhere else (git, artifactory, filesystem, whatever).
>>>>>>>>
>>>>>>>> The scheduler is almost untouched in this PR. The only thing that is added is that these edges are saved to the database, but the scheduling itself didn't change. The scheduler still depends on the DAG object.
>>>>>>>>
>>>>>>>>> 4) Fully serialise the DAG so it becomes transferable to workers
>>>>>>>>
>>>>>>>> It's nice to see that people have a lot of ideas about this.
But >> as >>>>> Fokko >>>>>>> already mentioned this is out of scope for the issue what we are >>>> trying >>>>>> to >>>>>>> solve. I also have some idea’s about this but I like to limit >> this >>>>> PR/AIP >>>>>>> to the webserver. >>>>>>>> >>>>>>>> For now my PR does solve 1 and 2 and the rest of the behaviour >>>> (like >>>>>>> scheduling) is untouched. >>>>>>>> >>>>>>>> Gr, >>>>>>>> Peter >>>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >>