Hi Jarek,

Sorry for another question; this one is about error handling.
In the db session world, when the client fails during session.commit for any reason, the client knows the db transaction will be rolled back. With the internal API, that might not always be the case as there is another hop. I am wondering how this will be handled. Thanks, Ping On Thu, Aug 4, 2022 at 10:18 AM Ping Zhang <pin...@umich.edu> wrote: > Hi Jarek, > > Thanks for the thorough explanation. They all make sense, especially the > performance impact and scalability part. I believe with the internal API, > it should reduce lots of db connections. > > Looking forward to it! > > Thanks, > > Ping > > > On Thu, Aug 4, 2022 at 12:11 AM Jarek Potiuk <ja...@potiuk.com> wrote: > >> I would like to see more about how you do this because I have noticed >> that there are tons of different configurations in the airflow and some >> combinations are less covered by the test as there are some default values, >> for example this code >> <https://github.com/apache/airflow/blob/3c08cefdfd2e2636a714bb835902f0cb34225563/airflow/task/task_runner/standard_task_runner.py#L39-L42> >> . >> >> > Good point. I plan to have another "test type" added in our CI and run >> the full test suite there. I am not planning to add new tests (well there >> will be some generic DB-less mode tests for sure but I am mostly going to >> rely on the existing test harness for Airflow). The change will be that I >> will just block all direct DB access from any methods that will be executed >> via any of the "airflow" main code. This should be rather easy - since we >> are using SQLAlchemy, we will be able to capture all DB requests and see if >> they are coming from "Airflow" or from "Tests" (looking at call stack >> trace). This should allow us to have a "DB isolation" mode where the (vast) >> majority of the current tests will be able to run without any modifications >> but we will fail if any of the "airflow" code accesses the DB directly. >> Likely some tests will need to be adapted or not run in the DB Isolation >> mode, but this should be easy as well. >> >> This way we will be able not only to fix any "unwanted" DB access >> currently (for example if any of the provider code accesses DB directly we >> will see tests failing). This will also be future-compatible, i.e. new >> tests or changes in airflow will also have to pass the tests so there is >> very little overhead needed in case of future airflow code. We are not >> making anything but possibly minor modifications to the internal Airflow >> structure or methods/class structure and except adding the need to also >> keep .proto definitions for the Internal APIs (maybe 30-40 methods in >> total) there is no extra overhead involved to develop new features or >> modify the existing ones. >> >> > Also, it will be great if your AIP can discuss the long term strategy >> about the internal API. Personally, I think we should completely remove the >> db session access from the worker and other untrusted components and fully >> rely on the Internal API. This will reduce the maintenance burden as if we >> add new db access in those untrusted components, we need to do it for both >> db session and internal API. What's more, with the Internal API, we can >> even remove the db-proxy :) >> >> Maybe that was not clear enough but this is precisely what AIP-44 is >> about! We are going to remove ALL Direct DB access from those untrusted >> components. ALL of it. Full stop.
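For illustration, the "capture all DB requests and look at the call stack" mechanism described above maps quite directly onto SQLAlchemy's engine events. A minimal sketch, assuming a pytest-style conftest that is only loaded when the DB-isolation test mode is enabled; the listener name and the path-based heuristics are illustrative, not the actual Airflow test harness:

import traceback

from sqlalchemy import event
from sqlalchemy.engine import Engine


@event.listens_for(Engine, "before_cursor_execute")
def _fail_on_direct_db_access(conn, cursor, statement, parameters, context, executemany):
    # Walk the stack from the innermost frame outwards to find who issued the query.
    for frame in reversed(traceback.extract_stack()):
        if "sqlalchemy" in frame.filename or frame.filename == __file__:
            continue  # skip SQLAlchemy internals and this listener itself
        if "/tests/" in frame.filename:
            return  # queries issued directly by test code are allowed
        if "/airflow/" in frame.filename:
            # Core Airflow code must go through the Internal API in
            # "DB isolation" mode, never straight to the metadata DB.
            raise RuntimeError(
                f"Direct DB access from Airflow code in DB-isolation mode: {statement[:100]}"
            )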
The (not even long term) goal is that >> when AIP-44 is implemented, untrusted components will not need Metadata DB >> access at all. We are going to fully rely on the Internal API :). I think >> the way I proposed it, with the help of a few interested people, we should >> be able to get there in Airflow 2.5 (looking at the inventory of methods). >> I will need to update the inventory after 2.4 is out with the dataset >> changes but I do not expect particular increase in the scope. The steps >> that need to be done to complete it: >> >> * Implement the foundation -> I plan to start working on immediately >> after approval): complete the POC and implement the test harness for our CI >> (should be rather easy ) >> * Redo the inventory and prepare list of methods most of the changes is >> to implement ".proto" for all the method to replace -> right after we cut >> 2.4 branch >> * Replace the methods and make sure that all the tests pass -> once we >> have foundation in place this is easy to be done by a number of people, >> similarly as we do AIP-47 where we have a number of people implementing >> parts of it. >> * overall testing including performance impact and scalability (which I >> had initially agreed that Google Composer Team could help a lot for "scale" >> testing). >> >> If I have support from the community members to help with the last point, >> I think we should be rather safe to get it implemented for 2.5 (or 2.6 if >> the final testing will reveal some optimisations needs). >> >> This will all be under feature-flag and the idea is that without the flag >> enabled there should be nearly 0 impact on the existing functionality >> (there will be very slight overhead to make an extra method call when any >> DB method starts) - but since those are "DB" methods, it will be totally >> neglectible.. >> >> J. >> >> >> On Thu, Aug 4, 2022 at 12:47 AM Ping Zhang <pin...@umich.edu> wrote: >> >>> >>> Hi Jarek, >>> >>> Thanks for pushing forward with this AIP. >>> >>> I am very interesting in this part: >>> >>> All tests for DagProcessor, Workers (including Operators) and triggers >>> are executed in both modes "DBDirect'' and "DBIsolation ''. The code for >>> Airflow Database API, the DbApiClient and DBSession are extended until all >>> tests pass. >>> >>> I would like to see more about how you do this because I have noticed >>> that there are tons of different configurations in the airflow and some >>> combinations are less covered by the test as there are some default values, >>> for example this code >>> <https://github.com/apache/airflow/blob/3c08cefdfd2e2636a714bb835902f0cb34225563/airflow/task/task_runner/standard_task_runner.py#L39-L42> >>> . >>> >>> Also, it will be great if your AIP can discuss the long term strategy >>> about the internal API. Personally, I think we should completely remove the >>> db session access from the worker and other untrusted components and fully >>> rely on the Internal API. This will reduce the maintenance burden as if we >>> add new db access in those untrusted components, we need to do it for both >>> db session and internal API. What's more, with the Internal API, we can >>> even remove the db-proxy :) >>> >>> Thanks, >>> >>> Ping >>> >>> >>> On Tue, Aug 2, 2022 at 6:56 AM Jarek Potiuk <ja...@potiuk.com> wrote: >>> >>>> Hello, >>>> >>>> I have updated AIP-44 with the findings from the POC. If there are any >>>> more questions/comments, please let me know, otherwise I plan to start >>>> voting tomorrow. 
>>>> >>>> >>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API >>>> >>>> On Mon, Jul 25, 2022 at 10:18 PM Jarek Potiuk <ja...@potiuk.com> wrote: >>>> >>>>> I think the drop will be less than those numbers - especially the >>>>> high-end ones. The DB operations we use are quite "coarse" grained (they >>>>> span a single transaction, not a function call). Also I deliberately chose >>>>> "example dags" as the benchmark playground - the example DAGs we have are >>>>> small. And in this case it means that the "actual" work done is rather >>>>> small. If you start parsing bigger DAGs, the overall number of calls (thus >>>>> RPC overhead) will not grow, but the time needed to parse the DAG will >>>>> grow >>>>> a lot. This means that the RPC overhead will get way smaller >>>>> (percentage-wise). >>>>> >>>>> Also this is not an "overall" performance drop - this is really a >>>>> "mini-benchmark" (as opposed to "system benchmark" and "micro-benchmark"). >>>>> The benchmark only concerns running particular "transactions" - but all >>>>> the other parts remain unaffected - so the numbers I saw are the absolute, >>>>> maximum limits of the performance drop and they will be much smaller in >>>>> reality. >>>>> >>>>> J. >>>>> >>>>> >>>>> On Mon, Jul 25, 2022 at 9:01 PM Mateusz Henc <mh...@google.com.invalid> >>>>> wrote: >>>>> >>>>>> Hi, >>>>>> Sorry, but I was OOO and offline last week and I was not able to even >>>>>> rely. >>>>>> >>>>>> Thank you Jarek for a very detailed analysis. I think we all expected >>>>>> some performance drop for better security. >>>>>> Do you think the performance drop that you measured is what we should >>>>>> expect in the "final" version? I don't know Airflow that much to know >>>>>> which >>>>>> DB- access is used most (and what object types are required there - >>>>>> small/big or maybe just primitives?). >>>>>> >>>>>> Nonetheless, I am looking forward to starting the voting process! >>>>>> >>>>>> Best regards, >>>>>> Mateusz Henc >>>>>> >>>>>> >>>>>> On Fri, Jul 15, 2022 at 4:48 PM Jarek Potiuk <ja...@potiuk.com> >>>>>> wrote: >>>>>> >>>>>>> Two more things: >>>>>>> 1) there is one "caveat" I had to handle - timeout handling in >>>>>>> DagFileProcessor (signals do not play well with threads of GRPC - but >>>>>>> that >>>>>>> should be easy to fix) >>>>>>> 2) The way I proposed it (with very localized changes needed) will >>>>>>> be very easy to split the job among multiple people and make it a true >>>>>>> community effort to complete - no need for major refactorings or changes >>>>>>> across the whole airflow code. >>>>>>> >>>>>>> J. >>>>>>> >>>>>>> On Fri, Jul 15, 2022 at 4:32 PM Jarek Potiuk <ja...@potiuk.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hello Everyone, >>>>>>>> >>>>>>>> First of all - apologies for those who waited for it, I've been >>>>>>>> dragged in multiple directions, but finally I got some quality time to >>>>>>>> take >>>>>>>> a look at open questions and implement POC for AIP-44 as promised >>>>>>>> before. >>>>>>>> >>>>>>>> TL;DR; I finally came back to the AIP-44 and multi-tenancy and have >>>>>>>> made good progress that hopefully will lead to voting next week. I >>>>>>>> think I >>>>>>>> have enough of the missing evidence of the impact of the internal API >>>>>>>> on >>>>>>>> performance, also I have implemented working POC with one of the >>>>>>>> Internal >>>>>>>> API calls (processFile) that we will have to implement and run a >>>>>>>> series of >>>>>>>> performance tests with it. 
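One more note on the timeout caveat mentioned above: gRPC's Python client accepts a per-call deadline, which avoids SIGALRM-style signal handling entirely, and any server-side failure surfaces on the caller as grpc.RpcError, so the client at least knows the remote operation did not complete. A rough sketch reusing the processFile stub from the POC; the wrapper function, timeout value and error mapping are illustrative only, not part of the PR:

import grpc


def process_file_with_deadline(stub, request, timeout_seconds: float = 50.0):
    try:
        # gRPC enforces the deadline itself - no signals or threads to coordinate.
        return stub.processFile(request, timeout=timeout_seconds)
    except grpc.RpcError as err:
        if err.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
            raise TimeoutError(f"processFile did not finish within {timeout_seconds}s") from err
        # Any other server-side error (for example a DB transaction that failed and
        # was rolled back on the Internal API side) also arrives here as an RpcError,
        # so the caller knows the operation did not complete.
        raise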
>>>>>>>> >>>>>>>> # Current state >>>>>>>> -------------------- >>>>>>>> >>>>>>>> The state we left it few months ago was ( >>>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API >>>>>>>> ): >>>>>>>> * I've prepared inventory of methods and general approach we are >>>>>>>> going to take (and we got consensus there) >>>>>>>> * I've left decision on the final choice (Thrift vs. GRPC) to later >>>>>>>> POC (I got some feedback from Beam about gRPC vs. Thrift, based on >>>>>>>> that I >>>>>>>> started with GRPC and I am actually very happy with it, so i think we >>>>>>>> can >>>>>>>> leave Thrift out of the picture). >>>>>>>> * We've struggled a bit with the decision - should we leave two >>>>>>>> paths (with GRPC/Direct DB) or one (local GRPC vs. remote GRPC) >>>>>>>> >>>>>>>> The POC is implemented here: >>>>>>>> https://github.com/apache/airflow/pull/25094 and I have the >>>>>>>> following findings: >>>>>>>> >>>>>>>> # Performance impact >>>>>>>> ------------------------------ >>>>>>>> >>>>>>>> The performance impact is visible (as expected). It's quite >>>>>>>> acceptable for a distributed environment (in exchange for security), >>>>>>>> but >>>>>>>> it's likely significant enough to stay with the original idea of having >>>>>>>> Airflow work in two modes: a) secure - with RPC apis, b) standard - >>>>>>>> with >>>>>>>> direct DB calls. >>>>>>>> >>>>>>>> On "localhost" with a local DB the performance overhead for >>>>>>>> serializing and transporting the messages between two different >>>>>>>> processes >>>>>>>> introduced up to 10% overhead. I saw an increase of time to run 500 >>>>>>>> scans >>>>>>>> of all our example folders dags going from ~290 to ~320s pretty >>>>>>>> consistently. Enough of a difference to exclude volatility. I tested >>>>>>>> it in >>>>>>>> Docker on both ARM and AMD. It seems that in some cases that can be >>>>>>>> partially offset by slightly increased parallelism on multi-processing >>>>>>>> machines run on "bare" metal. I got consistently just 2-3% increase on >>>>>>>> my >>>>>>>> linux without docker with the same test harness, but we cannot rely on >>>>>>>> such >>>>>>>> configurations, I think focusing on Docker-based installation without >>>>>>>> any >>>>>>>> special assumptions on how your networking/localhost is implemented is >>>>>>>> something we should treat as a baseline. >>>>>>>> >>>>>>>> The same experiment repeated with 50 messages but of much bigger >>>>>>>> size (300 Callbacks passed per message) have shown 30% performance >>>>>>>> drop. >>>>>>>> This is significant, but I believe most of our messages will be much >>>>>>>> smaller. Also the messages I chose were very special - because in case >>>>>>>> of >>>>>>>> Callback we are doing rather sophisticated serialization, because we >>>>>>>> have >>>>>>>> to account for Kubernetes objects that are potentially not >>>>>>>> serializable, so >>>>>>>> the test involved 300 messages that had to be not only GRPC serialized >>>>>>>> but >>>>>>>> also some parts of them had to be Airflow-JSON serialized and the whole >>>>>>>> "executor config" had to be picked/unpickled (for 100 such messages >>>>>>>> out of >>>>>>>> 300). Also we have some conditional processing there (there is a union >>>>>>>> of >>>>>>>> three types of callbacks that has to be properly deserialized). And >>>>>>>> those >>>>>>>> messages could be repeated. This is not happening for most other >>>>>>>> cases. And >>>>>>>> we can likely optimize it away in the future. 
but I wanted to see the >>>>>>>> "worst" scenario. >>>>>>>> >>>>>>>> For the remote client (still local DB for the internal API server) >>>>>>>> I got between 30% and 80% slow-down, but this was on my WiFi network. >>>>>>>> I am >>>>>>>> sure with a "wired" network, it will be much better, also when a >>>>>>>> remote DB >>>>>>>> gets into picture the overall percentage overhead will be much >>>>>>>> smaller. We >>>>>>>> knew this would be a "slower" solution but this is the price for >>>>>>>> someone >>>>>>>> who wants to have security and isolation. >>>>>>>> >>>>>>>> I have not yet performed the tests with SSL, but I think this will >>>>>>>> be a modest increase. And anyhow the "impact" of 10% is IMHO enough to >>>>>>>> make >>>>>>>> a decision that we cannot get "GRPC-only" - the path where we will >>>>>>>> continue >>>>>>>> using Direct DB access should stay (and I know already how to do it >>>>>>>> relatively easily). >>>>>>>> >>>>>>>> # Implementation and maintainability >>>>>>>> ------------------------------------------------- >>>>>>>> >>>>>>>> In the PR you will see the way I see implementation details and >>>>>>>> how it will impact our code in general. I think what I proposed is >>>>>>>> actually rather elegant and easy to maintain, and likely we can >>>>>>>> improve it >>>>>>>> somehow (happy to brainstorm on some creative ways we could use - for >>>>>>>> example - decorators) to make it "friendler" but I focused more on >>>>>>>> explicitness and showing the mechanisms involved rather than >>>>>>>> "fanciness". I >>>>>>>> found it rather easy to implement and it does seem to have some good >>>>>>>> "easy >>>>>>>> maintainability" properties. The way I propose it boils down to few >>>>>>>> "rules": >>>>>>>> >>>>>>>> * For all the data and "Messages" we sent, we have a nice "Internal >>>>>>>> API" defined in .proto and protobuf objects generated out of that. The >>>>>>>> GRPC >>>>>>>> proto nicely defines the structures we are going to send over the >>>>>>>> network. >>>>>>>> It's very standard, it has a really good modern support. For one, we >>>>>>>> automatically - I added pre-commit - generate MyPY type stubs for the >>>>>>>> generated classes and it makes it super easy to both - implement the >>>>>>>> mapping and automatically verify its correctness. Mypy nicely catches >>>>>>>> all >>>>>>>> kinds of mistakes you can make)! Autocomplete for the PROTO-generated >>>>>>>> classes works like a charm. I struggled initially without typing, but >>>>>>>> once >>>>>>>> I configured mypy stub generation, I got really nice detection of >>>>>>>> mistakes >>>>>>>> I've made and I was able to progress with the implementation way faster >>>>>>>> than without it. Usually everything magically worked as soon as I >>>>>>>> fixed all >>>>>>>> MyPy errors. >>>>>>>> >>>>>>>> * All Airflow objects that we get to send over GRPC should get >>>>>>>> from_protobuf/to_protobuf methods. They are usually very simple (just >>>>>>>> passing strings/ints/boolean fields to the constructor), and the >>>>>>>> structures >>>>>>>> we pass are rather small (see the PR). Also (as mentioned above) I >>>>>>>> implemented a few more complex cases (with more complex serialization) >>>>>>>> and >>>>>>>> it is easy, readable and pretty well integrated into our code IMHO. 
>>>>>>>> This >>>>>>>> introduces a little duplication here and there, but those objects >>>>>>>> change >>>>>>>> rarely (only when we implement big features like Dynamic Task mapping) >>>>>>>> and >>>>>>>> MyPy guards us against any mishaps there (now that our code is mostly >>>>>>>> typed). >>>>>>>> >>>>>>>> * We make all "DB Aware" objects also "GRPC aware". The number of >>>>>>>> changes in the actual code/logic is rather small and makes it super >>>>>>>> easy to >>>>>>>> maintain IMHO. There are two changes: >>>>>>>> >>>>>>>> 1) for any of the objects that are used for database operations >>>>>>>> (in my PR this is DagFileProcessor) we need to initialize it with the >>>>>>>> "use_grpc" flag and pass it a channel that will be used for >>>>>>>> communication. >>>>>>>> 2) the DB methods we have (inventory in the AIP) will have to >>>>>>>> be refactored slightly. This is what really was added, the original >>>>>>>> "process_file" method was renamed to "process_file_db" and the >>>>>>>> "callers" of >>>>>>>> the method are completely intact. >>>>>>>> >>>>>>>> def process_file_grpc( >>>>>>>> self, >>>>>>>> file_path: str, >>>>>>>> callback_requests: List[CallbackRequest], >>>>>>>> pickle_dags: bool = False, >>>>>>>> ) -> Tuple[int, int]: >>>>>>>> request = >>>>>>>> internal_api_pb2.FileProcessorRequest(path=file_path, >>>>>>>> pickle_dags=pickle_dags) >>>>>>>> for callback_request in callback_requests: >>>>>>>> request.callbacks.append(callback_request.to_protobuf()) >>>>>>>> res = self.stub.processFile(request) >>>>>>>> return res.dagsFound, res.errorsFound >>>>>>>> >>>>>>>> def process_file( >>>>>>>> self, >>>>>>>> file_path: str, >>>>>>>> callback_requests: List[CallbackRequest], >>>>>>>> pickle_dags: bool = False, >>>>>>>> ) -> Tuple[int, int]: >>>>>>>> if self.use_grpc: >>>>>>>> return self.process_file_grpc( >>>>>>>> file_path=file_path, >>>>>>>> callback_requests=callback_requests, pickle_dags=pickle_dags >>>>>>>> ) >>>>>>>> return self.process_file_db( >>>>>>>> file_path=file_path, >>>>>>>> callback_requests=callback_requests, pickle_dags=pickle_dags >>>>>>>> ) >>>>>>>> >>>>>>>> You can see all the details in the PR >>>>>>>> https://github.com/apache/airflow/pull/25094. >>>>>>>> >>>>>>>> # POC testing harness (self-service :) ) >>>>>>>> ---------------------------------------------------- >>>>>>>> >>>>>>>> You can also very easily test it yourself: >>>>>>>> >>>>>>>> * airflow internal-api server -> runs internal API server. >>>>>>>> * airflow internal-api test-client --num-repeats 10 --num-callbacks >>>>>>>> 10 --use-grpc -> runs file-processing of all example dags 10 times with >>>>>>>> 10x3 callbacks sent. Same command without --use-grpc will run using >>>>>>>> direct >>>>>>>> DB access. Server listens on "50051" port, client connects to >>>>>>>> "localhost:50051" - you can modify the code to use remote IP/different >>>>>>>> ports (easy to find by localhost:50051). >>>>>>>> >>>>>>>> # Discussion and voting >>>>>>>> -------------------------------- >>>>>>>> >>>>>>>> Of course some decisions in the PR can be improved (I focused on >>>>>>>> explicitness of the POC more than anything else). We can discuss some >>>>>>>> changes and improvements to some decisions I made when the PR will be >>>>>>>> in >>>>>>>> "reviewable state" and I am happy to improve it, but for now I would >>>>>>>> like >>>>>>>> to focus on answering the two questions: >>>>>>>> >>>>>>>> * Does it look plausible? >>>>>>>> * Does it look like it's almost ready to vote on it? 
(I will update >>>>>>>> the AIP before starting voting, of course). >>>>>>>> >>>>>>>> Let me know what you think. >>>>>>>> >>>>>>>> J. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Tue, Feb 1, 2022 at 3:11 PM Jarek Potiuk <ja...@potiuk.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Since we have AIP-43 already approved, I think I would love to >>>>>>>>> have more questions and discussions about AIP-44 - The "Airflow >>>>>>>>> Internal >>>>>>>>> API" >>>>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API >>>>>>>>> (as the new name is). >>>>>>>>> >>>>>>>>> For those who would like to get more context - recording of the >>>>>>>>> meeting where both AIP-43 and AIP-44 scope and proposals were >>>>>>>>> discussed can >>>>>>>>> be found here: >>>>>>>>> https://drive.google.com/file/d/1SMFzazuY1kg4B4r11wNt8EQ_PmTDRKq6/view >>>>>>>>> >>>>>>>>> In the AIP I just made a small clarification regarding some >>>>>>>>> "future" changes - specifically the token security might be nicely >>>>>>>>> handled >>>>>>>>> together with AIP-46 "Add support for docker runtime isolation" that >>>>>>>>> is >>>>>>>>> proposed by Ping. >>>>>>>>> >>>>>>>>> My goal is to gather comments till the end of the week, and if >>>>>>>>> there will be no big concerns, I would love to start voting next week. >>>>>>>>> >>>>>>>>> J. >>>>>>>>> >>>>>>>>> >>>>>>>>> On Mon, Jan 3, 2022 at 2:48 PM Jarek Potiuk <ja...@potiuk.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Also AIP-44 - which is the DB isolation mode is much more >>>>>>>>>> detailed and >>>>>>>>>> ready for deeper discussion if needed. >>>>>>>>>> >>>>>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API >>>>>>>>>> >>>>>>>>>> On Tue, Dec 14, 2021 at 12:07 PM Jarek Potiuk <ja...@potiuk.com> >>>>>>>>>> wrote: >>>>>>>>>> > >>>>>>>>>> > And just to add to that. >>>>>>>>>> > >>>>>>>>>> > Thanks again for the initial comments and pushing us to provide >>>>>>>>>> more >>>>>>>>>> > details. That allowed us to discuss and focus on many of the >>>>>>>>>> aspects >>>>>>>>>> > that were raised and we have many more answers now: >>>>>>>>>> > >>>>>>>>>> > * first of all - we focused on making sure impact on the >>>>>>>>>> existing code >>>>>>>>>> > and "behavior" of Airflow is minimal. In fact there should be >>>>>>>>>> > virtually no change vs. current behavior when db isolation is >>>>>>>>>> > disabled. >>>>>>>>>> > * secondly - we've done a full inventory of how the API needed >>>>>>>>>> should >>>>>>>>>> > look like and (not unsurprisingly) it turned out that the >>>>>>>>>> current >>>>>>>>>> > REST-style API is good for part of it but most of the 'logic" of >>>>>>>>>> > airflow can be done efficiently when we go to RPC-style API. >>>>>>>>>> However >>>>>>>>>> > we propose that the authorization/exposure of the API is the >>>>>>>>>> same as >>>>>>>>>> > we use currently in the REST API, this will allow us to reuse a >>>>>>>>>> big >>>>>>>>>> > part of the infrastructure we already have >>>>>>>>>> > * thirdly - we took deep into our hearts the comments about >>>>>>>>>> having to >>>>>>>>>> > maintain pretty much the same logic in a few different places. >>>>>>>>>> That's >>>>>>>>>> > an obvious maintenance problem. 
The proposal we came up with >>>>>>>>>> addresses >>>>>>>>>> > it - we are going to keep the logic of Airflow internals in one >>>>>>>>>> place >>>>>>>>>> > only and we will simply smartly route on where the logic will be >>>>>>>>>> > executed >>>>>>>>>> > * regarding the performance impact - we described the deployment >>>>>>>>>> > options that our proposal makes available - we do not want to >>>>>>>>>> favor >>>>>>>>>> > one deployment option over another, but we made sure the >>>>>>>>>> architecture >>>>>>>>>> > is done in the way, that you can choose which deployment is >>>>>>>>>> good for >>>>>>>>>> > you: "no isolation", "partial isolation", "full isolation" - >>>>>>>>>> each with >>>>>>>>>> > different performance/resource characteristics - all of them >>>>>>>>>> fully >>>>>>>>>> > horizontally scalable and nicely manageable. >>>>>>>>>> > >>>>>>>>>> > We look forward to comments, also for the API 43 - >>>>>>>>>> > >>>>>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation >>>>>>>>>> > - AIP-43 is a prerequiste to AIP-44 and they both work together. >>>>>>>>>> > >>>>>>>>>> > We also will think and discuss more follow-up AIPs once we get >>>>>>>>>> those >>>>>>>>>> > approved (hopefully ;) ). The multi-tenancy is a 'long haul" >>>>>>>>>> and while >>>>>>>>>> > those two AIPs are foundational building blocks, there are at >>>>>>>>>> least >>>>>>>>>> > few more follow-up AIPs to be able to say "we're done with >>>>>>>>>> > Multi-tenancy" :). >>>>>>>>>> > >>>>>>>>>> > J. >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > On Tue, Dec 14, 2021 at 9:58 AM Mateusz Henc >>>>>>>>>> <mh...@google.com.invalid> wrote: >>>>>>>>>> > > >>>>>>>>>> > > Hi, >>>>>>>>>> > > >>>>>>>>>> > > As promised we (credits to Jarek) updated the AIPs, added >>>>>>>>>> more details, did inventory and changed the way API endpoints are >>>>>>>>>> generated. >>>>>>>>>> > > >>>>>>>>>> > > We also renamed it to Airflow Internal API - so the url has >>>>>>>>>> changed: >>>>>>>>>> > > >>>>>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API >>>>>>>>>> > > >>>>>>>>>> > > Please take a look, any comments are highly appreciated. >>>>>>>>>> > > >>>>>>>>>> > > Best regards, >>>>>>>>>> > > Mateusz Henc >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> > > On Mon, Dec 6, 2021 at 3:09 PM Mateusz Henc <mh...@google.com> >>>>>>>>>> wrote: >>>>>>>>>> > >> >>>>>>>>>> > >> Hi, >>>>>>>>>> > >> Thank you Ash for your feedback (in both AIPs) >>>>>>>>>> > >> >>>>>>>>>> > >> We are working to address your concerns. We will update the >>>>>>>>>> AIPs in a few days. >>>>>>>>>> > >> I will let you know when it's done. >>>>>>>>>> > >> >>>>>>>>>> > >> Best regards, >>>>>>>>>> > >> Mateusz Henc >>>>>>>>>> > >> >>>>>>>>>> > >> >>>>>>>>>> > >> On Fri, Dec 3, 2021 at 7:27 PM Jarek Potiuk < >>>>>>>>>> ja...@potiuk.com> wrote: >>>>>>>>>> > >>> >>>>>>>>>> > >>> Cool. Thanks for the guidance. >>>>>>>>>> > >>> >>>>>>>>>> > >>> On Fri, Dec 3, 2021 at 6:37 PM Ash Berlin-Taylor < >>>>>>>>>> a...@apache.org> wrote: >>>>>>>>>> > >>> > >>>>>>>>>> > >>> > - make an inventory: It doesn't need to be exhaustive, >>>>>>>>>> but a representative sample. >>>>>>>>>> > >>> > - More clearly define _what_ the API calls return -- >>>>>>>>>> object type, methods on them etc. 
>>>>>>>>>> > >>> > >>>>>>>>>> > >>> > From the AIP you have this example: >>>>>>>>>> > >>> > >>>>>>>>>> > >>> > def get_dag_run(self, dag_id, execution_date): >>>>>>>>>> > >>> > return >>>>>>>>>> self.db_client.get_dag_run(dag_id,execution_date) >>>>>>>>>> > >>> > >>>>>>>>>> > >>> > What does that _actually_ return? What capabilities does >>>>>>>>>> it have? >>>>>>>>>> > >>> > >>>>>>>>>> > >>> > (I have other thoughts but those are less fundamental and >>>>>>>>>> can be discussed later) >>>>>>>>>> > >>> > >>>>>>>>>> > >>> > -ash >>>>>>>>>> > >>> > >>>>>>>>>> > >>> > On Fri, Dec 3 2021 at 18:20:21 +0100, Jarek Potiuk < >>>>>>>>>> ja...@potiuk.com> wrote: >>>>>>>>>> > >>> > >>>>>>>>>> > >>> > Surely - if you think that we need to do some more work >>>>>>>>>> to get confidence, that's fine. I am sure we can improve it to the >>>>>>>>>> level >>>>>>>>>> that we will not have to do full performance tests, and you are >>>>>>>>>> confident >>>>>>>>>> in the direction. Just to clarify your concerns and make sure we are >>>>>>>>>> on the >>>>>>>>>> same page - as I understand it should: * make an inventory of "actual >>>>>>>>>> changes" the proposal will involve in the database-low-level code of >>>>>>>>>> Airflow * based on that either assessment that those changes are >>>>>>>>>> unlikely >>>>>>>>>> (or likely make a performance impact) * if we asses that it is >>>>>>>>>> likely to >>>>>>>>>> have an impact, some at least rudimentary performance tests to prove >>>>>>>>>> that >>>>>>>>>> this is manageable I think that might be a good exercise to do. Does >>>>>>>>>> it >>>>>>>>>> sound about right? Or do you have any concerns about certain >>>>>>>>>> architectural >>>>>>>>>> decisions taken? No problem with Friday, but if we get answers >>>>>>>>>> today. I >>>>>>>>>> think it will give us time to think about it over the weekend and >>>>>>>>>> address >>>>>>>>>> it next week. J. On Fri, Dec 3, 2021 at 5:56 PM Ash Berlin-Taylor < >>>>>>>>>> a...@apache.org> wrote: >>>>>>>>>> > >>> > >>>>>>>>>> > >>> > This is a fundamental change to the architecture with >>>>>>>>>> significant possible impacts on performance, and likely requires >>>>>>>>>> touching a >>>>>>>>>> large portion of the code base. Sorry, you're going to have to do >>>>>>>>>> expand on >>>>>>>>>> the details first and work out what would actually be involved and >>>>>>>>>> what the >>>>>>>>>> impacts will be: Right now I have serious reservations to this >>>>>>>>>> approach, so >>>>>>>>>> I can't agree on the high level proposal without an actual proposal >>>>>>>>>> (The >>>>>>>>>> current document is, at best, an outline, not an actual proposal.) >>>>>>>>>> Sorry to >>>>>>>>>> be a grinch right before the weekend. Ash On Thu, Dec 2 2021 at >>>>>>>>>> 22:47:34 >>>>>>>>>> +0100, Jarek Potiuk <ja...@potiuk.com> wrote: Oh yeah - good >>>>>>>>>> point and we spoke about performance testing/implications. >>>>>>>>>> Performance is >>>>>>>>>> something we were discussing as the next step when we get general >>>>>>>>>> "OK" in >>>>>>>>>> the direction - we just want to make sure that there are no "huge" >>>>>>>>>> blockers >>>>>>>>>> in the way this is proposed and explain any doubts first, so that the >>>>>>>>>> investment in performance part makes sense. We do not want to spend >>>>>>>>>> a lot >>>>>>>>>> of time on getting the tests done and detailed inventory of methods/ >>>>>>>>>> API >>>>>>>>>> calls to get - only to find out that this is generally "bad >>>>>>>>>> direction". 
>>>>>>>>>> Just to clarify again - we also considered (alternative option) to >>>>>>>>>> automatically map all the DB methods in the remote calls. But we >>>>>>>>>> dropped >>>>>>>>>> that idea - precisely for the reason of performance, and transaction >>>>>>>>>> integrity. So we are NOT mapping DB calls into API calls. those will >>>>>>>>>> be >>>>>>>>>> "logical operations" on the database. Generally speaking, most of >>>>>>>>>> the API >>>>>>>>>> calls for the "airflow system-level but executed in worker" calls >>>>>>>>>> will be >>>>>>>>>> rather "coarse" than fine-grained. For example, the aforementioned >>>>>>>>>> "mini >>>>>>>>>> scheduler" - where we want to make a single API call and run the >>>>>>>>>> whole of >>>>>>>>>> it on the DBAPI side. So there - performance impact is very limited >>>>>>>>>> IMHO. >>>>>>>>>> And If we see any other "logic" like that in other parts of the code >>>>>>>>>> (zombie detection as an example). We plan to make a detailed >>>>>>>>>> inventory of >>>>>>>>>> those once we get general "Looks good" for the direction. For now we >>>>>>>>>> did >>>>>>>>>> some "rough" checking and it seems a plausible approach and quite >>>>>>>>>> doable. >>>>>>>>>> One more note - the "fine-grained" ( "variable" update/retrieval, >>>>>>>>>> "connection update retrieval") - via REST API will still be used by >>>>>>>>>> the >>>>>>>>>> user's code though (Parsing DAGs, operators, workers and callbacks). >>>>>>>>>> We >>>>>>>>>> also plan to make sure that none of the "Community" operators are >>>>>>>>>> using >>>>>>>>>> "non-blessed" DB calls (we can do it in our CI). So at the end of the >>>>>>>>>> exercise, all operators, hooks, etc. from the community will be >>>>>>>>>> guaranteed >>>>>>>>>> to only use the DB APIs that are available in the "DB API" module. >>>>>>>>>> But >>>>>>>>>> there I do not expect pretty much any performance penalty as those >>>>>>>>>> are very >>>>>>>>>> fast and rare operations (and good thing there is that we can cache >>>>>>>>>> results >>>>>>>>>> of those in workers/DAG processing). J. On Thu, Dec 2, 2021 at 7:16 >>>>>>>>>> PM >>>>>>>>>> Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote: Ah, >>>>>>>>>> my bad, I missed that. I'd still like to see discussion of the >>>>>>>>>> performance >>>>>>>>>> impacts, though. On Thu, Dec 2, 2021 at 11:14 AM Ash Berlin-Taylor < >>>>>>>>>> a...@apache.org> wrote: The scheduler was excluded from the >>>>>>>>>> components that would use the dbapi - the mini scheduler is the odd >>>>>>>>>> one out >>>>>>>>>> here is it (currently) runs on the work but shares much of the code >>>>>>>>>> from >>>>>>>>>> the scheduling path. -a On 2 December 2021 17:56:40 GMT, Andrew >>>>>>>>>> Godwin < >>>>>>>>>> andrew.god...@astronomer.io.INVALID> wrote: I would also like to >>>>>>>>>> see some discussion in this AIP about how the data is going to be >>>>>>>>>> serialised to and from the database instances (obviously Connexion is >>>>>>>>>> involved, but I presume more transformation code is needed than >>>>>>>>>> that) and >>>>>>>>>> the potential slowdown this would cause. In my experience, a somewhat >>>>>>>>>> direct ORM mapping like this is going to result in considerably >>>>>>>>>> slower >>>>>>>>>> times for any complex operation that's touching a few hundred rows. >>>>>>>>>> Is >>>>>>>>>> there a reason this is being proposed for the scheduler code, too? 
>>>>>>>>>> In my >>>>>>>>>> mind, the best approach to multitenancy would be to remove all >>>>>>>>>> user-supplied code from the scheduler and leave it with direct DB >>>>>>>>>> access, >>>>>>>>>> rather than trying to indirect all scheduler access through another >>>>>>>>>> API >>>>>>>>>> layer. Andrew On Thu, Dec 2, 2021 at 10:29 AM Jarek Potiuk < >>>>>>>>>> ja...@potiuk.com> wrote: Yeah - I thik Ash you are completely >>>>>>>>>> right we need some more "detailed" clarification. I believe, I know >>>>>>>>>> what >>>>>>>>>> you are - rightfully - afraid of (re impact on the code), and maybe >>>>>>>>>> we have >>>>>>>>>> not done a good job on explaining it with some of our assumptions we >>>>>>>>>> had >>>>>>>>>> when we worked on it with Mateusz. Simply it was not clear that our >>>>>>>>>> aim is >>>>>>>>>> to absolutely minimise the impact on the "internal DB transactions" >>>>>>>>>> done in >>>>>>>>>> schedulers and workers. The idea is that change will at most result >>>>>>>>>> in >>>>>>>>>> moving an execution of the transactions to another process but not >>>>>>>>>> changing >>>>>>>>>> what the DB transactions do internally. Actually this was one of the >>>>>>>>>> reason >>>>>>>>>> for the "alternative" approach (you can see it in the document) we >>>>>>>>>> discussed about - hijack "sqlalchemy session" - this is far too low >>>>>>>>>> level >>>>>>>>>> and the aim of the "DB-API" is NOT to replace direct DB calls (Hence >>>>>>>>>> we >>>>>>>>>> need to figure out a better name). The API is there to provide >>>>>>>>>> "scheduler >>>>>>>>>> logic" API and "REST access to Airflow primitives like >>>>>>>>>> dags/tasks/variables/connections" etc.. As an example (which we >>>>>>>>>> briefly >>>>>>>>>> talked about in slack) the "_run_mini_scheduler_on_child_tasks" case >>>>>>>>>> ( >>>>>>>>>> https://github.com/apache/airflow/blob/main/airflow/jobs/local_task_job.py#L225-L274) >>>>>>>>>> is an example (that we would put in the doc). As we thought of it - >>>>>>>>>> this is >>>>>>>>>> a "single DB-API operation". Those are not Pure REST calls of >>>>>>>>>> course, they >>>>>>>>>> are more RPC-like calls. That is why even initially I thought of >>>>>>>>>> separating >>>>>>>>>> the API completely. But since there are a lot of common "primitive" >>>>>>>>>> calls >>>>>>>>>> that we can re-use, I think having a separate DB-API component which >>>>>>>>>> will >>>>>>>>>> re-use connexion implementation, replacing authentication with the >>>>>>>>>> custom >>>>>>>>>> worker <> DB-API authentication is the way to go. And yes if we >>>>>>>>>> agree on >>>>>>>>>> the general idea, we need to choose the best way on how to best >>>>>>>>>> "connect" >>>>>>>>>> the REST API we have with the RPC-kind of API we need for some cases >>>>>>>>>> in >>>>>>>>>> workers. But we wanted to make sure we are on the same page with the >>>>>>>>>> direction. And yes it means that DB-API will potentially have to >>>>>>>>>> handle >>>>>>>>>> quite a number of DB operations (and that it has to be replicable and >>>>>>>>>> scalable as well) - but DB-API will be "stateless" similarly as the >>>>>>>>>> webserver is, so it will be scalable by definition. And yest >>>>>>>>>> performance >>>>>>>>>> tests will be part of POC - likely even before we finally ask for >>>>>>>>>> votes >>>>>>>>>> there. 
So in short: * no modification or impact on current scheduler >>>>>>>>>> behaviour when DB Isolation is disabled * only higher level methods >>>>>>>>>> will be >>>>>>>>>> moved out to DB-API and we will reuse existing "REST" APIS where it >>>>>>>>>> makes >>>>>>>>>> sense * we aim to have "0" changes to the logic of processing - both >>>>>>>>>> in Dag >>>>>>>>>> Processing logic and DB API. We think with this architecture we >>>>>>>>>> proposed >>>>>>>>>> it's perfectly doable I hope this clarifies a bit, and once we agree >>>>>>>>>> on >>>>>>>>>> general direction, we will definitely work on adding more details and >>>>>>>>>> clarification (we actually already have a lot of that but we just >>>>>>>>>> wanted to >>>>>>>>>> start with explaining the idea and going into more details later >>>>>>>>>> when we >>>>>>>>>> are sure there are no "high-level" blockers from the community. J, >>>>>>>>>> On Thu, >>>>>>>>>> Dec 2, 2021 at 4:46 PM Ash Berlin-Taylor <a...@apache.org> wrote: >>>>>>>>>> > > I just provided a general idea for the approach - but if you >>>>>>>>>> > > want me to >>>>>>>>>> put more examples then I am happy to do that > > > Yes please. > > >>>>>>>>>> It is >>>>>>>>>> too general for me and I can't work out what effect it would >>>>>>>>>> actually have >>>>>>>>>> on the code base, especially how it would look with the config >>>>>>>>>> option to >>>>>>>>>> enable/disable direct db access. > > -ash > > On Thu, Dec 2 2021 at >>>>>>>>>> 16:36:57 +0100, Mateusz Henc <mh...@google.com.INVALID> wrote: > >>>>>>>>>> > Hi, > I am sorry if it is not clear enough, let me try to explain >>>>>>>>>> > it >>>>>>>>>> here, so maybe it gives more light on the idea. > See my comments >>>>>>>>>> below > > >>>>>>>>>> On Thu, Dec 2, 2021 at 3:39 PM Ash Berlin-Taylor <a...@apache.org> >>>>>>>>>> wrote: >> >> I'm sorry to say it, but this proposal right just >>>>>>>>>> doesn't >>>>>>>>>> contain enough detail to say what the actual changes to the code >>>>>>>>>> would be, >>>>>>>>>> and what the impact would be >> >> To take the one example you have >>>>>>>>>> so far: >>>>>>>>>> >> >> >> def get_dag_run(self, dag_id, execution_date): >> return >>>>>>>>>> self.db_client.get_dag_run(dag_id,execution_date) >> >> So form this >>>>>>>>>> snippet I'm guessing it would be used like this: >> >> dag_run = >>>>>>>>>> db_client.get_dag_run(dag_id, execution_date) >> >> What type of >>>>>>>>>> object is >>>>>>>>>> returned? > > > As it replaces: > dag_run = session.query(DagRun) > >>>>>>>>>> .filter(DagRun.dag_id == dag_id, DagRun.execution_date == >>>>>>>>>> execution_date) > >>>>>>>>>> .first() > > then the type of the object will be exactly the same >>>>>>>>>> (DagRun) >>>>>>>>>> . > >> >> >> Do we need one API method per individual query we have >>>>>>>>>> in the >>>>>>>>>> source? > > > No, as explained by the sentence: > > The method may be >>>>>>>>>> extended, accepting more optional parameters to avoid having too many >>>>>>>>>> similar implementations. > > >> >> >> Which components would use >>>>>>>>>> this new >>>>>>>>>> mode when it's enabled? > > > You may read: > Airflow Database APi >>>>>>>>>> is a new >>>>>>>>>> independent component of Airflow. It allows isolating some components >>>>>>>>>> (Worker, DagProcessor and Triggerer) from direct access to DB. > >> >>>>>>>>>> >> But >>>>>>>>>> what you haven't said the first thing about is what _other_ changes >>>>>>>>>> would >>>>>>>>>> be needed in the code. 
To take a fairly simple example: >> >> >>>>>>>>>> dag_run = >>>>>>>>>> db_client.get_dag_run(dag_id, execution_date) >> dag_run.queued_at = >>>>>>>>>> timezone.now() >> # How do I save this? >> >> In short, you need to >>>>>>>>>> put a >>>>>>>>>> lot more detail into this before we can even have an idea of the >>>>>>>>>> full scope >>>>>>>>>> of the change this proposal would involve, and what code changes >>>>>>>>>> would be >>>>>>>>>> needed for compnents to work with and without this setting enabled. >>>>>>>>>> > > > >>>>>>>>>> For this particular example - it depends on the intention of the code >>>>>>>>>> author > - If this should be in transaction - then I would actually >>>>>>>>>> introduce new method like enqueue_dag_run(...) that would run these >>>>>>>>>> two >>>>>>>>>> steps on Airflow DB API side > - if not then, maybe just the >>>>>>>>>> "update_dag_run" method accepting the whole "dag_run" object and >>>>>>>>>> saving it >>>>>>>>>> to the DB. > > In general - we could take naive approach, eg replace >>>>>>>>>> code: >>>>>>>>>> > dag_run = session.query(DagRun) > .filter(DagRun.dag_id == dag_id, >>>>>>>>>> DagRun.execution_date == execution_date) > .first() > with: > if >>>>>>>>>> self.db_isolation: > dag_run = session.query(DagRun) > >>>>>>>>>> .filter(DagRun.dag_id == dag_id, DagRun.execution_date == >>>>>>>>>> execution_date) > >>>>>>>>>> .first() > else: > dag_run = db_client.get_dag_run(self, dag_id, >>>>>>>>>> execution_date) > > The problem is that Airflow DB API would need to >>>>>>>>>> have >>>>>>>>>> the same implementation for the query - so duplicated code. That's >>>>>>>>>> why we >>>>>>>>>> propose moving this code to the DBClient which is also used by the >>>>>>>>>> Airflow >>>>>>>>>> DB API(in DB direct mode). > > I know there are many places where >>>>>>>>>> the code >>>>>>>>>> is much more complicated than a single query, but they must be >>>>>>>>>> handled >>>>>>>>>> one-by-one, during the implementation, otherwise this AIP would be >>>>>>>>>> way too >>>>>>>>>> big. > > I just provided a general idea for the approach - but if >>>>>>>>>> you want >>>>>>>>>> me to put more examples then I am happy to do that > > Best regards, >>>>>>>>>> > >>>>>>>>>> Mateusz Henc > >> >> On Thu, Dec 2 2021 at 14:23:56 +0100, Mateusz >>>>>>>>>> Henc >>>>>>>>>> <mh...@google.com.INVALID> wrote: >> >> Hi, >> I just added a >>>>>>>>>> new AIP for running some Airflow components in DB-isolation mode, >>>>>>>>>> without >>>>>>>>>> direct access to the Airflow Database, but they will use a new API >>>>>>>>>> for thi >>>>>>>>>> purpose. >> >> PTAL: >> >>>>>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API >>>>>>>>>> >> >> Open question: >> I called it "Airflow Database API" - however >>>>>>>>>> >> >> I feel >>>>>>>>>> it could be more than just an access layer for the database. So if >>>>>>>>>> you have >>>>>>>>>> a better name, please let me know, I am happy to change it. >> >> >>>>>>>>>> Best >>>>>>>>>> regards, >> Mateusz Henc >>>>>>>>>> >>>>>>>>>