Hi Jarek, Thanks for the thorough explanation. It all makes sense, especially the performance impact and scalability part. I believe the internal API should significantly reduce the number of DB connections.
Looking forward to it! Thanks, Ping

On Thu, Aug 4, 2022 at 12:11 AM Jarek Potiuk <ja...@potiuk.com> wrote:
> > I would like to see more about how you do this because I have noticed that there are tons of different configurations in the airflow and some combinations are less covered by the tests as there are some default values, for example this code <https://github.com/apache/airflow/blob/3c08cefdfd2e2636a714bb835902f0cb34225563/airflow/task/task_runner/standard_task_runner.py#L39-L42>.
>
> Good point. I plan to have another "test type" added in our CI and run the full test suite there. I am not planning to add new tests (well, there will be some generic DB-less mode tests for sure, but I am mostly going to rely on the existing test harness for Airflow). The change will be that I will just block all direct DB access from any methods that will be executed via any of the "airflow" main code. This should be rather easy - since we are using SQLAlchemy, we will be able to capture all DB requests and see if they are coming from "Airflow" or from "Tests" (looking at the call stack trace). This should allow us to have a "DB isolation" mode where the (vast) majority of the current tests will be able to run without any modifications, but we will fail if any of the "airflow" code accesses the DB directly. Likely some tests will need to be adapted or not run in the DB Isolation mode, but this should be easy as well.
>
> This way we will be able not only to fix any "unwanted" DB access currently (for example, if any of the provider code accesses the DB directly we will see tests failing). This will also be future-compatible, i.e. new tests or changes in airflow will also have to pass the tests, so there is very little overhead needed in case of future airflow code.
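The call-stack classification described above (deciding whether a DB request originates from Airflow code or from test code) can be sketched with the stdlib alone. This is an illustrative sketch, not actual Airflow code: `classify_db_access` and the path heuristics are hypothetical, and the real implementation would hook SQLAlchemy engine events rather than be called manually.

```python
import traceback
from typing import Iterable


def classify_db_access(stack_filenames: Iterable[str]) -> str:
    """Classify a DB request by the innermost recognizable stack frame.

    stack_filenames is ordered outermost-first, as produced by
    traceback.extract_stack(). Test frames win over Airflow frames:
    a test calling the DB directly is allowed, while Airflow code
    doing so should fail in "DB isolation" mode.
    """
    normalized = [name.replace("\\", "/") for name in stack_filenames]
    for path in reversed(normalized):  # walk from the innermost frame outward
        if "/tests/" in path:
            return "test"       # direct DB access from a test: allowed
        if "/airflow/" in path:
            return "airflow"    # DB access from Airflow code: forbidden
    return "unknown"


def current_db_access_origin() -> str:
    """How the check could be used from inside a SQLAlchemy event hook."""
    return classify_db_access(f.filename for f in traceback.extract_stack())
```

A real hook would register this via SQLAlchemy's event system and raise when the origin is "airflow" while the DB-isolation mode is enabled.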
> We are not making anything but possibly minor modifications to the internal Airflow structure or methods/class structure and, except for adding the need to also keep .proto definitions for the Internal APIs (maybe 30-40 methods in total), there is no extra overhead involved to develop new features or modify the existing ones.
>
> > Also, it will be great if your AIP can discuss the long term strategy about the internal API. Personally, I think we should completely remove the db session access from the worker and other untrusted components and fully rely on the Internal API. This will reduce the maintenance burden since, if we add new db access in those untrusted components, we need to do it for both the db session and the internal API. What's more, with the Internal API, we can even remove the db-proxy :)
>
> Maybe that was not clear enough but this is precisely what AIP-44 is about! We are going to remove ALL direct DB access from those untrusted components. ALL of it. Full stop. The (not even long term) goal is that when AIP-44 is implemented, untrusted components will not need Metadata DB access at all. We are going to fully rely on the Internal API :). I think, the way I proposed it, with the help of a few interested people, we should be able to get there in Airflow 2.5 (looking at the inventory of methods). I will need to update the inventory after 2.4 is out with the dataset changes, but I do not expect a particular increase in the scope.
> The steps that need to be done to complete it:
>
> * Implement the foundation (I plan to start working on it immediately after approval): complete the POC and implement the test harness for our CI (should be rather easy).
> * Redo the inventory and prepare the list of methods; most of the change is to implement ".proto" for all the methods to replace -> right after we cut the 2.4 branch.
> * Replace the methods and make sure that all the tests pass -> once we have the foundation in place this is easy to be done by a number of people, similarly to AIP-47 where we have a number of people implementing parts of it.
> * Overall testing including performance impact and scalability (the Google Composer Team had initially agreed they could help a lot with "scale" testing).
>
> If I have support from the community members to help with the last point, I think we should be rather safe to get it implemented for 2.5 (or 2.6 if the final testing reveals some optimisation needs).
>
> This will all be under a feature flag, and the idea is that without the flag enabled there should be nearly 0 impact on the existing functionality (there will be a very slight overhead to make an extra method call when any DB method starts) - but since those are "DB" methods, it will be totally negligible.
>
> J.
>
> On Thu, Aug 4, 2022 at 12:47 AM Ping Zhang <pin...@umich.edu> wrote:
>> Hi Jarek,
>>
>> Thanks for pushing forward with this AIP.
>>
>> I am very interested in this part:
>>
>> All tests for DagProcessor, Workers (including Operators) and triggers are executed in both modes "DBDirect" and "DBIsolation". The code for Airflow Database API, the DbApiClient and DBSession are extended until all tests pass.
>> I would like to see more about how you do this because I have noticed that there are tons of different configurations in the airflow and some combinations are less covered by the tests as there are some default values, for example this code <https://github.com/apache/airflow/blob/3c08cefdfd2e2636a714bb835902f0cb34225563/airflow/task/task_runner/standard_task_runner.py#L39-L42>.
>>
>> Also, it will be great if your AIP can discuss the long term strategy about the internal API. Personally, I think we should completely remove the db session access from the worker and other untrusted components and fully rely on the Internal API. This will reduce the maintenance burden since, if we add new db access in those untrusted components, we need to do it for both the db session and the internal API. What's more, with the Internal API, we can even remove the db-proxy :)
>>
>> Thanks,
>>
>> Ping
>>
>> On Tue, Aug 2, 2022 at 6:56 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>>> Hello,
>>>
>>> I have updated AIP-44 with the findings from the POC. If there are any more questions/comments, please let me know; otherwise I plan to start voting tomorrow.
>>>
>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
>>>
>>> On Mon, Jul 25, 2022 at 10:18 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>> I think the drop will be less than those numbers - especially the high-end ones. The DB operations we use are quite "coarse" grained (they span a single transaction, not a function call). Also, I deliberately chose "example dags" as the benchmark playground - the example DAGs we have are small, and in this case it means that the "actual" work done is rather small. If you start parsing bigger DAGs, the overall number of calls (thus the RPC overhead) will not grow, but the time needed to parse the DAG will grow a lot.
>>>> This means that the RPC overhead will get way smaller (percentage-wise).
>>>>
>>>> Also, this is not an "overall" performance drop - this is really a "mini-benchmark" (as opposed to a "system benchmark" or "micro-benchmark"). The benchmark only concerns running particular "transactions" - all the other parts remain unaffected - so the numbers I saw are the absolute maximum limits of the performance drop, and they will be much smaller in reality.
>>>>
>>>> J.
>>>>
>>>> On Mon, Jul 25, 2022 at 9:01 PM Mateusz Henc <mh...@google.com.invalid> wrote:
>>>>> Hi,
>>>>> Sorry, but I was OOO and offline last week and was not able to even reply.
>>>>>
>>>>> Thank you Jarek for a very detailed analysis. I think we all expected some performance drop for better security. Do you think the performance drop that you measured is what we should expect in the "final" version? I don't know Airflow well enough to know which DB access is used most (and what object types are required there - small/big or maybe just primitives?).
>>>>>
>>>>> Nonetheless, I am looking forward to starting the voting process!
>>>>>
>>>>> Best regards,
>>>>> Mateusz Henc
>>>>>
>>>>> On Fri, Jul 15, 2022 at 4:48 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>> Two more things:
>>>>>> 1) there is one "caveat" I had to handle - timeout handling in DagFileProcessor (signals do not play well with GRPC threads - but that should be easy to fix)
>>>>>> 2) The way I proposed it (with very localized changes needed) will make it very easy to split the job among multiple people and make it a true community effort to complete - no need for major refactorings or changes across the whole airflow code.
>>>>>>
>>>>>> J.
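The fixed-RPC-overhead argument from Jarek's Jul 25 mail (a constant per-call RPC cost shrinks percentage-wise as DAG parse time grows) can be put into a one-line formula. The numbers below are purely illustrative, not measurements from the POC:

```python
def rpc_overhead_pct(rpc_s: float, parse_s: float) -> float:
    """Share of total wall time spent on RPC, in percent."""
    return 100.0 * rpc_s / (rpc_s + parse_s)


# The per-request RPC cost stays roughly constant while parse time
# grows with DAG size, so the relative overhead drops:
small = rpc_overhead_pct(rpc_s=0.06, parse_s=0.58)  # small example DAG
large = rpc_overhead_pct(rpc_s=0.06, parse_s=5.80)  # 10x bigger parse time
assert large < small
```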
>>>>>> On Fri, Jul 15, 2022 at 4:32 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>>> Hello Everyone,
>>>>>>>
>>>>>>> First of all - apologies to those who waited for it; I've been dragged in multiple directions, but finally I got some quality time to take a look at the open questions and implement the POC for AIP-44 as promised before.
>>>>>>>
>>>>>>> TL;DR: I finally came back to AIP-44 and multi-tenancy and have made good progress that hopefully will lead to voting next week. I think I have enough of the missing evidence of the impact of the internal API on performance; I have also implemented a working POC with one of the Internal API calls (processFile) that we will have to implement, and run a series of performance tests with it.
>>>>>>>
>>>>>>> # Current state
>>>>>>> --------------------
>>>>>>>
>>>>>>> The state we left it in a few months ago was (https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API):
>>>>>>> * I've prepared an inventory of methods and the general approach we are going to take (and we got consensus there)
>>>>>>> * I've left the decision on the final choice (Thrift vs. GRPC) to a later POC (I got some feedback from Beam about gRPC vs. Thrift; based on that I started with GRPC and I am actually very happy with it, so I think we can leave Thrift out of the picture).
>>>>>>> * We've struggled a bit with the decision - should we leave two paths (GRPC/direct DB) or one (local GRPC vs. remote GRPC)
>>>>>>>
>>>>>>> The POC is implemented here: https://github.com/apache/airflow/pull/25094 and I have the following findings:
>>>>>>>
>>>>>>> # Performance impact
>>>>>>> ------------------------------
>>>>>>>
>>>>>>> The performance impact is visible (as expected).
>>>>>>> It's quite acceptable for a distributed environment (in exchange for security), but it's likely significant enough to stay with the original idea of having Airflow work in two modes: a) secure - with RPC APIs, b) standard - with direct DB calls.
>>>>>>>
>>>>>>> On "localhost" with a local DB, serializing and transporting the messages between two different processes introduced up to 10% overhead. I saw the time to run 500 scans of all our example folder dags going from ~290s to ~320s pretty consistently - enough of a difference to exclude volatility. I tested it in Docker on both ARM and AMD. It seems that in some cases this can be partially offset by slightly increased parallelism on multi-processing machines run on "bare" metal. I got consistently just a 2-3% increase on my Linux without Docker with the same test harness, but we cannot rely on such configurations; I think focusing on a Docker-based installation, without any special assumptions on how your networking/localhost is implemented, is something we should treat as the baseline.
>>>>>>>
>>>>>>> The same experiment repeated with 50 messages, but of much bigger size (300 Callbacks passed per message), has shown a 30% performance drop. This is significant, but I believe most of our messages will be much smaller.
>>>>>>> Also, the messages I chose were very special - in the case of Callbacks we are doing rather sophisticated serialization, because we have to account for Kubernetes objects that are potentially not serializable. So the test involved 300 messages that had to be not only GRPC-serialized, but some parts of them also had to be Airflow-JSON serialized, and the whole "executor config" had to be pickled/unpickled (for 100 such messages out of 300). Also, we have some conditional processing there (there is a union of three types of callbacks that has to be properly deserialized). And those messages could be repeated. This is not happening for most other cases, and we can likely optimize it away in the future, but I wanted to see the "worst" scenario.
>>>>>>>
>>>>>>> For the remote client (still with a local DB for the internal API server) I got between 30% and 80% slow-down, but this was on my WiFi network. I am sure with a "wired" network it will be much better; also, when a remote DB gets into the picture, the overall percentage overhead will be much smaller. We knew this would be a "slower" solution, but this is the price for someone who wants to have security and isolation.
>>>>>>>
>>>>>>> I have not yet performed the tests with SSL, but I think this will be a modest increase. And anyhow, the "impact" of 10% is IMHO enough to make a decision that we cannot go "GRPC-only" - the path where we will continue using direct DB access should stay (and I already know how to do it relatively easily).
>>>>>>>
>>>>>>> # Implementation and maintainability
>>>>>>> -------------------------------------------------
>>>>>>>
>>>>>>> In the PR you will see the way I see the implementation details and how it will impact our code in general.
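The layered serialization described above (GRPC on the outside, Airflow-JSON for structured fields, pickle for arbitrary executor-config objects) can be mimicked with the stdlib. This is an illustrative sketch of the layering only, not the PR's actual wire format; the function names and payload shape are hypothetical:

```python
import base64
import json
import pickle
from typing import Any, Tuple


def encode_callback(structured: dict, executor_config: Any) -> bytes:
    """JSON for plain fields; pickle+base64 for objects (e.g. Kubernetes
    pod overrides) that have no JSON representation. The resulting bytes
    would then ride inside a field of a protobuf message."""
    payload = {
        "structured": structured,
        "executor_config_b64": base64.b64encode(
            pickle.dumps(executor_config)
        ).decode("ascii"),
    }
    return json.dumps(payload).encode("utf-8")


def decode_callback(raw: bytes) -> Tuple[dict, Any]:
    """Reverse the two layers: JSON first, then base64+unpickle."""
    payload = json.loads(raw.decode("utf-8"))
    executor_config = pickle.loads(base64.b64decode(payload["executor_config_b64"]))
    return payload["structured"], executor_config
```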
>>>>>>> I think what I proposed is actually rather elegant and easy to maintain, and likely we can improve it somehow (happy to brainstorm on some creative ways we could use - for example - decorators) to make it "friendlier", but I focused more on explicitness and showing the mechanisms involved rather than "fanciness". I found it rather easy to implement and it does seem to have some good "easy maintainability" properties. The way I propose it boils down to a few "rules":
>>>>>>>
>>>>>>> * For all the data and "Messages" we send, we have a nice "Internal API" defined in .proto, and protobuf objects generated out of that. The GRPC proto nicely defines the structures we are going to send over the network. It's very standard and has really good modern support. For one, we automatically - I added a pre-commit - generate MyPy type stubs for the generated classes, and it makes it super easy both to implement the mapping and to automatically verify its correctness (MyPy nicely catches all kinds of mistakes you can make!). Autocomplete for the proto-generated classes works like a charm. I struggled initially without typing, but once I configured mypy stub generation, I got really nice detection of the mistakes I'd made and was able to progress with the implementation way faster than without it. Usually everything magically worked as soon as I fixed all MyPy errors.
>>>>>>>
>>>>>>> * All Airflow objects that we need to send over GRPC get from_protobuf/to_protobuf methods. They are usually very simple (just passing strings/ints/boolean fields to the constructor), and the structures we pass are rather small (see the PR).
>>>>>>> Also (as mentioned above) I implemented a few more complex cases (with more complex serialization), and it is easy, readable and pretty well integrated into our code IMHO. This introduces a little duplication here and there, but those objects change rarely (only when we implement big features like Dynamic Task Mapping) and MyPy guards us against any mishaps there (now that our code is mostly typed).
>>>>>>>
>>>>>>> * We make all "DB-aware" objects also "GRPC-aware". The number of changes in the actual code/logic is rather small, which makes it super easy to maintain IMHO. There are two changes:
>>>>>>>
>>>>>>> 1) any of the objects that are used for database operations (in my PR this is DagFileProcessor) needs to be initialized with the "use_grpc" flag and passed a channel that will be used for communication.
>>>>>>> 2) the DB methods we have (see the inventory in the AIP) will have to be refactored slightly. This is really all that was added: the original "process_file" method was renamed to "process_file_db", and the "callers" of the method are completely intact.
>>>>>>> def process_file_grpc(
>>>>>>>     self,
>>>>>>>     file_path: str,
>>>>>>>     callback_requests: List[CallbackRequest],
>>>>>>>     pickle_dags: bool = False,
>>>>>>> ) -> Tuple[int, int]:
>>>>>>>     request = internal_api_pb2.FileProcessorRequest(path=file_path, pickle_dags=pickle_dags)
>>>>>>>     for callback_request in callback_requests:
>>>>>>>         request.callbacks.append(callback_request.to_protobuf())
>>>>>>>     res = self.stub.processFile(request)
>>>>>>>     return res.dagsFound, res.errorsFound
>>>>>>>
>>>>>>> def process_file(
>>>>>>>     self,
>>>>>>>     file_path: str,
>>>>>>>     callback_requests: List[CallbackRequest],
>>>>>>>     pickle_dags: bool = False,
>>>>>>> ) -> Tuple[int, int]:
>>>>>>>     if self.use_grpc:
>>>>>>>         return self.process_file_grpc(
>>>>>>>             file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags
>>>>>>>         )
>>>>>>>     return self.process_file_db(
>>>>>>>         file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags
>>>>>>>     )
>>>>>>>
>>>>>>> You can see all the details in the PR https://github.com/apache/airflow/pull/25094.
>>>>>>>
>>>>>>> # POC testing harness (self-service :) )
>>>>>>> ----------------------------------------------------
>>>>>>>
>>>>>>> You can also very easily test it yourself:
>>>>>>>
>>>>>>> * airflow internal-api server -> runs the internal API server.
>>>>>>> * airflow internal-api test-client --num-repeats 10 --num-callbacks 10 --use-grpc -> runs file-processing of all example dags 10 times with 10x3 callbacks sent. The same command without --use-grpc will run using direct DB access. The server listens on port 50051 and the client connects to "localhost:50051" - you can modify the code to use a remote IP/different ports (easy to find by searching for localhost:50051).
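The from_protobuf/to_protobuf convention described above boils down to a field-by-field mapping. A stand-alone sketch of the pattern follows; a plain class stands in for the grpc-generated `internal_api_pb2.FileProcessorRequest` so it runs without grpcio, and the field names mirror the POC but are not authoritative:

```python
from dataclasses import dataclass


class FileProcessorRequestPB:
    """Stand-in for a protobuf-generated message class (hypothetical)."""

    def __init__(self, path: str = "", pickle_dags: bool = False):
        self.path = path
        self.pickle_dags = pickle_dags


@dataclass
class FileProcessorRequest:
    """Airflow-side object made 'GRPC-aware' via two small methods."""

    path: str
    pickle_dags: bool = False

    def to_protobuf(self) -> FileProcessorRequestPB:
        # Simple string/bool fields map one-to-one onto the message.
        return FileProcessorRequestPB(path=self.path, pickle_dags=self.pickle_dags)

    @classmethod
    def from_protobuf(cls, pb: FileProcessorRequestPB) -> "FileProcessorRequest":
        return cls(path=pb.path, pickle_dags=pb.pickle_dags)
```

With generated MyPy stubs for the real message classes, a typo in either direction of this mapping is caught statically, which is the maintainability property described in the mail.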
>>>>>>> # Discussion and voting
>>>>>>> --------------------------------
>>>>>>>
>>>>>>> Of course, some decisions in the PR can be improved (I focused on the explicitness of the POC more than anything else). We can discuss changes and improvements to some decisions I made once the PR is in a "reviewable state", and I am happy to improve it, but for now I would like to focus on answering two questions:
>>>>>>>
>>>>>>> * Does it look plausible?
>>>>>>> * Does it look like it's almost ready to be voted on? (I will update the AIP before starting voting, of course.)
>>>>>>>
>>>>>>> Let me know what you think.
>>>>>>>
>>>>>>> J.
>>>>>>>
>>>>>>> On Tue, Feb 1, 2022 at 3:11 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>>>> Since we have AIP-43 already approved, I would love to have more questions and discussions about AIP-44 - the "Airflow Internal API" https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API (as the new name is).
>>>>>>>>
>>>>>>>> For those who would like to get more context - a recording of the meeting where the scope and proposals of both AIP-43 and AIP-44 were discussed can be found here: https://drive.google.com/file/d/1SMFzazuY1kg4B4r11wNt8EQ_PmTDRKq6/view
>>>>>>>>
>>>>>>>> In the AIP I just made a small clarification regarding some "future" changes - specifically, the token security might be nicely handled together with AIP-46 "Add support for docker runtime isolation" proposed by Ping.
>>>>>>>>
>>>>>>>> My goal is to gather comments till the end of the week, and if there are no big concerns, I would love to start voting next week.
>>>>>>>>
>>>>>>>> J.
>>>>>>>> On Mon, Jan 3, 2022 at 2:48 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>>>>> Also, AIP-44 - which is the DB isolation mode - is now much more detailed and ready for deeper discussion if needed.
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API
>>>>>>>>>
>>>>>>>>> On Tue, Dec 14, 2021 at 12:07 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>>>>> > And just to add to that.
>>>>>>>>> >
>>>>>>>>> > Thanks again for the initial comments and for pushing us to provide more details. That allowed us to discuss and focus on many of the aspects that were raised, and we have many more answers now:
>>>>>>>>> >
>>>>>>>>> > * first of all - we focused on making sure the impact on the existing code and "behavior" of Airflow is minimal. In fact, there should be virtually no change vs. current behavior when db isolation is disabled.
>>>>>>>>> > * secondly - we've done a full inventory of what the needed API should look like and (not surprisingly) it turned out that the current REST-style API is good for part of it, but most of the "logic" of airflow can be done efficiently when we go to an RPC-style API. However, we propose that the authorization/exposure of the API be the same as we currently use in the REST API; this will allow us to reuse a big part of the infrastructure we already have.
>>>>>>>>> > * thirdly - we took to heart the comments about having to maintain pretty much the same logic in a few different places. That's an obvious maintenance problem.
>>>>>>>>> > The proposal we came up with addresses it - we are going to keep the logic of Airflow internals in one place only and we will simply smartly route where the logic will be executed.
>>>>>>>>> > * regarding the performance impact - we described the deployment options that our proposal makes available - we do not want to favor one deployment option over another, but we made sure the architecture is done in such a way that you can choose which deployment is good for you: "no isolation", "partial isolation", "full isolation" - each with different performance/resource characteristics - all of them fully horizontally scalable and nicely manageable.
>>>>>>>>> >
>>>>>>>>> > We look forward to comments, also for AIP-43 - https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation - AIP-43 is a prerequisite to AIP-44 and they both work together.
>>>>>>>>> >
>>>>>>>>> > We will also think about and discuss more follow-up AIPs once we get those approved (hopefully ;) ). The multi-tenancy is a "long haul" and, while those two AIPs are foundational building blocks, there are at least a few more follow-up AIPs before we can say "we're done with Multi-tenancy" :).
>>>>>>>>> >
>>>>>>>>> > J.
>>>>>>>>> >
>>>>>>>>> > On Tue, Dec 14, 2021 at 9:58 AM Mateusz Henc <mh...@google.com.invalid> wrote:
>>>>>>>>> > > Hi,
>>>>>>>>> > >
>>>>>>>>> > > As promised, we (credits to Jarek) updated the AIPs, added more details, did the inventory and changed the way API endpoints are generated.
>>>>>>>>> > > We also renamed it to Airflow Internal API - so the url has changed:
>>>>>>>>> > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
>>>>>>>>> > >
>>>>>>>>> > > Please take a look; any comments are highly appreciated.
>>>>>>>>> > >
>>>>>>>>> > > Best regards,
>>>>>>>>> > > Mateusz Henc
>>>>>>>>> > >
>>>>>>>>> > > On Mon, Dec 6, 2021 at 3:09 PM Mateusz Henc <mh...@google.com> wrote:
>>>>>>>>> > >> Hi,
>>>>>>>>> > >> Thank you Ash for your feedback (in both AIPs).
>>>>>>>>> > >>
>>>>>>>>> > >> We are working to address your concerns. We will update the AIPs in a few days. I will let you know when it's done.
>>>>>>>>> > >>
>>>>>>>>> > >> Best regards,
>>>>>>>>> > >> Mateusz Henc
>>>>>>>>> > >>
>>>>>>>>> > >> On Fri, Dec 3, 2021 at 7:27 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>>>>> > >>> Cool. Thanks for the guidance.
>>>>>>>>> > >>>
>>>>>>>>> > >>> On Fri, Dec 3, 2021 at 6:37 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>>>>>>>>> > >>> > - make an inventory: It doesn't need to be exhaustive, but a representative sample.
>>>>>>>>> > >>> > - More clearly define _what_ the API calls return -- object type, methods on them etc.
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > From the AIP you have this example:
>>>>>>>>> > >>> >
>>>>>>>>> > >>> >     def get_dag_run(self, dag_id, execution_date):
>>>>>>>>> > >>> >         return self.db_client.get_dag_run(dag_id, execution_date)
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > What does that _actually_ return? What capabilities does it have?
>>>>>>>>> > >>> > (I have other thoughts but those are less fundamental and can be discussed later)
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > -ash
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > On Fri, Dec 3 2021 at 18:20:21 +0100, Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>>>>> > >>> > Surely - if you think that we need to do some more work to get confidence, that's fine. I am sure we can improve it to the level that we will not have to do full performance tests and you are confident in the direction. Just to clarify your concerns and make sure we are on the same page - as I understand it, we should:
>>>>>>>>> > >>> > * make an inventory of the "actual changes" the proposal will involve in the database-low-level code of Airflow
>>>>>>>>> > >>> > * based on that, assess whether those changes are likely (or unlikely) to make a performance impact
>>>>>>>>> > >>> > * if we assess that it is likely to have an impact, run some at least rudimentary performance tests to prove that this is manageable
>>>>>>>>> > >>> > I think that might be a good exercise to do. Does it sound about right? Or do you have any concerns about certain architectural decisions taken? No problem with Friday - if we get answers today, I think it will give us time to think about it over the weekend and address it next week. J.
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > On Fri, Dec 3, 2021 at 5:56 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>>>>>>>>> > >>> > This is a fundamental change to the architecture with significant possible impacts on performance, and it likely requires touching a large portion of the code base.
>>>>>>>>> > >>> > Sorry, you're going to have to expand on the details first and work out what would actually be involved and what the impacts will be. Right now I have serious reservations about this approach, so I can't agree on the high level proposal without an actual proposal (the current document is, at best, an outline, not an actual proposal). Sorry to be a grinch right before the weekend. Ash
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > On Thu, Dec 2 2021 at 22:47:34 +0100, Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>>>>> > >>> > Oh yeah - good point, and we spoke about performance testing/implications. Performance is something we were discussing as the next step once we get a general "OK" on the direction - we just want to make sure that there are no "huge" blockers in the way this is proposed and to explain any doubts first, so that the investment in the performance part makes sense. We do not want to spend a lot of time on getting the tests done and a detailed inventory of the methods/API calls - only to find out that this is generally a "bad direction". Just to clarify again - we also considered (as an alternative option) automatically mapping all the DB methods into remote calls. But we dropped that idea - precisely for reasons of performance and transaction integrity. So we are NOT mapping DB calls into API calls; those will be "logical operations" on the database. Generally speaking, most of the API calls for the "airflow system-level but executed in worker" calls will be rather "coarse" than fine-grained. For example, the aforementioned "mini scheduler" - where we want to make a single API call and run the whole of it on the DBAPI side.
>>>>>>>>> > >>> > So there, the performance impact is very limited IMHO. And if we see any other "logic" like that in other parts of the code (zombie detection as an example), we plan to make a detailed inventory of those once we get a general "Looks good" for the direction. For now we did some "rough" checking and it seems a plausible approach and quite doable. One more note - the "fine-grained" calls ("variable" update/retrieval, "connection" update/retrieval) via the REST API will still be used by the user's code though (parsing DAGs, operators, workers and callbacks). We also plan to make sure that none of the "Community" operators are using "non-blessed" DB calls (we can do it in our CI). So at the end of the exercise, all operators, hooks, etc. from the community will be guaranteed to only use the DB APIs that are available in the "DB API" module. But there I do not expect pretty much any performance penalty, as those are very fast and rare operations (and the good thing there is that we can cache the results of those in workers/DAG processing). J.
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > On Thu, Dec 2, 2021 at 7:16 PM Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote:
>>>>>>>>> > >>> > Ah, my bad, I missed that. I'd still like to see discussion of the performance impacts, though.
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > On Thu, Dec 2, 2021 at 11:14 AM Ash Berlin-Taylor <a...@apache.org> wrote:
>>>>>>>>> > >>> > The scheduler was excluded from the components that would use the dbapi - the mini scheduler is the odd one out here, as it (currently) runs on the worker but shares much of the code from the scheduling path.
-a

On 2 December 2021 17:56:40 GMT, Andrew Godwin <andrew.god...@astronomer.io.INVALID> wrote:

I would also like to see some discussion in this AIP about how the data is going to be serialised to and from the database instances (obviously Connexion is involved, but I presume more transformation code is needed than that) and the potential slowdown this would cause. In my experience, a somewhat direct ORM mapping like this is going to result in considerably slower times for any complex operation that's touching a few hundred rows.

Is there a reason this is being proposed for the scheduler code, too? In my mind, the best approach to multitenancy would be to remove all user-supplied code from the scheduler and leave it with direct DB access, rather than trying to indirect all scheduler access through another API layer.

Andrew

On Thu, Dec 2, 2021 at 10:29 AM Jarek Potiuk <ja...@potiuk.com> wrote:

Yeah - I think, Ash, you are completely right that we need some more "detailed" clarification. I believe I know what you are - rightfully - afraid of (re impact on the code), and maybe we have not done a good job of explaining some of the assumptions we had when we worked on it with Mateusz. Simply, it was not clear that our aim is to absolutely minimise the impact on the "internal DB transactions" done in schedulers and workers. The idea is that the change will at most result in moving the execution of the transactions to another process, but not changing what the DB transactions do internally.
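Andrew's serialisation question can be made concrete with a minimal round-trip sketch. The class, its field names, and the JSON wire format below are assumptions for illustration only, not what the AIP specifies: the point is that moving a transaction's execution to another process means its inputs and results must survive a serialise/deserialise round trip with the same type on both sides.

```python
import json
from dataclasses import dataclass, asdict
from datetime import datetime


@dataclass
class DagRunStub:
    """Illustrative stand-in for the ORM DagRun object."""
    dag_id: str
    execution_date: datetime
    state: str

    def to_json(self) -> str:
        # Serialise for the wire; datetime needs an explicit encoding.
        payload = asdict(self)
        payload["execution_date"] = self.execution_date.isoformat()
        return json.dumps(payload)

    @classmethod
    def from_json(cls, raw: str) -> "DagRunStub":
        # Reconstruct the same type on the receiving side, so callers
        # see an identical object in both modes.
        payload = json.loads(raw)
        payload["execution_date"] = datetime.fromisoformat(payload["execution_date"])
        return cls(**payload)
```

This per-field transformation code (beyond what Connexion generates) is exactly the overhead Andrew is asking about; for bulk operations touching hundreds of rows, it is paid once per row.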
Actually, this was one of the reasons for the "alternative" approach (you can see it in the document) we discussed - hijacking the "sqlalchemy session". That is far too low-level, and the aim of the "DB-API" is NOT to replace direct DB calls (hence we need to figure out a better name). The API is there to provide a "scheduler logic" API and "REST access to Airflow primitives like dags/tasks/variables/connections" etc.

As an example (which we briefly talked about on Slack), the "_run_mini_scheduler_on_child_tasks" case (https://github.com/apache/airflow/blob/main/airflow/jobs/local_task_job.py#L225-L274) is one that we would put in the doc. As we thought of it, this is a "single DB-API operation". Those are not pure REST calls, of course; they are more RPC-like calls. That is why initially I even thought of separating the API completely. But since there are a lot of common "primitive" calls that we can re-use, I think having a separate DB-API component which re-uses the Connexion implementation, replacing authentication with the custom worker <> DB-API authentication, is the way to go. And yes, if we agree on the general idea, we need to choose the best way to "connect" the REST API we have with the RPC-kind of API we need for some cases in workers. But we wanted to make sure we are on the same page with the direction. And yes, it means that the DB-API will potentially have to handle quite a number of DB operations (and that it has to be replicable and scalable as well) - but the DB-API will be "stateless", similarly to the webserver, so it will be scalable by definition.
And yes, performance tests will be part of the POC - likely even before we finally ask for votes. So in short:

* no modification or impact on current scheduler behaviour when DB Isolation is disabled
* only higher-level methods will be moved out to the DB-API, and we will reuse existing "REST" APIs where it makes sense
* we aim to have "0" changes to the logic of processing - both in the DAG processing logic and the DB API; we think that with the architecture we proposed it's perfectly doable

I hope this clarifies a bit, and once we agree on the general direction, we will definitely work on adding more details and clarification (we actually already have a lot of that, but we just wanted to start with explaining the idea and go into more detail later, once we are sure there are no "high-level" blockers from the community).

J.

On Thu, Dec 2, 2021 at 4:46 PM Ash Berlin-Taylor <a...@apache.org> wrote:

> I just provided a general idea for the approach - but if you want me to put more examples then I am happy to do that

Yes please. It is too general for me, and I can't work out what effect it would actually have on the code base, especially how it would look with the config option to enable/disable direct db access.

-ash

On Thu, Dec 2 2021 at 16:36:57 +0100, Mateusz Henc <mh...@google.com.INVALID> wrote:

Hi,
I am sorry if it is not clear enough; let me try to explain it here, so maybe it gives more light on the idea.
See my comments below.

On Thu, Dec 2, 2021 at 3:39 PM Ash Berlin-Taylor <a...@apache.org> wrote:

> I'm sorry to say it, but this proposal right now just doesn't contain enough detail to say what the actual changes to the code would be, and what the impact would be.
>
> To take the one example you have so far:
>
>     def get_dag_run(self, dag_id, execution_date):
>         return self.db_client.get_dag_run(dag_id, execution_date)
>
> So from this snippet I'm guessing it would be used like this:
>
>     dag_run = db_client.get_dag_run(dag_id, execution_date)
>
> What type of object is returned?

As it replaces:

    dag_run = session.query(DagRun)
        .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
        .first()

the type of the object will be exactly the same (DagRun).

> Do we need one API method per individual query we have in the source?

No, as explained by the sentence:

> The method may be extended, accepting more optional parameters to avoid having too many similar implementations.

> Which components would use this new mode when it's enabled?

You may read:

> Airflow Database API is a new independent component of Airflow. It allows isolating some components (Worker, DagProcessor and Triggerer) from direct access to DB.

> But what you haven't said the first thing about is what _other_ changes would be needed in the code. To take a fairly simple example:
>
>     dag_run = db_client.get_dag_run(dag_id, execution_date)
>     dag_run.queued_at = timezone.now()
>     # How do I save this?
> In short, you need to put a lot more detail into this before we can even have an idea of the full scope of the change this proposal would involve, and what code changes would be needed for components to work with and without this setting enabled.

For this particular example, it depends on the intention of the code author:
- If this should be in a transaction, then I would actually introduce a new method like enqueue_dag_run(...) that would run these two steps on the Airflow DB API side.
- If not, then maybe just an "update_dag_run" method accepting the whole "dag_run" object and saving it to the DB.

In general, we could take a naive approach, e.g. replace the code:

    dag_run = session.query(DagRun)
        .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
        .first()

with:

    if self.db_isolation:
        dag_run = db_client.get_dag_run(dag_id, execution_date)
    else:
        dag_run = session.query(DagRun)
            .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
            .first()

The problem is that the Airflow DB API would then need to have the same implementation for the query - so, duplicated code. That's why we propose moving this code to the DBClient, which is also used by the Airflow DB API (in DB direct mode).

I know there are many places where the code is much more complicated than a single query, but they must be handled one-by-one during the implementation; otherwise this AIP would be way too big.
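Mateusz's point about avoiding scattered `if db_isolation` branches could be sketched as a single dispatch point. `DagRunAccessor` and the injected clients below are hypothetical names for illustration, not code from the AIP: the DBClient owns the single copy of each query, and isolation mode merely swaps in the remote DB-API client behind the same interface.

```python
class DagRunAccessor:
    """Illustrative facade: callers use one method regardless of mode.

    In "DB direct" mode the local DBClient (which holds the one copy of
    the SQLAlchemy query) is used; in "DB isolation" mode the identical
    call goes through the remote DB-API client instead, so no query
    logic is duplicated at the call sites.
    """

    def __init__(self, db_isolation: bool, db_client, api_client):
        # Pick the backend once, at construction time.
        self._impl = api_client if db_isolation else db_client

    def get_dag_run(self, dag_id, execution_date):
        # Same signature and return type in both modes; only the
        # transport behind the call differs.
        return self._impl.get_dag_run(dag_id, execution_date)
```

Because the server-side DB API can itself be built on the same DBClient, the query implementation exists exactly once, which is the de-duplication the thread is after.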
I just provided a general idea for the approach - but if you want me to put more examples, then I am happy to do that.

Best regards,
Mateusz Henc

On Thu, Dec 2 2021 at 14:23:56 +0100, Mateusz Henc <mh...@google.com.INVALID> wrote:

Hi,
I just added a new AIP for running some Airflow components in DB-isolation mode, without direct access to the Airflow Database; they will use a new API for this purpose.

PTAL:
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API

Open question:
I called it "Airflow Database API" - however, I feel it could be more than just an access layer for the database. So if you have a better name, please let me know; I am happy to change it.

Best regards,
Mateusz Henc