Hi Jarek,

Thanks for the thorough explanation. It all makes sense, especially the
part about performance impact and scalability. I believe the internal API
should cut down a lot of DB connections.

Looking forward to it!

Thanks,

Ping


On Thu, Aug 4, 2022 at 12:11 AM Jarek Potiuk <ja...@potiuk.com> wrote:

> > I would like to see more about how you do this because I have noticed
> > that there are tons of different configurations in Airflow and some
> > combinations are less covered by the tests as there are some default
> > values, for example this code
> > <https://github.com/apache/airflow/blob/3c08cefdfd2e2636a714bb835902f0cb34225563/airflow/task/task_runner/standard_task_runner.py#L39-L42>
> > .
>
> Good point. I plan to have another "test type" added in our CI and run
> the full test suite there. I am not planning to add new tests (well there
> will be some generic DB-less mode tests for sure but I am mostly going to
> rely on the existing test harness for Airflow). The change will be that I
> will just block all direct DB access from any methods that will be executed
> via any of the "airflow" main code. This should be rather easy - since we
> are using SQLAlchemy, we will be able to capture all DB requests and see if
> they are coming from "Airflow" or from "Tests" (looking at call stack
> trace). This should allow us to have a "DB isolation" mode where (vast)
> majority of the current tests will be able to run without any modifications
> but we will fail if any of the "airflow" code accesses the DB directly.
> Likely some tests will need to be adapted or not run in the DB Isolation
> mode, but this should be easy as well.
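
A minimal sketch of the call-stack check described above, using only the standard library. The path markers `/airflow/` and `/tests/` and all function names are assumptions made for illustration; the actual CI hook (for example, a SQLAlchemy event listener that would call something like this before each statement) is not shown in this thread.

```python
import traceback
from typing import Iterable, Optional

# Hypothetical path markers; the real repository layout and rules may differ.
AIRFLOW_MARKER = "/airflow/"
TESTS_MARKER = "/tests/"


def classify_db_caller(filenames_innermost_first: Iterable[str]) -> Optional[str]:
    """Return "airflow" or "tests" for the innermost frame matching a marker.

    Returns None when no frame belongs to either tree.
    """
    for filename in filenames_innermost_first:
        normalized = filename.replace("\\", "/")
        # Check tests first: a checkout such as /opt/airflow/tests/... also
        # contains "/airflow/" in its path.
        if TESTS_MARKER in normalized:
            return "tests"
        if AIRFLOW_MARKER in normalized:
            return "airflow"
    return None


def current_db_caller() -> Optional[str]:
    """Classify the code that is issuing a DB request right now."""
    # extract_stack() lists the outermost frame first, so reverse it and
    # drop this helper's own frame.
    frames = reversed(traceback.extract_stack()[:-1])
    return classify_db_caller(frame.filename for frame in frames)


def assert_db_access_allowed() -> None:
    """Fail when "airflow" code (rather than test code) touches the DB."""
    if current_db_caller() == "airflow":
        raise RuntimeError("Direct DB access from Airflow code in DB isolation mode")
```

With this rule, a test that prepares fixtures through its own session is classified as "tests", while a query issued from inside an Airflow module that the test happens to call is classified as "airflow" and would fail the run.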
>
> This way we will be able not only to fix any "unwanted" DB access that
> exists today (for example, if any of the provider code accesses the DB
> directly we will see tests failing), but the approach will also be
> future-compatible: new tests or changes in Airflow will also have to pass
> the tests, so there is very little overhead needed for future Airflow
> code. We are making nothing more than (possibly) minor modifications to
> the internal Airflow structure or method/class structure and, apart from
> the need to also keep .proto definitions for the Internal APIs (maybe
> 30-40 methods in total), there is no extra overhead involved in developing
> new features or modifying the existing ones.
>
> > Also, it will be great if your AIP can discuss the long term strategy
> > about the internal API. Personally, I think we should completely remove
> > the db session access from the worker and other untrusted components and
> > fully rely on the Internal API. This will reduce the maintenance burden
> > as if we add new db access in those untrusted components, we need to do
> > it for both db session and internal API. What's more, with the Internal
> > API, we can even remove the db-proxy :)
>
> Maybe that was not clear enough but this is precisely what AIP-44 is
> about! We are going to remove ALL Direct DB access from those untrusted
> components. ALL of it. Full stop. The (not even long term) goal is that
> when AIP-44 is implemented, untrusted components will not need Metadata DB
> access at all. We are going to fully rely on the Internal API :). I think
> the way I proposed it, with the help of a few interested people, we should
> be able to get there in Airflow 2.5 (looking at the inventory of methods).
> I will need to update the inventory after 2.4 is out with the dataset
> changes but I do not expect any particular increase in the scope. The
> steps that need to be done to complete it:
>
> * Implement the foundation -> (I plan to start working on it immediately
> after approval): complete the POC and implement the test harness for our
> CI (should be rather easy)
> * Redo the inventory and prepare the list of methods (most of the change
> is to implement ".proto" definitions for all the methods to replace) ->
> right after we cut the 2.4 branch
> * Replace the methods and make sure that all the tests pass -> once we
> have the foundation in place this is easy to be done by a number of
> people, similarly to how we do AIP-47 where we have a number of people
> implementing parts of it.
> * Overall testing, including performance impact and scalability (for which
> I had initially agreed that the Google Composer Team could help a lot with
> "scale" testing).
>
> If I have support from the community members to help with the last point,
> I think we should be rather safe to get it implemented for 2.5 (or 2.6 if
> the final testing reveals some optimisation needs).
>
> This will all be under a feature flag and the idea is that without the
> flag enabled there should be nearly 0 impact on the existing functionality
> (there will be a very slight overhead to make an extra method call when
> any DB method starts) - but since those are "DB" methods, it will be
> totally negligible.
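
The "extra method call" can be pictured as a thin wrapper in front of each DB method. The sketch below is illustrative only: the `use_grpc` attribute mirrors the flag name used in the POC quoted later in this thread, while the decorator, its naming convention and the `FakeProcessor` class are hypothetical and not taken from the PR.

```python
import functools
from typing import Any, Callable


def routed_db_method(remote_call_name: str) -> Callable:
    """Route a DB method to a remote call when the feature flag is enabled.

    `remote_call_name` names a method on the same object that performs the
    equivalent Internal API call (a hypothetical convention for this sketch).
    """

    def decorator(db_method: Callable) -> Callable:
        @functools.wraps(db_method)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            # With the flag off, this check is the only added cost.
            if getattr(self, "use_grpc", False):
                return getattr(self, remote_call_name)(*args, **kwargs)
            return db_method(self, *args, **kwargs)

        return wrapper

    return decorator


class FakeProcessor:
    """Stand-in object; real code would talk to a DB or an RPC stub."""

    def __init__(self, use_grpc: bool = False) -> None:
        self.use_grpc = use_grpc

    def _count_dags_remote(self, folder: str) -> str:
        return f"remote:{folder}"

    @routed_db_method("_count_dags_remote")
    def count_dags(self, folder: str) -> str:
        return f"db:{folder}"
```

Callers keep calling `count_dags(...)` in both modes; only the flag decides which path runs.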
>
> J.
>
>
> On Thu, Aug 4, 2022 at 12:47 AM Ping Zhang <pin...@umich.edu> wrote:
>
>>
>> Hi Jarek,
>>
>> Thanks for pushing forward with this AIP.
>>
>> I am very interested in this part:
>>
>> All tests for DagProcessor, Workers (including Operators) and triggers
>> are executed in both modes "DBDirect" and "DBIsolation". The code for
>> Airflow Database API, the DbApiClient and DBSession are extended until all
>> tests pass.
>>
>> I would like to see more about how you do this because I have noticed
>> that there are tons of different configurations in the airflow and some
>> combinations are less covered by the test as there are some default values,
>> for example this code
>> <https://github.com/apache/airflow/blob/3c08cefdfd2e2636a714bb835902f0cb34225563/airflow/task/task_runner/standard_task_runner.py#L39-L42>
>> .
>>
>> Also, it will be great if your AIP can discuss the long term strategy
>> about the internal API. Personally, I think we should completely remove the
>> db session access from the worker and other untrusted components and fully
>> rely on the Internal API. This will reduce the maintenance burden as if we
>> add new db access in those untrusted components, we need to do it for both
>> db session and internal API. What's more, with the Internal API, we can
>> even remove the db-proxy :)
>>
>> Thanks,
>>
>> Ping
>>
>>
>> On Tue, Aug 2, 2022 at 6:56 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>>
>>> Hello,
>>>
>>> I have updated AIP-44 with the findings from the POC. If there are any
>>> more questions/comments, please let me know, otherwise I plan to start
>>> voting tomorrow.
>>>
>>>
>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
>>>
>>> On Mon, Jul 25, 2022 at 10:18 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>
>>>> I think the drop will be less than those numbers - especially the
>>>> high-end ones. The DB operations we use are quite "coarse" grained (they
>>>> span a single transaction, not a function call). Also I deliberately chose
>>>> "example dags" as the benchmark playground - the example DAGs we have are
>>>> small. And in this case it means that the "actual" work done is rather
>>>> small. If you start parsing bigger DAGs, the overall number of calls (thus
>>>> RPC overhead) will not grow, but the time needed to parse the DAG will grow
>>>> a lot. This means that the RPC overhead will get way smaller
>>>> (percentage-wise).
>>>>
>>>> Also this is not an "overall" performance drop - this is really a
>>>> "mini-benchmark" (as opposed to "system benchmark" and "micro-benchmark").
>>>> The benchmark only  concerns running particular "transactions" - but all
>>>> the other parts remain unaffected - so the numbers I saw are the absolute,
>>>> maximum limits of the performance drop and they will be much smaller in
>>>> reality.
>>>>
>>>> J.
>>>>
>>>>
>>>> On Mon, Jul 25, 2022 at 9:01 PM Mateusz Henc <mh...@google.com.invalid>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> Sorry, but I was OOO and offline last week and I was not even able to
>>>>> reply.
>>>>>
>>>>> Thank you Jarek for a very detailed analysis. I think we all expected
>>>>> some performance drop for better security.
>>>>> Do you think the performance drop that you measured is what we should
>>>>> expect in the "final" version? I don't know Airflow well enough to know
>>>>> which DB access is used most (and what object types are required there -
>>>>> small/big or maybe just primitives?).
>>>>>
>>>>> Nonetheless, I am looking forward to starting the voting process!
>>>>>
>>>>> Best regards,
>>>>> Mateusz Henc
>>>>>
>>>>>
>>>>> On Fri, Jul 15, 2022 at 4:48 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>
>>>>>> Two more things:
>>>>>> 1) there is one "caveat" I had to handle - timeout handling in
>>>>>> DagFileProcessor (signals do not play well with the threads of GRPC -
>>>>>> but that should be easy to fix)
>>>>>> 2) The way I proposed it (with very localized changes needed) will
>>>>>> make it very easy to split the job among multiple people and make it a
>>>>>> true community effort to complete - no need for major refactorings or
>>>>>> changes across the whole airflow code.
>>>>>>
>>>>>> J.
>>>>>>
>>>>>> On Fri, Jul 15, 2022 at 4:32 PM Jarek Potiuk <ja...@potiuk.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello Everyone,
>>>>>>>
>>>>>>> First of all - apologies to those who waited for it, I've been
>>>>>>> dragged in multiple directions, but finally I got some quality time to
>>>>>>> take a look at open questions and implement the POC for AIP-44 as
>>>>>>> promised before.
>>>>>>>
>>>>>>> TL;DR: I finally came back to AIP-44 and multi-tenancy and have
>>>>>>> made good progress that hopefully will lead to voting next week. I
>>>>>>> think I have enough of the missing evidence of the impact of the
>>>>>>> internal API on performance. I have also implemented a working POC
>>>>>>> with one of the Internal API calls (processFile) that we will have to
>>>>>>> implement, and run a series of performance tests with it.
>>>>>>>
>>>>>>> # Current state
>>>>>>> --------------------
>>>>>>>
>>>>>>> The state we left it in a few months ago was (
>>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
>>>>>>> ):
>>>>>>> * I've prepared an inventory of methods and the general approach we
>>>>>>> are going to take (and we got consensus there)
>>>>>>> * I've left the decision on the final choice (Thrift vs. GRPC) to a
>>>>>>> later POC (I got some feedback from Beam about gRPC vs. Thrift; based
>>>>>>> on that I started with GRPC and I am actually very happy with it, so I
>>>>>>> think we can leave Thrift out of the picture).
>>>>>>> * We've struggled a bit with the decision - should we leave two
>>>>>>> paths (with GRPC/Direct DB) or one (local GRPC vs. remote GRPC)
>>>>>>>
>>>>>>> The POC is implemented here:
>>>>>>> https://github.com/apache/airflow/pull/25094 and I have the
>>>>>>> following findings:
>>>>>>>
>>>>>>> # Performance impact
>>>>>>> ------------------------------
>>>>>>>
>>>>>>> The performance impact is visible (as expected). It's quite
>>>>>>> acceptable for a distributed environment (in exchange for security), but
>>>>>>> it's likely significant enough to stay with the original idea of having
>>>>>>> Airflow work in two modes: a) secure - with RPC apis, b) standard - with
>>>>>>> direct DB calls.
>>>>>>>
>>>>>>> On "localhost" with a local DB, serializing and transporting the
>>>>>>> messages between two different processes introduced up to 10%
>>>>>>> overhead. I saw the time to run 500 scans of all our example folders
>>>>>>> dags go from ~290s to ~320s pretty consistently. Enough of a
>>>>>>> difference to exclude volatility. I tested it in Docker on both ARM
>>>>>>> and AMD. It seems that in some cases that can be partially offset by
>>>>>>> slightly increased parallelism on multi-processing machines run on
>>>>>>> "bare" metal. I got consistently just a 2-3% increase on my linux
>>>>>>> without docker with the same test harness, but we cannot rely on such
>>>>>>> configurations. I think focusing on Docker-based installation without
>>>>>>> any special assumptions on how your networking/localhost is
>>>>>>> implemented is something we should treat as a baseline.
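
As a quick sanity check on the figures above (~290s vs. ~320s for 500 scans), the relative overhead can be worked out directly. The 10x scenario at the end is a hypothetical illustration, not a measurement; it assumes the non-RPC work grows tenfold while the RPC cost per scan stays constant.

```python
def relative_overhead(baseline_s: float, with_rpc_s: float) -> float:
    """Fraction of extra time relative to the direct-DB baseline."""
    return (with_rpc_s - baseline_s) / baseline_s


# Measured figures quoted above: 500 scans, ~290s direct DB vs ~320s via RPC.
scans = 500
baseline_s = 290.0
with_rpc_s = 320.0

overhead = relative_overhead(baseline_s, with_rpc_s)
per_scan_rpc_cost_s = (with_rpc_s - baseline_s) / scans

# Hypothetical: the non-RPC work takes 10x longer, RPC cost per scan unchanged.
bigger_baseline_s = baseline_s * 10
bigger_with_rpc_s = bigger_baseline_s + per_scan_rpc_cost_s * scans
bigger_overhead = relative_overhead(bigger_baseline_s, bigger_with_rpc_s)

print(f"measured overhead: {overhead:.1%}")
print(f"RPC cost per scan: {per_scan_rpc_cost_s * 1000:.0f} ms")
print(f"overhead with 10x non-RPC work: {bigger_overhead:.1%}")
```

Under that assumption the same absolute RPC cost becomes a much smaller share of the total, which is the reasoning behind expecting a smaller percentage drop for larger DAGs.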
>>>>>>>
>>>>>>> The same experiment repeated with 50 messages but of much bigger
>>>>>>> size (300 Callbacks passed per message) showed a 30% performance drop.
>>>>>>> This is significant, but I believe most of our messages will be much
>>>>>>> smaller. Also the messages I chose were very special - because in the
>>>>>>> case of Callback we are doing rather sophisticated serialization, as
>>>>>>> we have to account for Kubernetes objects that are potentially not
>>>>>>> serializable. So the test involved 300 messages that had to be not
>>>>>>> only GRPC serialized, but some parts of them also had to be
>>>>>>> Airflow-JSON serialized and the whole "executor config" had to be
>>>>>>> pickled/unpickled (for 100 such messages out of 300). Also we have
>>>>>>> some conditional processing there (there is a union of three types of
>>>>>>> callbacks that has to be properly deserialized). And those messages
>>>>>>> could be repeated. This is not happening for most other cases, and we
>>>>>>> can likely optimize it away in the future, but I wanted to see the
>>>>>>> "worst" scenario.
>>>>>>>
>>>>>>> For the remote client (still local DB for the internal API server) I
>>>>>>> got between 30% and 80% slow-down, but this was on my WiFi network. I am
>>>>>>> sure with a "wired" network it will be much better; also, when a remote
>>>>>>> DB gets into the picture the overall percentage overhead will be much
>>>>>>> smaller. We knew this would be a "slower" solution but this is the price
>>>>>>> for someone who wants to have security and isolation.
>>>>>>>
>>>>>>> I have not yet performed the tests with SSL, but I think this will
>>>>>>> be a modest increase. And anyhow the "impact" of 10% is IMHO enough to
>>>>>>> make a decision that we cannot go "GRPC-only" - the path where we will
>>>>>>> continue using Direct DB access should stay (and I know already how to
>>>>>>> do it relatively easily).
>>>>>>>
>>>>>>> # Implementation and maintainability
>>>>>>> -------------------------------------------------
>>>>>>>
>>>>>>> In the PR you will see the way I see the implementation details and how
>>>>>>> it will impact our code in general. I think what I proposed is actually
>>>>>>> rather elegant and easy to maintain, and likely we can improve it
>>>>>>> somehow (happy to brainstorm on some creative ways we could use - for
>>>>>>> example - decorators) to make it "friendlier", but I focused more on
>>>>>>> explicitness and showing the mechanisms involved rather than
>>>>>>> "fanciness". I found it rather easy to implement and it does seem to
>>>>>>> have some good "easy maintainability" properties. The way I propose it
>>>>>>> boils down to a few "rules":
>>>>>>>
>>>>>>> * For all the data and "Messages" we send, we have a nice "Internal
>>>>>>> API" defined in .proto and protobuf objects generated out of that. The
>>>>>>> GRPC proto nicely defines the structures we are going to send over the
>>>>>>> network. It's very standard, it has really good modern support. For
>>>>>>> one, we automatically - I added a pre-commit - generate MyPy type stubs
>>>>>>> for the generated classes and it makes it super easy to both implement
>>>>>>> the mapping and automatically verify its correctness. Mypy nicely
>>>>>>> catches all kinds of mistakes you can make! Autocomplete for the
>>>>>>> PROTO-generated classes works like a charm. I struggled initially
>>>>>>> without typing, but once I configured mypy stub generation, I got
>>>>>>> really nice detection of mistakes I've made and I was able to progress
>>>>>>> with the implementation way faster than without it. Usually everything
>>>>>>> magically worked as soon as I fixed all MyPy errors.
>>>>>>>
>>>>>>> * All Airflow objects that we get to send over GRPC should get
>>>>>>> from_protobuf/to_protobuf methods. They are usually very simple (just
>>>>>>> passing strings/ints/boolean fields to the constructor), and the
>>>>>>> structures we pass are rather small (see the PR). Also (as mentioned
>>>>>>> above) I implemented a few more complex cases (with more complex
>>>>>>> serialization) and it is easy, readable and pretty well integrated into
>>>>>>> our code IMHO. This introduces a little duplication here and there, but
>>>>>>> those objects change rarely (only when we implement big features like
>>>>>>> Dynamic Task mapping) and MyPy guards us against any mishaps there (now
>>>>>>> that our code is mostly typed).
>>>>>>>
>>>>>>> * We make all "DB Aware" objects also "GRPC aware". The number of
>>>>>>> changes in the actual code/logic is rather small and makes it super
>>>>>>> easy to maintain IMHO. There are two changes:
>>>>>>>
>>>>>>>     1) for any of the objects that are used for database operations
>>>>>>> (in my PR this is DagFileProcessor) we need to initialize it with the
>>>>>>> "use_grpc" flag and pass it a channel that will be used for
>>>>>>> communication.
>>>>>>>     2) the DB methods we have (inventory in the AIP) will have to be
>>>>>>> refactored slightly. This is what was really added (the original
>>>>>>> "process_file" method was renamed to "process_file_db" and the
>>>>>>> "callers" of the method are completely intact):
>>>>>>>
>>>>>>>     def process_file_grpc(
>>>>>>>         self,
>>>>>>>         file_path: str,
>>>>>>>         callback_requests: List[CallbackRequest],
>>>>>>>         pickle_dags: bool = False,
>>>>>>>     ) -> Tuple[int, int]:
>>>>>>>         # Build the protobuf request and send it over the GRPC stub.
>>>>>>>         request = internal_api_pb2.FileProcessorRequest(
>>>>>>>             path=file_path, pickle_dags=pickle_dags
>>>>>>>         )
>>>>>>>         for callback_request in callback_requests:
>>>>>>>             request.callbacks.append(callback_request.to_protobuf())
>>>>>>>         res = self.stub.processFile(request)
>>>>>>>         return res.dagsFound, res.errorsFound
>>>>>>>
>>>>>>>     def process_file(
>>>>>>>         self,
>>>>>>>         file_path: str,
>>>>>>>         callback_requests: List[CallbackRequest],
>>>>>>>         pickle_dags: bool = False,
>>>>>>>     ) -> Tuple[int, int]:
>>>>>>>         # Route to GRPC or to the original (renamed) direct-DB method.
>>>>>>>         if self.use_grpc:
>>>>>>>             return self.process_file_grpc(
>>>>>>>                 file_path=file_path,
>>>>>>>                 callback_requests=callback_requests,
>>>>>>>                 pickle_dags=pickle_dags,
>>>>>>>             )
>>>>>>>         return self.process_file_db(
>>>>>>>             file_path=file_path,
>>>>>>>             callback_requests=callback_requests,
>>>>>>>             pickle_dags=pickle_dags,
>>>>>>>         )
>>>>>>>
>>>>>>> You can see all the details in the PR
>>>>>>> https://github.com/apache/airflow/pull/25094.
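
To make the from_protobuf/to_protobuf rule concrete, here is a small self-contained sketch. It does not use the generated classes from the PR: `FakeCallbackMessage` is a dataclass standing in for a protobuf-generated message, and `SimpleCallbackRequest` with its fields is a simplified, hypothetical object invented for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeCallbackMessage:
    """Stand-in for a protobuf-generated message (hypothetical fields)."""

    full_filepath: str = ""
    msg: str = ""
    is_failure: bool = False


class SimpleCallbackRequest:
    """Simplified, hypothetical Airflow-side object that can cross the wire."""

    def __init__(
        self,
        full_filepath: str,
        msg: Optional[str] = None,
        is_failure: bool = False,
    ) -> None:
        self.full_filepath = full_filepath
        self.msg = msg
        self.is_failure = is_failure

    def to_protobuf(self) -> FakeCallbackMessage:
        # Plain proto3 string fields cannot hold None, so map None to "".
        return FakeCallbackMessage(
            full_filepath=self.full_filepath,
            msg=self.msg or "",
            is_failure=self.is_failure,
        )

    @classmethod
    def from_protobuf(cls, message: FakeCallbackMessage) -> "SimpleCallbackRequest":
        # Just pass the plain fields back to the constructor.
        return cls(
            full_filepath=message.full_filepath,
            msg=message.msg or None,
            is_failure=message.is_failure,
        )
```

A round trip such as `SimpleCallbackRequest.from_protobuf(request.to_protobuf())` should give back an equivalent object, which is also a convenient property to assert in unit tests for each mapped class.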
>>>>>>>
>>>>>>> # POC testing harness (self-service :) )
>>>>>>> ----------------------------------------------------
>>>>>>>
>>>>>>> You can also very easily test it yourself:
>>>>>>>
>>>>>>> * airflow internal-api server -> runs internal API server.
>>>>>>> * airflow internal-api test-client --num-repeats 10 --num-callbacks
>>>>>>> 10 --use-grpc -> runs file-processing of all example dags 10 times with
>>>>>>> 10x3 callbacks sent. The same command without --use-grpc will run using
>>>>>>> direct DB access. The server listens on port "50051", the client
>>>>>>> connects to "localhost:50051" - you can modify the code to use a remote
>>>>>>> IP/different ports (easy to find by searching for localhost:50051).
>>>>>>>
>>>>>>> # Discussion and voting
>>>>>>> --------------------------------
>>>>>>>
>>>>>>> Of course some decisions in the PR can be improved (I focused on
>>>>>>> explicitness of the POC more than anything else). We can discuss some
>>>>>>> changes and improvements to some decisions I made when the PR is in a
>>>>>>> "reviewable state" and I am happy to improve it, but for now I would
>>>>>>> like to focus on answering the two questions:
>>>>>>>
>>>>>>> * Does it look plausible?
>>>>>>> * Does it look like it's almost ready to vote on? (I will update
>>>>>>> the AIP before starting voting, of course).
>>>>>>>
>>>>>>> Let me know what you think.
>>>>>>>
>>>>>>> J.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Feb 1, 2022 at 3:11 PM Jarek Potiuk <ja...@potiuk.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Since we have AIP-43 already approved, I think I would love to have
>>>>>>>> more questions and discussions about AIP-44 - the "Airflow Internal
>>>>>>>> API" (as the new name is):
>>>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
>>>>>>>>
>>>>>>>> For those who would like to get more context - a recording of the
>>>>>>>> meeting where both AIP-43 and AIP-44 scope and proposals were
>>>>>>>> discussed can be found here:
>>>>>>>> https://drive.google.com/file/d/1SMFzazuY1kg4B4r11wNt8EQ_PmTDRKq6/view
>>>>>>>>
>>>>>>>> In the AIP I just made a small clarification regarding some
>>>>>>>> "future" changes - specifically the token security might be nicely
>>>>>>>> handled together with AIP-46 "Add support for docker runtime isolation"
>>>>>>>> that is proposed by Ping.
>>>>>>>>
>>>>>>>> My goal is to gather comments till the end of the week, and if
>>>>>>>> there are no big concerns, I would love to start voting next week.
>>>>>>>>
>>>>>>>> J.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Jan 3, 2022 at 2:48 PM Jarek Potiuk <ja...@potiuk.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Also AIP-44 - which is the DB isolation mode - is much more detailed
>>>>>>>>> and ready for deeper discussion if needed.
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API
>>>>>>>>>
>>>>>>>>> On Tue, Dec 14, 2021 at 12:07 PM Jarek Potiuk <ja...@potiuk.com>
>>>>>>>>> wrote:
>>>>>>>>> >
>>>>>>>>> > And just to add to that.
>>>>>>>>> >
>>>>>>>>> > Thanks again for the initial comments and pushing us to provide
>>>>>>>>> > more details. That allowed us to discuss and focus on many of the
>>>>>>>>> > aspects that were raised and we have many more answers now:
>>>>>>>>> >
>>>>>>>>> > * first of all - we focused on making sure the impact on the
>>>>>>>>> > existing code and "behavior" of Airflow is minimal. In fact there
>>>>>>>>> > should be virtually no change vs. current behavior when db
>>>>>>>>> > isolation is disabled.
>>>>>>>>> > * secondly - we've done a full inventory of what the needed API
>>>>>>>>> > should look like and (not surprisingly) it turned out that the
>>>>>>>>> > current REST-style API is good for part of it but most of the
>>>>>>>>> > "logic" of airflow can be done efficiently when we go to an
>>>>>>>>> > RPC-style API. However we propose that the authorization/exposure
>>>>>>>>> > of the API is the same as we use currently in the REST API; this
>>>>>>>>> > will allow us to reuse a big part of the infrastructure we
>>>>>>>>> > already have
>>>>>>>>> > * thirdly - we took deep into our hearts the comments about
>>>>>>>>> > having to maintain pretty much the same logic in a few different
>>>>>>>>> > places. That's an obvious maintenance problem. The proposal we
>>>>>>>>> > came up with addresses it - we are going to keep the logic of
>>>>>>>>> > Airflow internals in one place only and we will simply smartly
>>>>>>>>> > route on where the logic will be executed
>>>>>>>>> > * regarding the performance impact - we described the deployment
>>>>>>>>> > options that our proposal makes available - we do not want to
>>>>>>>>> > favor one deployment option over another, but we made sure the
>>>>>>>>> > architecture is done in a way that lets you choose which
>>>>>>>>> > deployment is good for you: "no isolation", "partial isolation",
>>>>>>>>> > "full isolation" - each with different performance/resource
>>>>>>>>> > characteristics - all of them fully horizontally scalable and
>>>>>>>>> > nicely manageable.
>>>>>>>>> >
>>>>>>>>> > We look forward to comments, also for AIP-43 -
>>>>>>>>> > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation
>>>>>>>>> > - AIP-43 is a prerequisite to AIP-44 and they both work together.
>>>>>>>>> >
>>>>>>>>> > We will also think about and discuss more follow-up AIPs once we
>>>>>>>>> > get those approved (hopefully ;) ). The multi-tenancy is a "long
>>>>>>>>> > haul" and while those two AIPs are foundational building blocks,
>>>>>>>>> > there are at least a few more follow-up AIPs needed to be able to
>>>>>>>>> > say "we're done with Multi-tenancy" :).
>>>>>>>>> >
>>>>>>>>> > J.
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> > On Tue, Dec 14, 2021 at 9:58 AM Mateusz Henc
>>>>>>>>> > <mh...@google.com.invalid> wrote:
>>>>>>>>> > >
>>>>>>>>> > > Hi,
>>>>>>>>> > >
>>>>>>>>> > > As promised we (credits to Jarek) updated the AIPs, added more
>>>>>>>>> > > details, did inventory and changed the way API endpoints are
>>>>>>>>> > > generated.
>>>>>>>>> > >
>>>>>>>>> > > We also renamed it to Airflow Internal API - so the url has
>>>>>>>>> > > changed:
>>>>>>>>> > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
>>>>>>>>> > >
>>>>>>>>> > > Please take a look, any comments are highly appreciated.
>>>>>>>>> > >
>>>>>>>>> > > Best regards,
>>>>>>>>> > > Mateusz Henc
>>>>>>>>> > >
>>>>>>>>> > >
>>>>>>>>> > > On Mon, Dec 6, 2021 at 3:09 PM Mateusz Henc <mh...@google.com>
>>>>>>>>> > > wrote:
>>>>>>>>> > >>
>>>>>>>>> > >> Hi,
>>>>>>>>> > >> Thank you Ash for your feedback (in both AIPs)
>>>>>>>>> > >>
>>>>>>>>> > >> We are working to address your concerns. We will update the
>>>>>>>>> > >> AIPs in a few days.
>>>>>>>>> > >> I will let you know when it's done.
>>>>>>>>> > >>
>>>>>>>>> > >> Best regards,
>>>>>>>>> > >> Mateusz Henc
>>>>>>>>> > >>
>>>>>>>>> > >>
>>>>>>>>> > >> On Fri, Dec 3, 2021 at 7:27 PM Jarek Potiuk <ja...@potiuk.com>
>>>>>>>>> > >> wrote:
>>>>>>>>> > >>>
>>>>>>>>> > >>> Cool. Thanks for the guidance.
>>>>>>>>> > >>>
>>>>>>>>> > >>> On Fri, Dec 3, 2021 at 6:37 PM Ash Berlin-Taylor
>>>>>>>>> > >>> <a...@apache.org> wrote:
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > - make an inventory: It doesn't need to be exhaustive, but
>>>>>>>>> > >>> > a representative sample.
>>>>>>>>> > >>> > - More clearly define _what_ the API calls return --
>>>>>>>>> > >>> > object type, methods on them etc.
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > From the AIP you have this example:
>>>>>>>>> > >>> >
>>>>>>>>> > >>> >   def get_dag_run(self, dag_id, execution_date):
>>>>>>>>> > >>> >     return self.db_client.get_dag_run(dag_id, execution_date)
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > What does that _actually_ return? What capabilities does
>>>>>>>>> > >>> > it have?
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > (I have other thoughts but those are less fundamental and
>>>>>>>>> > >>> > can be discussed later)
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > -ash
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > On Fri, Dec 3 2021 at 18:20:21 +0100, Jarek Potiuk
>>>>>>>>> > >>> > <ja...@potiuk.com> wrote:
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > Surely - if you think that we need to do some more work to
>>>>>>>>> > >>> > get confidence, that's fine. I am sure we can improve it to
>>>>>>>>> > >>> > the level that we will not have to do full performance
>>>>>>>>> > >>> > tests, and you are confident in the direction. Just to
>>>>>>>>> > >>> > clarify your concerns and make sure we are on the same page
>>>>>>>>> > >>> > - as I understand it, we should:
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > * make an inventory of "actual changes" the proposal will
>>>>>>>>> > >>> > involve in the database-low-level code of Airflow
>>>>>>>>> > >>> > * based on that, make an assessment of whether those
>>>>>>>>> > >>> > changes are unlikely (or likely) to make a performance
>>>>>>>>> > >>> > impact
>>>>>>>>> > >>> > * if we assess that it is likely to have an impact, do some
>>>>>>>>> > >>> > at least rudimentary performance tests to prove that this
>>>>>>>>> > >>> > is manageable
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > I think that might be a good exercise to do. Does it sound
>>>>>>>>> > >>> > about right? Or do you have any concerns about certain
>>>>>>>>> > >>> > architectural decisions taken?
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > No problem with Friday, but if we get answers today, I
>>>>>>>>> > >>> > think it will give us time to think about it over the
>>>>>>>>> > >>> > weekend and address it next week.
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > J.
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > On Fri, Dec 3, 2021 at 5:56 PM Ash Berlin-Taylor
>>>>>>>>> > >>> > <a...@apache.org> wrote:
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > This is a fundamental change to the architecture with
>>>>>>>>> > >>> > significant possible impacts on performance, and likely
>>>>>>>>> > >>> > requires touching a large portion of the code base. Sorry,
>>>>>>>>> > >>> > you're going to have to expand on the details first and
>>>>>>>>> > >>> > work out what would actually be involved and what the
>>>>>>>>> > >>> > impacts will be: Right now I have serious reservations
>>>>>>>>> > >>> > about this approach, so I can't agree on the high level
>>>>>>>>> > >>> > proposal without an actual proposal (The current document
>>>>>>>>> > >>> > is, at best, an outline, not an actual proposal.)
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > Sorry to be a grinch right before the weekend.
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > Ash
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > On Thu, Dec 2 2021 at 22:47:34 +0100, Jarek Potiuk
>>>>>>>>> > >>> > <ja...@potiuk.com> wrote:
>>>>>>>>> > >>> >
>>>>>>>>> > >>> > Oh yeah - good point and we spoke about performance
>>>>>>>>> > >>> > testing/implications. Performance is something we were
>>>>>>>>> > >>> > discussing as the next step when we get general "OK" in the
>>>>>>>>> > >>> > direction - we just want to make sure that there are no
>>>>>>>>> > >>> > "huge" blockers in the way this is proposed and explain any
>>>>>>>>> > >>> > doubts first, so that the investment in performance part
>>>>>>>>> > >>> > makes sense. We do not want to spend a lot of time on
>>>>>>>>> > >>> > getting the tests done and detailed inventory of
>>>>>>>>> > >>> > methods/API calls to get - only to find out that this is
>>>>>>>>> > >>> > generally "bad direction".
>>>>>>>>> Just to clarify again - we also considered (alternative option) to
>>>>>>>>> automatically map all the DB methods in the remote calls. But we 
>>>>>>>>> dropped
>>>>>>>>> that idea - precisely for the reason of performance, and transaction
>>>>>>>>> integrity. So we are NOT mapping DB calls into API calls. those will 
>>>>>>>>> be
>>>>>>>>> "logical operations" on the database. Generally speaking, most of the 
>>>>>>>>> API
>>>>>>>>> calls for the "airflow system-level but executed in worker" calls 
>>>>>>>>> will be
>>>>>>>>> rather "coarse" than fine-grained. For example, the aforementioned 
>>>>>>>>> "mini
>>>>>>>>> scheduler" - where we want to make a single API call and run the 
>>>>>>>>> whole of
>>>>>>>>> it on the DBAPI side. So there - performance impact is very limited 
>>>>>>>>> IMHO.
>>>>>>>>> And If we see any other "logic" like that in other parts of the code
>>>>>>>>> (zombie detection as an example). We plan to make a detailed 
>>>>>>>>> inventory of
>>>>>>>>> those once we get general "Looks good" for the direction. For now we 
>>>>>>>>> did
>>>>>>>>> some "rough" checking and it seems a plausible approach and quite 
>>>>>>>>> doable.
>>>>>>>>> One more note - the "fine-grained" ( "variable" update/retrieval,
>>>>>>>>> "connection update retrieval") - via REST API will still be used by 
>>>>>>>>> the
>>>>>>>>> user's code though (Parsing DAGs, operators, workers and callbacks). 
>>>>>>>>> We
>>>>>>>>> also plan to make sure that none of the "Community" operators are 
>>>>>>>>> using
>>>>>>>>> "non-blessed" DB calls (we can do it in our CI). So at the end of the
>>>>>>>>> exercise, all operators, hooks, etc. from the community will be 
>>>>>>>>> guaranteed
>>>>>>>>> to only use the DB APIs that are available in the "DB API" module. But
>>>>>>>>> there I do not expect pretty much any performance penalty as those 
>>>>>>>>> are very
>>>>>>>>> fast and rare operations (and good thing there is that we can cache 
>>>>>>>>> results
>>>>>>>>> of those in workers/DAG processing).
>>>>>>>>>
>>>>>>>>> J.
>>>>>>>>>
>>>>>>>>> On Thu, Dec 2, 2021 at 7:16 PM Andrew Godwin
>>>>>>>>> <andrew.god...@astronomer.io.invalid> wrote:
>>>>>>>>>
>>>>>>>>> Ah, my bad, I missed that. I'd still like to see discussion of the
>>>>>>>>> performance impacts, though.
>>>>>>>>>
>>>>>>>>> On Thu, Dec 2, 2021 at 11:14 AM Ash Berlin-Taylor <a...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>> The scheduler was excluded from the components that would use the
>>>>>>>>> dbapi - the mini scheduler is the odd one out here, as it (currently)
>>>>>>>>> runs on the worker but shares much of the code from the scheduling path.
>>>>>>>>>
>>>>>>>>> -a
>>>>>>>>>
>>>>>>>>> On 2 December 2021 17:56:40 GMT, Andrew Godwin
>>>>>>>>> <andrew.god...@astronomer.io.INVALID> wrote:
>>>>>>>>>
>>>>>>>>> I would also like to
>>>>>>>>> see some discussion in this AIP about how the data is going to be
>>>>>>>>> serialised to and from the database instances (obviously Connexion is
>>>>>>>>> involved, but I presume more transformation code is needed than that) 
>>>>>>>>> and
>>>>>>>>> the potential slowdown this would cause. In my experience, a somewhat
>>>>>>>>> direct ORM mapping like this is going to result in considerably slower
>>>>>>>>> times for any complex operation that's touching a few hundred rows. Is
>>>>>>>>> there a reason this is being proposed for the scheduler code, too? In 
>>>>>>>>> my
>>>>>>>>> mind, the best approach to multitenancy would be to remove all
>>>>>>>>> user-supplied code from the scheduler and leave it with direct DB 
>>>>>>>>> access,
>>>>>>>>> rather than trying to indirect all scheduler access through another 
>>>>>>>>> API
>>>>>>>>> layer.
>>>>>>>>>
>>>>>>>>> Andrew
>>>>>>>>>
>>>>>>>>> On Thu, Dec 2, 2021 at 10:29 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>>>>>
>>>>>>>>> Yeah - I think Ash you are completely
>>>>>>>>> right, we need some more "detailed" clarification. I believe I know
>>>>>>>>> what
>>>>>>>>> you are - rightfully - afraid of (re impact on the code), and maybe 
>>>>>>>>> we have
>>>>>>>>> not done a good job on explaining it with some of our assumptions we 
>>>>>>>>> had
>>>>>>>>> when we worked on it with Mateusz. Simply it was not clear that our 
>>>>>>>>> aim is
>>>>>>>>> to absolutely minimise the impact on the "internal DB transactions" 
>>>>>>>>> done in
>>>>>>>>> schedulers and workers. The idea is that change will at most result in
>>>>>>>>> moving an execution of the transactions to another process but not 
>>>>>>>>> changing
>>>>>>>>> what the DB transactions do internally. Actually this was one of the 
>>>>>>>>> reasons
>>>>>>>>> for the "alternative" approach (you can see it in the document) we
>>>>>>>>> discussed - hijacking the "sqlalchemy session" - this is far too low
>>>>>>>>> level
>>>>>>>>> and the aim of the "DB-API" is NOT to replace direct DB calls (Hence 
>>>>>>>>> we
>>>>>>>>> need to figure out a better name). The API is there to provide 
>>>>>>>>> "scheduler
>>>>>>>>> logic" API and "REST access to Airflow primitives like
>>>>>>>>> dags/tasks/variables/connections" etc. The
>>>>>>>>> "_run_mini_scheduler_on_child_tasks" case (which we briefly talked about
>>>>>>>>> in Slack,
>>>>>>>>> https://github.com/apache/airflow/blob/main/airflow/jobs/local_task_job.py#L225-L274)
>>>>>>>>> is an example (that we would put in the doc). As we thought of it -
>>>>>>>>> this is
>>>>>>>>> a "single DB-API operation". Those are not Pure REST calls of course, 
>>>>>>>>> they
>>>>>>>>> are more RPC-like calls. That is why even initially I thought of 
>>>>>>>>> separating
>>>>>>>>> the API completely. But since there are a lot of common "primitive" 
>>>>>>>>> calls
>>>>>>>>> that we can re-use, I think having a separate DB-API component which 
>>>>>>>>> will
>>>>>>>>> re-use connexion implementation, replacing authentication with the 
>>>>>>>>> custom
>>>>>>>>> worker <> DB-API authentication is the way to go. And yes if we agree 
>>>>>>>>> on
>>>>>>>>> the general idea, we need to choose the best way on how to best 
>>>>>>>>> "connect"
>>>>>>>>> the REST API we have with the RPC-kind of API we need for some cases 
>>>>>>>>> in
>>>>>>>>> workers. But we wanted to make sure we are on the same page with the
>>>>>>>>> direction. And yes it means that DB-API will potentially have to 
>>>>>>>>> handle
>>>>>>>>> quite a number of DB operations (and that it has to be replicable and
>>>>>>>>> scalable as well) - but DB-API will be "stateless", just as the
>>>>>>>>> webserver is, so it will be scalable by definition. And yes,
>>>>>>>>> performance tests will be part of the POC - likely even before we
>>>>>>>>> finally ask for votes there.
>>>>>>>>>
>>>>>>>>> So in short:
>>>>>>>>>
>>>>>>>>> * no modification or impact on current scheduler behaviour when DB
>>>>>>>>>   Isolation is disabled
>>>>>>>>> * only higher level methods will be moved out to DB-API and we will
>>>>>>>>>   reuse existing "REST" APIs where it makes sense
>>>>>>>>> * we aim to have "0" changes to the logic of processing - both in Dag
>>>>>>>>>   Processing logic and DB API. We think with this architecture we
>>>>>>>>>   proposed it's perfectly doable
>>>>>>>>>
>>>>>>>>> I hope this clarifies a bit, and once we agree
>>>>>>>>> on
>>>>>>>>> general direction, we will definitely work on adding more details and
>>>>>>>>> clarification (we actually already have a lot of that but we just 
>>>>>>>>> wanted to
>>>>>>>>> start with explaining the idea and going into more details later when 
>>>>>>>>> we
>>>>>>>>> are sure there are no "high-level" blockers from the community.
>>>>>>>>>
>>>>>>>>> J.
>>>>>>>>>
>>>>>>>>> On Thu, Dec 2, 2021 at 4:46 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>> > > I just provided a general idea for the approach - but if you want
>>>>>>>>> > > me to put more examples then I am happy to do that
>>>>>>>>>
>>>>>>>>> Yes please.
>>>>>>>>>
>>>>>>>>> It is too general for me and I can't work out what effect it would
>>>>>>>>> actually have on the code base, especially how it would look with the
>>>>>>>>> config option to enable/disable direct db access.
>>>>>>>>>
>>>>>>>>> -ash
>>>>>>>>>
>>>>>>>>> On Thu, Dec 2 2021 at 16:36:57 +0100, Mateusz Henc
>>>>>>>>> <mh...@google.com.INVALID> wrote:
>>>>>>>>>
>>>>>>>>> > Hi,
>>>>>>>>> > I am sorry if it is not clear enough, let me try to explain it here,
>>>>>>>>> > so maybe it sheds more light on the idea.
>>>>>>>>> > See my comments below
>>>>>>>>> >
>>>>>>>>> > On Thu, Dec 2, 2021 at 3:39 PM Ash Berlin-Taylor <a...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>> >>
>>>>>>>>> >> I'm sorry to say it, but this proposal right now just doesn't
>>>>>>>>> >> contain enough detail to say what the actual changes to the code
>>>>>>>>> >> would be, and what the impact would be
>>>>>>>>> >>
>>>>>>>>> >> To take the one example you have so far:
>>>>>>>>> >>
>>>>>>>>> >>     def get_dag_run(self, dag_id, execution_date):
>>>>>>>>> >>         return self.db_client.get_dag_run(dag_id, execution_date)
>>>>>>>>> >>
>>>>>>>>> >> So from this snippet I'm guessing it would be used like this:
>>>>>>>>> >>
>>>>>>>>> >>     dag_run = db_client.get_dag_run(dag_id, execution_date)
>>>>>>>>> >>
>>>>>>>>> >> What type of object is returned?
>>>>>>>>> >
>>>>>>>>> > As it replaces:
>>>>>>>>> >
>>>>>>>>> >     dag_run = (
>>>>>>>>> >         session.query(DagRun)
>>>>>>>>> >         .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
>>>>>>>>> >         .first()
>>>>>>>>> >     )
>>>>>>>>> >
>>>>>>>>> > then the type of the object will be exactly the same (DagRun).
>>>>>>>>> >
>>>>>>>>> >> Do we need one API method per individual query we have in the
>>>>>>>>> >> source?
>>>>>>>>> >
>>>>>>>>> > No, as explained by the sentence:
>>>>>>>>> >
>>>>>>>>> > The method may be
>>>>>>>>> > extended, accepting more optional parameters to avoid having too many
>>>>>>>>> > similar implementations.
>>>>>>>>> >
>>>>>>>>> >> Which components would use this new mode when it's enabled?
>>>>>>>>> >
>>>>>>>>> > You may read:
>>>>>>>>> >
>>>>>>>>> > Airflow Database API is a new independent component of Airflow. It
>>>>>>>>> > allows isolating some components (Worker, DagProcessor and Triggerer)
>>>>>>>>> > from direct access to DB.
>>>>>>>>> >
>>>>>>>>> >> But what you haven't said the first thing about is what _other_
>>>>>>>>> >> changes would be needed in the code. To take a fairly simple example:
>>>>>>>>> >>
>>>>>>>>> >>     dag_run = db_client.get_dag_run(dag_id, execution_date)
>>>>>>>>> >>     dag_run.queued_at = timezone.now()
>>>>>>>>> >>     # How do I save this?
>>>>>>>>> >>
>>>>>>>>> >> In short, you need to put a lot more detail into this before we can
>>>>>>>>> >> even have an idea of the full scope of the change this proposal would
>>>>>>>>> >> involve, and what code changes would be needed for components to work
>>>>>>>>> >> with and without this setting enabled.
>>>>>>>>> >
>>>>>>>>> > For this particular example - it depends on the intention of the code
>>>>>>>>> > author:
>>>>>>>>> > - If this should be in a transaction - then I would actually introduce
>>>>>>>>> >   a new method like enqueue_dag_run(...) that would run these two steps
>>>>>>>>> >   on the Airflow DB API side
>>>>>>>>> > - if not, then maybe just the "update_dag_run" method accepting the
>>>>>>>>> >   whole "dag_run" object and saving it to the DB.
>>>>>>>>> >
>>>>>>>>> > In general - we could take a naive approach, e.g. replace the code:
>>>>>>>>> >
>>>>>>>>> >     dag_run = (
>>>>>>>>> >         session.query(DagRun)
>>>>>>>>> >         .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
>>>>>>>>> >         .first()
>>>>>>>>> >     )
>>>>>>>>> >
>>>>>>>>> > with:
>>>>>>>>> >
>>>>>>>>> >     if self.db_isolation:
>>>>>>>>> >         # isolation mode: go through the DB API client
>>>>>>>>> >         dag_run = db_client.get_dag_run(dag_id, execution_date)
>>>>>>>>> >     else:
>>>>>>>>> >         # direct mode: query the database as before
>>>>>>>>> >         dag_run = (
>>>>>>>>> >             session.query(DagRun)
>>>>>>>>> >             .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
>>>>>>>>> >             .first()
>>>>>>>>> >         )
>>>>>>>>> >
>>>>>>>>> > The problem is that Airflow DB API would need to
>>>>>>>>> have
>>>>>>>>> the same implementation for the query - so duplicated code. That's 
>>>>>>>>> why we
>>>>>>>>> propose moving this code to the DBClient which is also used by the 
>>>>>>>>> Airflow
>>>>>>>>> > DB API (in DB direct mode).
>>>>>>>>> >
>>>>>>>>> > I know there are many places where the code is much more complicated
>>>>>>>>> > than a single query, but they must be handled one-by-one, during the
>>>>>>>>> > implementation, otherwise this AIP would be way too big.
>>>>>>>>> >
>>>>>>>>> > I just provided a general idea for the approach - but if you want me
>>>>>>>>> > to put more examples then I am happy to do that
>>>>>>>>> >
>>>>>>>>> > Best regards,
>>>>>>>>> > Mateusz Henc
>>>>>>>>> >
>>>>>>>>> >> On Thu, Dec 2 2021 at 14:23:56 +0100, Mateusz Henc
>>>>>>>>> >> <mh...@google.com.INVALID> wrote:
>>>>>>>>> >>
>>>>>>>>> >> Hi,
>>>>>>>>> >> I just added a new AIP for running some Airflow components in
>>>>>>>>> >> DB-isolation mode, without direct access to the Airflow Database, but
>>>>>>>>> >> they will use a new API for this purpose.
>>>>>>>>> >>
>>>>>>>>> >> PTAL:
>>>>>>>>> >> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API
>>>>>>>>> >>
>>>>>>>>> >> Open question:
>>>>>>>>> >> I called it "Airflow Database API" - however I feel it could be more
>>>>>>>>> >> than just an access layer for the database. So if you have a better
>>>>>>>>> >> name, please let me know, I am happy to change it.
>>>>>>>>> >>
>>>>>>>>> >> Best regards,
>>>>>>>>> >> Mateusz Henc
>>>>>>>>>
>>>>>>>>
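
For reference, the "single implementation in the DBClient, used both directly and by the DB API" idea from the quoted thread could look roughly like the sketch below. This is only an illustration under assumptions: the class names (DirectDBClient, RemoteDBClient, ApiServer), the make_db_client helper, the in-memory store and the in-process "server" are all hypothetical and are not taken from the AIP or from Airflow's code; a real setup would use SQLAlchemy and an HTTP/RPC transport.

```python
from dataclasses import asdict, dataclass
from typing import Dict, Optional, Tuple


@dataclass
class DagRun:
    """Hypothetical stand-in for Airflow's DagRun model."""
    dag_id: str
    execution_date: str
    queued_at: Optional[str] = None


# Hypothetical in-memory "database", keyed by (dag_id, execution_date).
_DB: Dict[Tuple[str, str], DagRun] = {
    ("example_dag", "2021-12-02"): DagRun("example_dag", "2021-12-02"),
}


class DirectDBClient:
    """Holds the single implementation of each query (direct DB mode)."""

    def get_dag_run(self, dag_id: str, execution_date: str) -> Optional[DagRun]:
        return _DB.get((dag_id, execution_date))


class ApiServer:
    """Stand-in for the DB API component; it reuses DirectDBClient."""

    _ALLOWED = {"get_dag_run"}  # explicit allowlist of exposed operations

    def __init__(self) -> None:
        self._client = DirectDBClient()

    def handle(self, method: str, **params: str) -> Optional[dict]:
        if method not in self._ALLOWED:
            raise ValueError(f"unknown method: {method}")
        result = getattr(self._client, method)(**params)
        # Serialise to a plain dict, as a real transport would require.
        return None if result is None else asdict(result)


class RemoteDBClient:
    """Same interface as DirectDBClient, but forwards to the API server."""

    def __init__(self, server: ApiServer) -> None:
        self._server = server

    def get_dag_run(self, dag_id: str, execution_date: str) -> Optional[DagRun]:
        payload = self._server.handle(
            "get_dag_run", dag_id=dag_id, execution_date=execution_date
        )
        return None if payload is None else DagRun(**payload)


def make_db_client(db_isolation: bool):
    """Pick the client once, so calling code needs no per-query branching."""
    return RemoteDBClient(ApiServer()) if db_isolation else DirectDBClient()
```

With this shape the caller writes make_db_client(flag).get_dag_run(dag_id, execution_date) in both modes and gets a DagRun back either way, which is the point made in the thread about avoiding duplicated query code.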

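The "coarse-grained" operation mentioned in the quoted thread (an enqueue_dag_run(...) method that performs the lookup and the queued_at update on the DB API side) might be sketched as follows. This is a minimal illustration, assuming a hypothetical dag_run table in an in-memory SQLite database; the schema and the make_db helper are made up for the example, and only the method name enqueue_dag_run comes from the thread.

```python
import sqlite3
from datetime import datetime, timezone


def make_db() -> sqlite3.Connection:
    """Create a hypothetical, minimal dag_run table with one row."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE dag_run ("
        "dag_id TEXT, execution_date TEXT, queued_at TEXT, "
        "PRIMARY KEY (dag_id, execution_date))"
    )
    conn.execute("INSERT INTO dag_run VALUES ('example_dag', '2021-12-02', NULL)")
    conn.commit()
    return conn


def enqueue_dag_run(conn: sqlite3.Connection, dag_id: str, execution_date: str) -> bool:
    """Server-side operation: find the run and set queued_at in one transaction.

    Returns True if a matching run was updated, False if none exists.
    """
    now = datetime.now(timezone.utc).isoformat()
    with conn:  # commits on success, rolls back if an exception is raised
        cur = conn.execute(
            "UPDATE dag_run SET queued_at = ? "
            "WHERE dag_id = ? AND execution_date = ?",
            (now, dag_id, execution_date),
        )
        return cur.rowcount == 1
```

The worker would then make one call to the API for the whole operation instead of fetching the object, mutating it locally and sending it back, so transaction boundaries stay on the side that owns the database connection.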