Hello, I have updated AIP-44 with the findings from the POC. If there are any more questions/comments, please let me know; otherwise, I plan to start voting tomorrow.
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API On Mon, Jul 25, 2022 at 10:18 PM Jarek Potiuk <ja...@potiuk.com> wrote: > I think the drop will be less than those numbers - especially the high-end > ones. The DB operations we use are quite "coarse" grained (they span a > single transaction, not a function call). Also I deliberately chose > "example dags" as the benchmark playground - the example DAGs we have are > small. And in this case it means that the "actual" work done is rather > small. If you start parsing bigger DAGs, the overall number of calls (thus > RPC overhead) will not grow, but the time needed to parse the DAG will grow > a lot. This means that the RPC overhead will get way smaller > (percentage-wise). > > Also this is not an "overall" performance drop - this is really a > "mini-benchmark" (as opposed to "system benchmark" and "micro-benchmark"). > The benchmark only concerns running particular "transactions" - but all > the other parts remain unaffected - so the numbers I saw are the absolute, > maximum limits of the performance drop and they will be much smaller in > reality. > > J. > > > On Mon, Jul 25, 2022 at 9:01 PM Mateusz Henc <mh...@google.com.invalid> > wrote: > >> Hi, >> Sorry, but I was OOO and offline last week and I was not able to even >> rely. >> >> Thank you Jarek for a very detailed analysis. I think we all expected >> some performance drop for better security. >> Do you think the performance drop that you measured is what we should >> expect in the "final" version? I don't know Airflow that much to know which >> DB- access is used most (and what object types are required there - >> small/big or maybe just primitives?). >> >> Nonetheless, I am looking forward to starting the voting process! >> >> Best regards, >> Mateusz Henc >> >> >> On Fri, Jul 15, 2022 at 4:48 PM Jarek Potiuk <ja...@potiuk.com> wrote: >> >>> Two more things: >>> 1) there is one "caveat" I had to handle - timeout handling in >>> DagFileProcessor (signals do not play well with threads of GRPC - but that >>> should be easy to fix) >>> 2) The way I proposed it (with very localized changes needed) will be >>> very easy to split the job among multiple people and make it a true >>> community effort to complete - no need for major refactorings or changes >>> across the whole airflow code. >>> >>> J. >>> >>> On Fri, Jul 15, 2022 at 4:32 PM Jarek Potiuk <ja...@potiuk.com> wrote: >>> >>>> Hello Everyone, >>>> >>>> First of all - apologies for those who waited for it, I've been dragged >>>> in multiple directions, but finally I got some quality time to take a look >>>> at open questions and implement POC for AIP-44 as promised before. >>>> >>>> TL;DR; I finally came back to the AIP-44 and multi-tenancy and have >>>> made good progress that hopefully will lead to voting next week. I think I >>>> have enough of the missing evidence of the impact of the internal API on >>>> performance, also I have implemented working POC with one of the Internal >>>> API calls (processFile) that we will have to implement and run a series of >>>> performance tests with it. >>>> >>>> # Current state >>>> -------------------- >>>> >>>> The state we left it few months ago was ( >>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API >>>> ): >>>> * I've prepared inventory of methods and general approach we are going >>>> to take (and we got consensus there) >>>> * I've left decision on the final choice (Thrift vs. 
GRPC) to later POC >>>> (I got some feedback from Beam about gRPC vs. Thrift, based on that I >>>> started with GRPC and I am actually very happy with it, so i think we can >>>> leave Thrift out of the picture). >>>> * We've struggled a bit with the decision - should we leave two paths >>>> (with GRPC/Direct DB) or one (local GRPC vs. remote GRPC) >>>> >>>> The POC is implemented here: >>>> https://github.com/apache/airflow/pull/25094 and I have the following >>>> findings: >>>> >>>> # Performance impact >>>> ------------------------------ >>>> >>>> The performance impact is visible (as expected). It's quite acceptable >>>> for a distributed environment (in exchange for security), but it's likely >>>> significant enough to stay with the original idea of having Airflow work in >>>> two modes: a) secure - with RPC apis, b) standard - with direct DB calls. >>>> >>>> On "localhost" with a local DB the performance overhead for serializing >>>> and transporting the messages between two different processes introduced up >>>> to 10% overhead. I saw an increase of time to run 500 scans of all our >>>> example folders dags going from ~290 to ~320s pretty consistently. Enough >>>> of a difference to exclude volatility. I tested it in Docker on both ARM >>>> and AMD. It seems that in some cases that can be partially offset by >>>> slightly increased parallelism on multi-processing machines run on "bare" >>>> metal. I got consistently just 2-3% increase on my linux without docker >>>> with the same test harness, but we cannot rely on such configurations, I >>>> think focusing on Docker-based installation without any special assumptions >>>> on how your networking/localhost is implemented is something we should >>>> treat as a baseline. >>>> >>>> The same experiment repeated with 50 messages but of much bigger size >>>> (300 Callbacks passed per message) have shown 30% performance drop. This is >>>> significant, but I believe most of our messages will be much smaller. Also >>>> the messages I chose were very special - because in case of Callback we are >>>> doing rather sophisticated serialization, because we have to account for >>>> Kubernetes objects that are potentially not serializable, so the test >>>> involved 300 messages that had to be not only GRPC serialized but also some >>>> parts of them had to be Airflow-JSON serialized and the whole "executor >>>> config" had to be picked/unpickled (for 100 such messages out of 300). Also >>>> we have some conditional processing there (there is a union of three types >>>> of callbacks that has to be properly deserialized). And those messages >>>> could be repeated. This is not happening for most other cases. And we can >>>> likely optimize it away in the future. but I wanted to see the "worst" >>>> scenario. >>>> >>>> For the remote client (still local DB for the internal API server) I >>>> got between 30% and 80% slow-down, but this was on my WiFi network. I am >>>> sure with a "wired" network, it will be much better, also when a remote DB >>>> gets into picture the overall percentage overhead will be much smaller. We >>>> knew this would be a "slower" solution but this is the price for someone >>>> who wants to have security and isolation. >>>> >>>> I have not yet performed the tests with SSL, but I think this will be a >>>> modest increase. 
And anyhow the "impact" of 10% is IMHO enough to make a >>>> decision that we cannot get "GRPC-only" - the path where we will continue >>>> using Direct DB access should stay (and I know already how to do it >>>> relatively easily). >>>> >>>> # Implementation and maintainability >>>> ------------------------------------------------- >>>> >>>> In the PR you will see the way I see implementation details and how it >>>> will impact our code in general. I think what I proposed is actually >>>> rather elegant and easy to maintain, and likely we can improve it somehow >>>> (happy to brainstorm on some creative ways we could use - for example - >>>> decorators) to make it "friendler" but I focused more on explicitness and >>>> showing the mechanisms involved rather than "fanciness". I found it rather >>>> easy to implement and it does seem to have some good "easy maintainability" >>>> properties. The way I propose it boils down to few "rules": >>>> >>>> * For all the data and "Messages" we sent, we have a nice "Internal >>>> API" defined in .proto and protobuf objects generated out of that. The GRPC >>>> proto nicely defines the structures we are going to send over the network. >>>> It's very standard, it has a really good modern support. For one, we >>>> automatically - I added pre-commit - generate MyPY type stubs for the >>>> generated classes and it makes it super easy to both - implement the >>>> mapping and automatically verify its correctness. Mypy nicely catches all >>>> kinds of mistakes you can make)! Autocomplete for the PROTO-generated >>>> classes works like a charm. I struggled initially without typing, but once >>>> I configured mypy stub generation, I got really nice detection of mistakes >>>> I've made and I was able to progress with the implementation way faster >>>> than without it. Usually everything magically worked as soon as I fixed all >>>> MyPy errors. >>>> >>>> * All Airflow objects that we get to send over GRPC should get >>>> from_protobuf/to_protobuf methods. They are usually very simple (just >>>> passing strings/ints/boolean fields to the constructor), and the structures >>>> we pass are rather small (see the PR). Also (as mentioned above) I >>>> implemented a few more complex cases (with more complex serialization) and >>>> it is easy, readable and pretty well integrated into our code IMHO. This >>>> introduces a little duplication here and there, but those objects change >>>> rarely (only when we implement big features like Dynamic Task mapping) and >>>> MyPy guards us against any mishaps there (now that our code is mostly >>>> typed). >>>> >>>> * We make all "DB Aware" objects also "GRPC aware". The number of >>>> changes in the actual code/logic is rather small and makes it super easy to >>>> maintain IMHO. There are two changes: >>>> >>>> 1) for any of the objects that are used for database operations (in >>>> my PR this is DagFileProcessor) we need to initialize it with the >>>> "use_grpc" flag and pass it a channel that will be used for communication. >>>> 2) the DB methods we have (inventory in the AIP) will have to be >>>> refactored slightly. This is what really was added, the original >>>> "process_file" method was renamed to "process_file_db" and the "callers" of >>>> the method are completely intact. 
>>>> >>>> def process_file_grpc( >>>> self, >>>> file_path: str, >>>> callback_requests: List[CallbackRequest], >>>> pickle_dags: bool = False, >>>> ) -> Tuple[int, int]: >>>> request = internal_api_pb2.FileProcessorRequest(path=file_path, >>>> pickle_dags=pickle_dags) >>>> for callback_request in callback_requests: >>>> request.callbacks.append(callback_request.to_protobuf()) >>>> res = self.stub.processFile(request) >>>> return res.dagsFound, res.errorsFound >>>> >>>> def process_file( >>>> self, >>>> file_path: str, >>>> callback_requests: List[CallbackRequest], >>>> pickle_dags: bool = False, >>>> ) -> Tuple[int, int]: >>>> if self.use_grpc: >>>> return self.process_file_grpc( >>>> file_path=file_path, >>>> callback_requests=callback_requests, pickle_dags=pickle_dags >>>> ) >>>> return self.process_file_db( >>>> file_path=file_path, callback_requests=callback_requests, >>>> pickle_dags=pickle_dags >>>> ) >>>> >>>> You can see all the details in the PR >>>> https://github.com/apache/airflow/pull/25094. >>>> >>>> # POC testing harness (self-service :) ) >>>> ---------------------------------------------------- >>>> >>>> You can also very easily test it yourself: >>>> >>>> * airflow internal-api server -> runs internal API server. >>>> * airflow internal-api test-client --num-repeats 10 --num-callbacks 10 >>>> --use-grpc -> runs file-processing of all example dags 10 times with 10x3 >>>> callbacks sent. Same command without --use-grpc will run using direct DB >>>> access. Server listens on "50051" port, client connects to >>>> "localhost:50051" - you can modify the code to use remote IP/different >>>> ports (easy to find by localhost:50051). >>>> >>>> # Discussion and voting >>>> -------------------------------- >>>> >>>> Of course some decisions in the PR can be improved (I focused on >>>> explicitness of the POC more than anything else). We can discuss some >>>> changes and improvements to some decisions I made when the PR will be in >>>> "reviewable state" and I am happy to improve it, but for now I would like >>>> to focus on answering the two questions: >>>> >>>> * Does it look plausible? >>>> * Does it look like it's almost ready to vote on it? (I will update the >>>> AIP before starting voting, of course). >>>> >>>> Let me know what you think. >>>> >>>> J. >>>> >>>> >>>> >>>> >>>> On Tue, Feb 1, 2022 at 3:11 PM Jarek Potiuk <ja...@potiuk.com> wrote: >>>> >>>>> Since we have AIP-43 already approved, I think I would love to have >>>>> more questions and discussions about AIP-44 - The "Airflow Internal API" >>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API >>>>> (as the new name is). >>>>> >>>>> For those who would like to get more context - recording of the >>>>> meeting where both AIP-43 and AIP-44 scope and proposals were discussed >>>>> can >>>>> be found here: >>>>> https://drive.google.com/file/d/1SMFzazuY1kg4B4r11wNt8EQ_PmTDRKq6/view >>>>> >>>>> In the AIP I just made a small clarification regarding some "future" >>>>> changes - specifically the token security might be nicely handled >>>>> together >>>>> with AIP-46 "Add support for docker runtime isolation" that is proposed by >>>>> Ping. >>>>> >>>>> My goal is to gather comments till the end of the week, and if there >>>>> will be no big concerns, I would love to start voting next week. >>>>> >>>>> J. 
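To make the from_protobuf/to_protobuf rules above more concrete, here is a minimal sketch of one such mapping and the channel/stub wiring. The generated module path and the InternalApiStub class name are assumptions made purely for illustration; the actual .proto definitions and generated code are in the POC PR (https://github.com/apache/airflow/pull/25094).

# Illustrative sketch only: the module path, the InternalApiStub class name and
# the CallbackRequest fields are assumptions; the real definitions live in the
# .proto file of the POC PR.
from typing import Optional

import grpc

from airflow.internal_api import internal_api_pb2, internal_api_pb2_grpc  # generated code (assumed path)


class SimpleCallbackRequest:
    """Simplified stand-in for Airflow's CallbackRequest, for illustration only."""

    def __init__(self, full_filepath: str, msg: Optional[str] = None):
        self.full_filepath = full_filepath
        self.msg = msg

    def to_protobuf(self) -> internal_api_pb2.CallbackRequest:
        # Plain string/int/bool fields map 1:1 onto the generated protobuf class.
        return internal_api_pb2.CallbackRequest(full_filepath=self.full_filepath, msg=self.msg or "")

    @classmethod
    def from_protobuf(cls, proto: internal_api_pb2.CallbackRequest) -> "SimpleCallbackRequest":
        return cls(full_filepath=proto.full_filepath, msg=proto.msg or None)


def create_stub(target: str = "localhost:50051") -> internal_api_pb2_grpc.InternalApiStub:
    # The channel is created once and handed to the "GRPC aware" objects, e.g.
    # DagFileProcessor(..., use_grpc=True, channel=channel) in the POC.
    channel = grpc.insecure_channel(target)
    return internal_api_pb2_grpc.InternalApiStub(channel)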
>>>>> >>>>> >>>>> On Mon, Jan 3, 2022 at 2:48 PM Jarek Potiuk <ja...@potiuk.com> wrote: >>>>> >>>>>> Also AIP-44 - which is the DB isolation mode is much more detailed and >>>>>> ready for deeper discussion if needed. >>>>>> >>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API >>>>>> >>>>>> On Tue, Dec 14, 2021 at 12:07 PM Jarek Potiuk <ja...@potiuk.com> >>>>>> wrote: >>>>>> > >>>>>> > And just to add to that. >>>>>> > >>>>>> > Thanks again for the initial comments and pushing us to provide more >>>>>> > details. That allowed us to discuss and focus on many of the aspects >>>>>> > that were raised and we have many more answers now: >>>>>> > >>>>>> > * first of all - we focused on making sure impact on the existing >>>>>> code >>>>>> > and "behavior" of Airflow is minimal. In fact there should be >>>>>> > virtually no change vs. current behavior when db isolation is >>>>>> > disabled. >>>>>> > * secondly - we've done a full inventory of how the API needed >>>>>> should >>>>>> > look like and (not unsurprisingly) it turned out that the current >>>>>> > REST-style API is good for part of it but most of the 'logic" of >>>>>> > airflow can be done efficiently when we go to RPC-style API. However >>>>>> > we propose that the authorization/exposure of the API is the same as >>>>>> > we use currently in the REST API, this will allow us to reuse a big >>>>>> > part of the infrastructure we already have >>>>>> > * thirdly - we took deep into our hearts the comments about having >>>>>> to >>>>>> > maintain pretty much the same logic in a few different places. >>>>>> That's >>>>>> > an obvious maintenance problem. The proposal we came up with >>>>>> addresses >>>>>> > it - we are going to keep the logic of Airflow internals in one >>>>>> place >>>>>> > only and we will simply smartly route on where the logic will be >>>>>> > executed >>>>>> > * regarding the performance impact - we described the deployment >>>>>> > options that our proposal makes available - we do not want to favor >>>>>> > one deployment option over another, but we made sure the >>>>>> architecture >>>>>> > is done in the way, that you can choose which deployment is good for >>>>>> > you: "no isolation", "partial isolation", "full isolation" - each >>>>>> with >>>>>> > different performance/resource characteristics - all of them fully >>>>>> > horizontally scalable and nicely manageable. >>>>>> > >>>>>> > We look forward to comments, also for the API 43 - >>>>>> > >>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation >>>>>> > - AIP-43 is a prerequiste to AIP-44 and they both work together. >>>>>> > >>>>>> > We also will think and discuss more follow-up AIPs once we get those >>>>>> > approved (hopefully ;) ). The multi-tenancy is a 'long haul" and >>>>>> while >>>>>> > those two AIPs are foundational building blocks, there are at least >>>>>> > few more follow-up AIPs to be able to say "we're done with >>>>>> > Multi-tenancy" :). >>>>>> > >>>>>> > J. >>>>>> > >>>>>> > >>>>>> > >>>>>> > On Tue, Dec 14, 2021 at 9:58 AM Mateusz Henc >>>>>> <mh...@google.com.invalid> wrote: >>>>>> > > >>>>>> > > Hi, >>>>>> > > >>>>>> > > As promised we (credits to Jarek) updated the AIPs, added more >>>>>> details, did inventory and changed the way API endpoints are generated. 
>>>>>> > > >>>>>> > > We also renamed it to Airflow Internal API - so the url has >>>>>> changed: >>>>>> > > >>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API >>>>>> > > >>>>>> > > Please take a look, any comments are highly appreciated. >>>>>> > > >>>>>> > > Best regards, >>>>>> > > Mateusz Henc >>>>>> > > >>>>>> > > >>>>>> > > On Mon, Dec 6, 2021 at 3:09 PM Mateusz Henc <mh...@google.com> >>>>>> wrote: >>>>>> > >> >>>>>> > >> Hi, >>>>>> > >> Thank you Ash for your feedback (in both AIPs) >>>>>> > >> >>>>>> > >> We are working to address your concerns. We will update the AIPs >>>>>> in a few days. >>>>>> > >> I will let you know when it's done. >>>>>> > >> >>>>>> > >> Best regards, >>>>>> > >> Mateusz Henc >>>>>> > >> >>>>>> > >> >>>>>> > >> On Fri, Dec 3, 2021 at 7:27 PM Jarek Potiuk <ja...@potiuk.com> >>>>>> wrote: >>>>>> > >>> >>>>>> > >>> Cool. Thanks for the guidance. >>>>>> > >>> >>>>>> > >>> On Fri, Dec 3, 2021 at 6:37 PM Ash Berlin-Taylor < >>>>>> a...@apache.org> wrote: >>>>>> > >>> > >>>>>> > >>> > - make an inventory: It doesn't need to be exhaustive, but a >>>>>> representative sample. >>>>>> > >>> > - More clearly define _what_ the API calls return -- object >>>>>> type, methods on them etc. >>>>>> > >>> > >>>>>> > >>> > From the AIP you have this example: >>>>>> > >>> > >>>>>> > >>> > def get_dag_run(self, dag_id, execution_date): >>>>>> > >>> > return self.db_client.get_dag_run(dag_id,execution_date) >>>>>> > >>> > >>>>>> > >>> > What does that _actually_ return? What capabilities does it >>>>>> have? >>>>>> > >>> > >>>>>> > >>> > (I have other thoughts but those are less fundamental and can >>>>>> be discussed later) >>>>>> > >>> > >>>>>> > >>> > -ash >>>>>> > >>> > >>>>>> > >>> > On Fri, Dec 3 2021 at 18:20:21 +0100, Jarek Potiuk < >>>>>> ja...@potiuk.com> wrote: >>>>>> > >>> > >>>>>> > >>> > Surely - if you think that we need to do some more work to >>>>>> get confidence, that's fine. I am sure we can improve it to the level >>>>>> that >>>>>> we will not have to do full performance tests, and you are confident in >>>>>> the >>>>>> direction. Just to clarify your concerns and make sure we are on the same >>>>>> page - as I understand it should: * make an inventory of "actual changes" >>>>>> the proposal will involve in the database-low-level code of Airflow * >>>>>> based >>>>>> on that either assessment that those changes are unlikely (or likely >>>>>> make a >>>>>> performance impact) * if we asses that it is likely to have an impact, >>>>>> some >>>>>> at least rudimentary performance tests to prove that this is manageable I >>>>>> think that might be a good exercise to do. Does it sound about right? Or >>>>>> do >>>>>> you have any concerns about certain architectural decisions taken? No >>>>>> problem with Friday, but if we get answers today. I think it will give us >>>>>> time to think about it over the weekend and address it next week. J. On >>>>>> Fri, Dec 3, 2021 at 5:56 PM Ash Berlin-Taylor <a...@apache.org> wrote: >>>>>> > >>> > >>>>>> > >>> > This is a fundamental change to the architecture with >>>>>> significant possible impacts on performance, and likely requires >>>>>> touching a >>>>>> large portion of the code base. 
Sorry, you're going to have to do expand >>>>>> on >>>>>> the details first and work out what would actually be involved and what >>>>>> the >>>>>> impacts will be: Right now I have serious reservations to this approach, >>>>>> so >>>>>> I can't agree on the high level proposal without an actual proposal (The >>>>>> current document is, at best, an outline, not an actual proposal.) Sorry >>>>>> to >>>>>> be a grinch right before the weekend. Ash On Thu, Dec 2 2021 at 22:47:34 >>>>>> +0100, Jarek Potiuk <ja...@potiuk.com> wrote: Oh yeah - good point >>>>>> and we spoke about performance testing/implications. Performance is >>>>>> something we were discussing as the next step when we get general "OK" in >>>>>> the direction - we just want to make sure that there are no "huge" >>>>>> blockers >>>>>> in the way this is proposed and explain any doubts first, so that the >>>>>> investment in performance part makes sense. We do not want to spend a lot >>>>>> of time on getting the tests done and detailed inventory of methods/ API >>>>>> calls to get - only to find out that this is generally "bad direction". >>>>>> Just to clarify again - we also considered (alternative option) to >>>>>> automatically map all the DB methods in the remote calls. But we dropped >>>>>> that idea - precisely for the reason of performance, and transaction >>>>>> integrity. So we are NOT mapping DB calls into API calls. those will be >>>>>> "logical operations" on the database. Generally speaking, most of the API >>>>>> calls for the "airflow system-level but executed in worker" calls will be >>>>>> rather "coarse" than fine-grained. For example, the aforementioned "mini >>>>>> scheduler" - where we want to make a single API call and run the whole of >>>>>> it on the DBAPI side. So there - performance impact is very limited IMHO. >>>>>> And If we see any other "logic" like that in other parts of the code >>>>>> (zombie detection as an example). We plan to make a detailed inventory of >>>>>> those once we get general "Looks good" for the direction. For now we did >>>>>> some "rough" checking and it seems a plausible approach and quite doable. >>>>>> One more note - the "fine-grained" ( "variable" update/retrieval, >>>>>> "connection update retrieval") - via REST API will still be used by the >>>>>> user's code though (Parsing DAGs, operators, workers and callbacks). We >>>>>> also plan to make sure that none of the "Community" operators are using >>>>>> "non-blessed" DB calls (we can do it in our CI). So at the end of the >>>>>> exercise, all operators, hooks, etc. from the community will be >>>>>> guaranteed >>>>>> to only use the DB APIs that are available in the "DB API" module. But >>>>>> there I do not expect pretty much any performance penalty as those are >>>>>> very >>>>>> fast and rare operations (and good thing there is that we can cache >>>>>> results >>>>>> of those in workers/DAG processing). J. On Thu, Dec 2, 2021 at 7:16 PM >>>>>> Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote: Ah, my >>>>>> bad, I missed that. I'd still like to see discussion of the performance >>>>>> impacts, though. On Thu, Dec 2, 2021 at 11:14 AM Ash Berlin-Taylor < >>>>>> a...@apache.org> wrote: The scheduler was excluded from the >>>>>> components that would use the dbapi - the mini scheduler is the odd one >>>>>> out >>>>>> here is it (currently) runs on the work but shares much of the code from >>>>>> the scheduling path. 
-a On 2 December 2021 17:56:40 GMT, Andrew Godwin < >>>>>> andrew.god...@astronomer.io.INVALID> wrote: I would also like to see >>>>>> some discussion in this AIP about how the data is going to be serialised >>>>>> to >>>>>> and from the database instances (obviously Connexion is involved, but I >>>>>> presume more transformation code is needed than that) and the potential >>>>>> slowdown this would cause. In my experience, a somewhat direct ORM >>>>>> mapping >>>>>> like this is going to result in considerably slower times for any complex >>>>>> operation that's touching a few hundred rows. Is there a reason this is >>>>>> being proposed for the scheduler code, too? In my mind, the best approach >>>>>> to multitenancy would be to remove all user-supplied code from the >>>>>> scheduler and leave it with direct DB access, rather than trying to >>>>>> indirect all scheduler access through another API layer. Andrew On Thu, >>>>>> Dec >>>>>> 2, 2021 at 10:29 AM Jarek Potiuk <ja...@potiuk.com> wrote: Yeah - I >>>>>> thik Ash you are completely right we need some more "detailed" >>>>>> clarification. I believe, I know what you are - rightfully - afraid of >>>>>> (re >>>>>> impact on the code), and maybe we have not done a good job on explaining >>>>>> it >>>>>> with some of our assumptions we had when we worked on it with Mateusz. >>>>>> Simply it was not clear that our aim is to absolutely minimise the impact >>>>>> on the "internal DB transactions" done in schedulers and workers. The >>>>>> idea >>>>>> is that change will at most result in moving an execution of the >>>>>> transactions to another process but not changing what the DB transactions >>>>>> do internally. Actually this was one of the reason for the "alternative" >>>>>> approach (you can see it in the document) we discussed about - hijack >>>>>> "sqlalchemy session" - this is far too low level and the aim of the >>>>>> "DB-API" is NOT to replace direct DB calls (Hence we need to figure out a >>>>>> better name). The API is there to provide "scheduler logic" API and "REST >>>>>> access to Airflow primitives like dags/tasks/variables/connections" etc.. >>>>>> As an example (which we briefly talked about in slack) the >>>>>> "_run_mini_scheduler_on_child_tasks" case ( >>>>>> https://github.com/apache/airflow/blob/main/airflow/jobs/local_task_job.py#L225-L274) >>>>>> is an example (that we would put in the doc). As we thought of it - this >>>>>> is >>>>>> a "single DB-API operation". Those are not Pure REST calls of course, >>>>>> they >>>>>> are more RPC-like calls. That is why even initially I thought of >>>>>> separating >>>>>> the API completely. But since there are a lot of common "primitive" calls >>>>>> that we can re-use, I think having a separate DB-API component which will >>>>>> re-use connexion implementation, replacing authentication with the custom >>>>>> worker <> DB-API authentication is the way to go. And yes if we agree on >>>>>> the general idea, we need to choose the best way on how to best "connect" >>>>>> the REST API we have with the RPC-kind of API we need for some cases in >>>>>> workers. But we wanted to make sure we are on the same page with the >>>>>> direction. And yes it means that DB-API will potentially have to handle >>>>>> quite a number of DB operations (and that it has to be replicable and >>>>>> scalable as well) - but DB-API will be "stateless" similarly as the >>>>>> webserver is, so it will be scalable by definition. 
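The "coarse-grained" point above can be illustrated with a short sketch: rather than mapping each DB query onto its own network call, one logical operation (for example the mini-scheduler run) becomes a single RPC that the internal API server executes with the existing logic, inside one transaction. Everything below is hypothetical - the request message, the InternalApiClient wrapper and the runMiniScheduler method are invented names used only to show the shape of the calls.

# Hypothetical illustration of a "coarse-grained" RPC call -- not actual Airflow
# code.  One logical operation == one round trip; the DB logic stays server-side.
from dataclasses import dataclass


@dataclass
class MiniSchedulerRequest:
    # Stand-in for a protobuf request message; field names are illustrative only.
    dag_id: str
    run_id: str
    task_id: str


class InternalApiClient:
    def __init__(self, stub):
        # "stub" would be the gRPC stub generated from the internal API .proto.
        self.stub = stub

    def run_mini_scheduler(self, dag_id: str, run_id: str, task_id: str) -> None:
        # A single call runs the whole "mini scheduler" logic on the internal API
        # server, inside one transaction.  Mapping every session.query() onto its
        # own RPC would instead multiply round trips and break transaction
        # boundaries -- exactly what the proposal avoids.
        self.stub.runMiniScheduler(MiniSchedulerRequest(dag_id=dag_id, run_id=run_id, task_id=task_id))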
And yest performance >>>>>> tests will be part of POC - likely even before we finally ask for votes >>>>>> there. So in short: * no modification or impact on current scheduler >>>>>> behaviour when DB Isolation is disabled * only higher level methods will >>>>>> be >>>>>> moved out to DB-API and we will reuse existing "REST" APIS where it makes >>>>>> sense * we aim to have "0" changes to the logic of processing - both in >>>>>> Dag >>>>>> Processing logic and DB API. We think with this architecture we proposed >>>>>> it's perfectly doable I hope this clarifies a bit, and once we agree on >>>>>> general direction, we will definitely work on adding more details and >>>>>> clarification (we actually already have a lot of that but we just wanted >>>>>> to >>>>>> start with explaining the idea and going into more details later when we >>>>>> are sure there are no "high-level" blockers from the community. J, On >>>>>> Thu, >>>>>> Dec 2, 2021 at 4:46 PM Ash Berlin-Taylor <a...@apache.org> wrote: > > >>>>>> I just provided a general idea for the approach - but if you want me to >>>>>> put >>>>>> more examples then I am happy to do that > > > Yes please. > > It is too >>>>>> general for me and I can't work out what effect it would actually have on >>>>>> the code base, especially how it would look with the config option to >>>>>> enable/disable direct db access. > > -ash > > On Thu, Dec 2 2021 at >>>>>> 16:36:57 +0100, Mateusz Henc <mh...@google.com.INVALID> wrote: > > >>>>>> Hi, > I am sorry if it is not clear enough, let me try to explain it >>>>>> here, >>>>>> so maybe it gives more light on the idea. > See my comments below > > On >>>>>> Thu, Dec 2, 2021 at 3:39 PM Ash Berlin-Taylor <a...@apache.org> >>>>>> wrote: >> >> I'm sorry to say it, but this proposal right just doesn't >>>>>> contain enough detail to say what the actual changes to the code would >>>>>> be, >>>>>> and what the impact would be >> >> To take the one example you have so >>>>>> far: >>>>>> >> >> >> def get_dag_run(self, dag_id, execution_date): >> return >>>>>> self.db_client.get_dag_run(dag_id,execution_date) >> >> So form this >>>>>> snippet I'm guessing it would be used like this: >> >> dag_run = >>>>>> db_client.get_dag_run(dag_id, execution_date) >> >> What type of object >>>>>> is >>>>>> returned? > > > As it replaces: > dag_run = session.query(DagRun) > >>>>>> .filter(DagRun.dag_id == dag_id, DagRun.execution_date == >>>>>> execution_date) > >>>>>> .first() > > then the type of the object will be exactly the same >>>>>> (DagRun) >>>>>> . > >> >> >> Do we need one API method per individual query we have in >>>>>> the >>>>>> source? > > > No, as explained by the sentence: > > The method may be >>>>>> extended, accepting more optional parameters to avoid having too many >>>>>> similar implementations. > > >> >> >> Which components would use this new >>>>>> mode when it's enabled? > > > You may read: > Airflow Database APi is a >>>>>> new >>>>>> independent component of Airflow. It allows isolating some components >>>>>> (Worker, DagProcessor and Triggerer) from direct access to DB. > >> >> >>>>>> But >>>>>> what you haven't said the first thing about is what _other_ changes would >>>>>> be needed in the code. To take a fairly simple example: >> >> dag_run = >>>>>> db_client.get_dag_run(dag_id, execution_date) >> dag_run.queued_at = >>>>>> timezone.now() >> # How do I save this? 
>> >> In short, you need to put a >>>>>> lot more detail into this before we can even have an idea of the full >>>>>> scope >>>>>> of the change this proposal would involve, and what code changes would be >>>>>> needed for compnents to work with and without this setting enabled. > > > >>>>>> For this particular example - it depends on the intention of the code >>>>>> author > - If this should be in transaction - then I would actually >>>>>> introduce new method like enqueue_dag_run(...) that would run these two >>>>>> steps on Airflow DB API side > - if not then, maybe just the >>>>>> "update_dag_run" method accepting the whole "dag_run" object and saving >>>>>> it >>>>>> to the DB. > > In general - we could take naive approach, eg replace >>>>>> code: >>>>>> > dag_run = session.query(DagRun) > .filter(DagRun.dag_id == dag_id, >>>>>> DagRun.execution_date == execution_date) > .first() > with: > if >>>>>> self.db_isolation: > dag_run = session.query(DagRun) > >>>>>> .filter(DagRun.dag_id == dag_id, DagRun.execution_date == >>>>>> execution_date) > >>>>>> .first() > else: > dag_run = db_client.get_dag_run(self, dag_id, >>>>>> execution_date) > > The problem is that Airflow DB API would need to have >>>>>> the same implementation for the query - so duplicated code. That's why we >>>>>> propose moving this code to the DBClient which is also used by the >>>>>> Airflow >>>>>> DB API(in DB direct mode). > > I know there are many places where the >>>>>> code >>>>>> is much more complicated than a single query, but they must be handled >>>>>> one-by-one, during the implementation, otherwise this AIP would be way >>>>>> too >>>>>> big. > > I just provided a general idea for the approach - but if you >>>>>> want >>>>>> me to put more examples then I am happy to do that > > Best regards, > >>>>>> Mateusz Henc > >> >> On Thu, Dec 2 2021 at 14:23:56 +0100, Mateusz Henc >>>>>> <mh...@google.com.INVALID> wrote: >> >> Hi, >> I just added a new >>>>>> AIP for running some Airflow components in DB-isolation mode, without >>>>>> direct access to the Airflow Database, but they will use a new API for >>>>>> thi >>>>>> purpose. >> >> PTAL: >> >>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API >>>>>> >> >> Open question: >> I called it "Airflow Database API" - however I >>>>>> >> >> feel >>>>>> it could be more than just an access layer for the database. So if you >>>>>> have >>>>>> a better name, please let me know, I am happy to change it. >> >> Best >>>>>> regards, >> Mateusz Henc >>>>>> >>>>>
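For completeness, the dual-mode routing sketched in the last messages (direct DB access when isolation is disabled, API calls when it is enabled) could look roughly like the following. This is a minimal sketch only: the DBClient constructor, the api_client object and the update_dag_run method are assumed names, and it answers the "how do I save this?" question by following Mateusz's suggestion of passing the whole object back through an update call.

# Minimal sketch of the dual-mode DBClient idea discussed above; constructor
# arguments, api_client and update_dag_run are assumed names, not AIP content.
from typing import Optional

from airflow.models import DagRun


class DBClient:
    def __init__(self, db_isolation: bool, session=None, api_client=None):
        self.db_isolation = db_isolation
        self.session = session        # SQLAlchemy session (direct-DB mode only)
        self.api_client = api_client  # internal API client (isolation mode only)

    def get_dag_run(self, dag_id: str, execution_date) -> Optional[DagRun]:
        # Callers receive a DagRun in both modes, so call sites stay unchanged.
        if self.db_isolation:
            return self.api_client.get_dag_run(dag_id, execution_date)
        return (
            self.session.query(DagRun)
            .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
            .first()
        )

    def update_dag_run(self, dag_run: DagRun) -> None:
        # One possible answer to "how do I save this?": send the whole object back.
        if self.db_isolation:
            self.api_client.update_dag_run(dag_run)
        else:
            self.session.merge(dag_run)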