Hi Jarek,

Thanks for pushing forward with this AIP.

I am very interesting in this part:

All tests for DagProcessor, Workers (including Operators) and triggers are
executed in both modes "DBDirect'' and "DBIsolation ''. The code for
Airflow Database API, the DbApiClient and DBSession are extended until all
tests pass.

I would like to see more about how you do this because I have noticed that
there are tons of different configurations in the airflow and some
combinations are less covered by the test as there are some default values,
for example this code
<https://github.com/apache/airflow/blob/3c08cefdfd2e2636a714bb835902f0cb34225563/airflow/task/task_runner/standard_task_runner.py#L39-L42>
.

Also, it will be great if your AIP can discuss the long term strategy about
the internal API. Personally, I think we should completely remove the db
session access from the worker and other untrusted components and fully
rely on the Internal API. This will reduce the maintenance burden as if we
add new db access in those untrusted components, we need to do it for both
db session and internal API. What's more, with the Internal API, we can
even remove the db-proxy :)

Thanks,

Ping


On Tue, Aug 2, 2022 at 6:56 AM Jarek Potiuk <ja...@potiuk.com> wrote:

> Hello,
>
> I have updated AIP-44 with the findings from the POC. If there are any
> more questions/comments, please let me know, otherwise I plan to start
> voting tomorrow.
>
>
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
>
> On Mon, Jul 25, 2022 at 10:18 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>
>> I think the drop will be less than those numbers - especially the
>> high-end ones. The DB operations we use are quite "coarse" grained (they
>> span a single transaction, not a function call). Also I deliberately chose
>> "example dags" as the benchmark playground - the example DAGs we have are
>> small. And in this case it means that the "actual" work done is rather
>> small. If you start parsing bigger DAGs, the overall number of calls (thus
>> RPC overhead) will not grow, but the time needed to parse the DAG will grow
>> a lot. This means that the RPC overhead will get way smaller
>> (percentage-wise).
>>
>> Also this is not an "overall" performance drop - this is really a
>> "mini-benchmark" (as opposed to "system benchmark" and "micro-benchmark").
>> The benchmark only  concerns running particular "transactions" - but all
>> the other parts remain unaffected - so the numbers I saw are the absolute,
>> maximum limits of the performance drop and they will be much smaller in
>> reality.
>>
>> J.
>>
>>
>> On Mon, Jul 25, 2022 at 9:01 PM Mateusz Henc <mh...@google.com.invalid>
>> wrote:
>>
>>> Hi,
>>> Sorry, but I was OOO and offline last week and I was not able to even
>>> rely.
>>>
>>> Thank you Jarek for a very detailed analysis. I think we all expected
>>> some performance drop for better security.
>>> Do you think the performance drop that you measured is what we should
>>> expect in the "final" version? I don't know Airflow that much to know which
>>> DB- access is used most (and what object types are required there -
>>> small/big or maybe just primitives?).
>>>
>>> Nonetheless, I am looking forward to starting the voting process!
>>>
>>> Best regards,
>>> Mateusz Henc
>>>
>>>
>>> On Fri, Jul 15, 2022 at 4:48 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>
>>>> Two more things:
>>>> 1) there is one "caveat" I had to handle - timeout handling in
>>>> DagFileProcessor (signals do not play well with threads of GRPC - but that
>>>> should be easy to fix)
>>>> 2) The way I proposed it (with very localized changes needed)  will be
>>>> very easy to split the job among multiple people and make it a true
>>>> community effort to complete - no need for major refactorings or changes
>>>> across the whole airflow code.
>>>>
>>>> J.
>>>>
>>>> On Fri, Jul 15, 2022 at 4:32 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>
>>>>> Hello Everyone,
>>>>>
>>>>> First of all - apologies for those who waited for it, I've been
>>>>> dragged in multiple directions, but finally I got some quality time to 
>>>>> take
>>>>> a look at open questions and implement POC for AIP-44 as promised before.
>>>>>
>>>>> TL;DR; I finally came back to the AIP-44 and multi-tenancy and have
>>>>> made good progress that hopefully will lead to voting next week. I think I
>>>>> have enough of the missing evidence of the impact of the internal API on
>>>>> performance, also I have implemented working POC with one of the Internal
>>>>> API calls (processFile) that we will have to implement and run a series of
>>>>> performance tests with it.
>>>>>
>>>>> # Current state
>>>>> --------------------
>>>>>
>>>>> The state we left it few months ago was (
>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
>>>>> ):
>>>>> * I've prepared inventory of methods and general approach we are going
>>>>> to take (and we got consensus there)
>>>>> * I've left decision on the final choice (Thrift vs. GRPC) to later
>>>>> POC (I got some feedback from Beam about gRPC vs. Thrift, based on that I
>>>>> started with GRPC and I am actually very happy with it, so i think we can
>>>>> leave Thrift out of the picture).
>>>>> * We've struggled a bit with the decision - should we leave two paths
>>>>> (with GRPC/Direct DB) or one (local GRPC vs. remote GRPC)
>>>>>
>>>>> The POC is implemented here:
>>>>> https://github.com/apache/airflow/pull/25094 and I have the following
>>>>> findings:
>>>>>
>>>>> # Performance impact
>>>>> ------------------------------
>>>>>
>>>>> The performance impact is visible (as expected). It's quite acceptable
>>>>> for a distributed environment (in exchange for security), but it's likely
>>>>> significant enough to stay with the original idea of having Airflow work 
>>>>> in
>>>>> two modes: a) secure - with RPC apis, b) standard - with direct DB calls.
>>>>>
>>>>> On "localhost" with a local DB the performance overhead for
>>>>> serializing and transporting the messages between two different processes
>>>>> introduced up to 10% overhead. I saw an increase of time to run 500 scans
>>>>> of all our example folders dags going from ~290 to ~320s pretty
>>>>> consistently. Enough of a difference to exclude volatility. I tested it in
>>>>> Docker on both ARM and AMD. It seems that in some cases that can be
>>>>> partially  offset by slightly increased parallelism on multi-processing
>>>>> machines run on "bare" metal. I got consistently just 2-3% increase on my
>>>>> linux without docker with the same test harness, but we cannot rely on 
>>>>> such
>>>>> configurations, I think focusing on Docker-based installation without any
>>>>> special assumptions on how your networking/localhost is implemented is
>>>>> something we should treat as a baseline.
>>>>>
>>>>> The same experiment repeated with 50 messages but of much bigger size
>>>>> (300 Callbacks passed per message) have shown 30% performance drop. This 
>>>>> is
>>>>> significant, but I believe most of our messages will be much smaller. Also
>>>>> the messages I chose were very special - because in case of Callback we 
>>>>> are
>>>>> doing rather sophisticated serialization, because we have to account for
>>>>> Kubernetes objects that are potentially not serializable, so the test
>>>>> involved 300 messages that had to be not only GRPC serialized but also 
>>>>> some
>>>>> parts of them had to be Airflow-JSON serialized and the whole "executor
>>>>> config" had to be picked/unpickled (for 100 such messages out of 300). 
>>>>> Also
>>>>> we have some conditional processing there (there is a union of three types
>>>>> of callbacks that has to be properly deserialized). And those messages
>>>>> could be repeated. This is not happening for most other cases. And we can
>>>>> likely optimize it away in the future. but I wanted to see the "worst"
>>>>> scenario.
>>>>>
>>>>> For the remote client (still local DB for the internal API server) I
>>>>> got between 30% and 80% slow-down, but this was on my WiFi network. I am
>>>>> sure with a "wired" network, it will be much better, also when a remote DB
>>>>> gets into picture the overall percentage overhead will be much smaller. We
>>>>> knew this would be a "slower" solution but this is the price for someone
>>>>> who wants to have security and isolation.
>>>>>
>>>>> I have not yet performed the tests with SSL, but I think this will be
>>>>> a modest increase. And anyhow the "impact" of 10% is IMHO enough to make a
>>>>> decision that we cannot get "GRPC-only" - the path where we will continue
>>>>> using Direct DB access should stay (and I know already how to do it
>>>>> relatively easily).
>>>>>
>>>>> # Implementation and maintainability
>>>>> -------------------------------------------------
>>>>>
>>>>> In the PR you will see the way I see  implementation details and how
>>>>> it will impact our code in general.  I think what I proposed is actually
>>>>> rather elegant and easy to maintain, and likely we can improve it somehow
>>>>> (happy to brainstorm on some creative ways we could use - for example -
>>>>> decorators) to make it "friendler" but I focused more on explicitness and
>>>>> showing the mechanisms involved rather than "fanciness". I found it rather
>>>>> easy to implement and it does seem to have some good "easy 
>>>>> maintainability"
>>>>> properties. The way I propose it boils down to few "rules":
>>>>>
>>>>> * For all the data and "Messages" we sent, we have a nice "Internal
>>>>> API" defined in .proto and protobuf objects generated out of that. The 
>>>>> GRPC
>>>>> proto nicely defines the structures we are going to send over the network.
>>>>> It's very standard, it has a really good modern support. For one, we
>>>>> automatically - I added pre-commit - generate MyPY type stubs for the
>>>>> generated classes and it makes it super easy to both - implement the
>>>>> mapping and automatically verify its correctness. Mypy nicely catches all
>>>>> kinds of mistakes you can make)! Autocomplete for the PROTO-generated
>>>>> classes works like a charm. I struggled initially without typing, but once
>>>>> I configured mypy stub generation, I got really nice detection of mistakes
>>>>> I've made and I was able to progress with the implementation way faster
>>>>> than without it. Usually everything magically worked as soon as I fixed 
>>>>> all
>>>>> MyPy errors.
>>>>>
>>>>> * All Airflow objects that we get to send over GRPC should get
>>>>> from_protobuf/to_protobuf methods. They are usually very simple (just
>>>>> passing strings/ints/boolean fields to the constructor), and the 
>>>>> structures
>>>>> we pass are rather small (see the PR). Also (as mentioned above) I
>>>>> implemented a few more complex cases (with more complex serialization) and
>>>>> it is easy, readable and pretty well integrated into our code IMHO. This
>>>>> introduces a little duplication here and there, but those objects change
>>>>> rarely (only when we implement big features like Dynamic Task mapping) and
>>>>> MyPy guards us against any mishaps there (now that our code is mostly
>>>>> typed).
>>>>>
>>>>> * We make all "DB Aware" objects also "GRPC aware". The number of
>>>>> changes in the actual code/logic is rather small and makes it super easy 
>>>>> to
>>>>> maintain IMHO. There are two changes:
>>>>>
>>>>>     1) for any of the objects that are used for database operations
>>>>> (in my PR this is DagFileProcessor) we need to initialize it with the
>>>>> "use_grpc" flag and pass it a channel that will be used for communication.
>>>>>     2) the DB methods we have (inventory in the AIP) will have to be
>>>>> refactored slightly. This is what really was added, the original
>>>>> "process_file" method was renamed to "process_file_db" and the "callers" 
>>>>> of
>>>>> the method are completely intact.
>>>>>
>>>>>     def process_file_grpc(
>>>>>         self,
>>>>>         file_path: str,
>>>>>         callback_requests: List[CallbackRequest],
>>>>>         pickle_dags: bool = False,
>>>>>     ) -> Tuple[int, int]:
>>>>>         request =
>>>>> internal_api_pb2.FileProcessorRequest(path=file_path,
>>>>> pickle_dags=pickle_dags)
>>>>>         for callback_request in callback_requests:
>>>>>             request.callbacks.append(callback_request.to_protobuf())
>>>>>         res = self.stub.processFile(request)
>>>>>         return res.dagsFound, res.errorsFound
>>>>>
>>>>>     def process_file(
>>>>>         self,
>>>>>         file_path: str,
>>>>>         callback_requests: List[CallbackRequest],
>>>>>         pickle_dags: bool = False,
>>>>>     ) -> Tuple[int, int]:
>>>>>         if self.use_grpc:
>>>>>             return self.process_file_grpc(
>>>>>                 file_path=file_path,
>>>>> callback_requests=callback_requests, pickle_dags=pickle_dags
>>>>>             )
>>>>>         return self.process_file_db(
>>>>>             file_path=file_path, callback_requests=callback_requests,
>>>>> pickle_dags=pickle_dags
>>>>>         )
>>>>>
>>>>> You can see all the details in the PR
>>>>> https://github.com/apache/airflow/pull/25094.
>>>>>
>>>>> # POC testing harness (self-service :) )
>>>>> ----------------------------------------------------
>>>>>
>>>>> You can also very easily test it yourself:
>>>>>
>>>>> * airflow internal-api server -> runs internal API server.
>>>>> * airflow internal-api test-client --num-repeats 10 --num-callbacks 10
>>>>> --use-grpc -> runs file-processing of all example dags 10 times with 10x3
>>>>> callbacks sent. Same command without --use-grpc will run using direct DB
>>>>> access. Server listens on "50051" port, client connects to
>>>>> "localhost:50051" - you can modify the code to use remote IP/different
>>>>> ports (easy to find by localhost:50051).
>>>>>
>>>>> # Discussion and voting
>>>>> --------------------------------
>>>>>
>>>>> Of course some decisions in the PR can be improved (I focused on
>>>>> explicitness of the POC more than anything else). We can discuss some
>>>>> changes and improvements to some decisions I made when the PR will be in
>>>>> "reviewable state" and I am happy to improve it, but for now I would like
>>>>> to focus on answering the two questions:
>>>>>
>>>>> * Does it look plausible?
>>>>> * Does it look like it's almost ready to vote on it? (I will update
>>>>> the AIP before starting voting, of course).
>>>>>
>>>>> Let me know what you think.
>>>>>
>>>>> J.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Feb 1, 2022 at 3:11 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>
>>>>>> Since we have AIP-43 already approved, I think I would love to have
>>>>>> more questions and discussions about AIP-44 - The "Airflow Internal API"
>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
>>>>>> (as the new name is).
>>>>>>
>>>>>> For those who would like to get more context - recording of the
>>>>>> meeting where both AIP-43 and AIP-44 scope and proposals were discussed 
>>>>>> can
>>>>>> be found here:
>>>>>> https://drive.google.com/file/d/1SMFzazuY1kg4B4r11wNt8EQ_PmTDRKq6/view
>>>>>>
>>>>>> In the AIP I just made a small clarification regarding some "future"
>>>>>> changes  - specifically the token security might be nicely handled 
>>>>>> together
>>>>>> with AIP-46 "Add support for docker runtime isolation" that is proposed 
>>>>>> by
>>>>>> Ping.
>>>>>>
>>>>>> My goal is to gather comments till the end of the week, and if there
>>>>>> will be no big concerns, I would love to start voting next week.
>>>>>>
>>>>>> J.
>>>>>>
>>>>>>
>>>>>> On Mon, Jan 3, 2022 at 2:48 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>>
>>>>>>> Also AIP-44 - which is the DB isolation mode is much more detailed
>>>>>>> and
>>>>>>> ready for deeper discussion if needed.
>>>>>>>
>>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API
>>>>>>>
>>>>>>> On Tue, Dec 14, 2021 at 12:07 PM Jarek Potiuk <ja...@potiuk.com>
>>>>>>> wrote:
>>>>>>> >
>>>>>>> > And just to add to that.
>>>>>>> >
>>>>>>> > Thanks again for the initial comments and pushing us to provide
>>>>>>> more
>>>>>>> > details. That allowed us to discuss and focus on many of the
>>>>>>> aspects
>>>>>>> > that were raised and we have many more answers now:
>>>>>>> >
>>>>>>> > * first of all - we focused on making sure impact on the existing
>>>>>>> code
>>>>>>> > and "behavior" of Airflow is minimal. In fact there should be
>>>>>>> > virtually no change vs. current behavior when db isolation is
>>>>>>> > disabled.
>>>>>>> > * secondly - we've done a full inventory of how the API needed
>>>>>>> should
>>>>>>> > look like and (not unsurprisingly) it turned out that the current
>>>>>>> > REST-style API is good for part of it but most of the 'logic" of
>>>>>>> > airflow can be done efficiently when we go to RPC-style API.
>>>>>>> However
>>>>>>> > we propose that the authorization/exposure of the API is the same
>>>>>>> as
>>>>>>> > we use currently in the REST API, this will allow us to reuse a big
>>>>>>> > part of the infrastructure we already have
>>>>>>> > * thirdly - we took deep into our hearts the comments about having
>>>>>>> to
>>>>>>> > maintain pretty much the same logic in a few different places.
>>>>>>> That's
>>>>>>> > an obvious maintenance problem. The proposal we came up with
>>>>>>> addresses
>>>>>>> > it - we are going to keep the logic of Airflow internals in one
>>>>>>> place
>>>>>>> > only and we will simply smartly route on where the logic will be
>>>>>>> > executed
>>>>>>> > * regarding the performance impact - we described the deployment
>>>>>>> > options that our proposal makes available - we do not want to favor
>>>>>>> > one deployment option over another, but we made sure the
>>>>>>> architecture
>>>>>>> > is done in the way, that you can choose which deployment is good
>>>>>>> for
>>>>>>> > you: "no isolation", "partial isolation", "full isolation" - each
>>>>>>> with
>>>>>>> > different performance/resource characteristics - all of them fully
>>>>>>> > horizontally scalable and nicely manageable.
>>>>>>> >
>>>>>>> > We look forward to comments, also for the API 43 -
>>>>>>> >
>>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation
>>>>>>> > - AIP-43 is a prerequiste to AIP-44 and they both work together.
>>>>>>> >
>>>>>>> > We also will think and discuss more follow-up AIPs once we get
>>>>>>> those
>>>>>>> > approved (hopefully ;) ). The multi-tenancy is a 'long haul" and
>>>>>>> while
>>>>>>> > those two AIPs are foundational building blocks, there are at least
>>>>>>> > few more follow-up AIPs to be able to say "we're done with
>>>>>>> > Multi-tenancy" :).
>>>>>>> >
>>>>>>> > J.
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > On Tue, Dec 14, 2021 at 9:58 AM Mateusz Henc
>>>>>>> <mh...@google.com.invalid> wrote:
>>>>>>> > >
>>>>>>> > > Hi,
>>>>>>> > >
>>>>>>> > > As promised we (credits to Jarek) updated the AIPs, added more
>>>>>>> details, did inventory and changed the way API endpoints are generated.
>>>>>>> > >
>>>>>>> > > We also renamed it to Airflow Internal API - so the url has
>>>>>>> changed:
>>>>>>> > >
>>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
>>>>>>> > >
>>>>>>> > > Please take a look, any comments are highly appreciated.
>>>>>>> > >
>>>>>>> > > Best regards,
>>>>>>> > > Mateusz Henc
>>>>>>> > >
>>>>>>> > >
>>>>>>> > > On Mon, Dec 6, 2021 at 3:09 PM Mateusz Henc <mh...@google.com>
>>>>>>> wrote:
>>>>>>> > >>
>>>>>>> > >> Hi,
>>>>>>> > >> Thank you Ash for your feedback (in both AIPs)
>>>>>>> > >>
>>>>>>> > >> We are working to address your concerns. We will update the
>>>>>>> AIPs in a few days.
>>>>>>> > >> I will let you know when it's done.
>>>>>>> > >>
>>>>>>> > >> Best regards,
>>>>>>> > >> Mateusz Henc
>>>>>>> > >>
>>>>>>> > >>
>>>>>>> > >> On Fri, Dec 3, 2021 at 7:27 PM Jarek Potiuk <ja...@potiuk.com>
>>>>>>> wrote:
>>>>>>> > >>>
>>>>>>> > >>> Cool. Thanks for the guidance.
>>>>>>> > >>>
>>>>>>> > >>> On Fri, Dec 3, 2021 at 6:37 PM Ash Berlin-Taylor <
>>>>>>> a...@apache.org> wrote:
>>>>>>> > >>> >
>>>>>>> > >>> > - make an inventory: It doesn't need to be exhaustive, but a
>>>>>>> representative sample.
>>>>>>> > >>> > - More clearly define _what_ the API calls return -- object
>>>>>>> type, methods on them etc.
>>>>>>> > >>> >
>>>>>>> > >>> > From the AIP you have this example:
>>>>>>> > >>> >
>>>>>>> > >>> >   def get_dag_run(self, dag_id, execution_date):
>>>>>>> > >>> >     return self.db_client.get_dag_run(dag_id,execution_date)
>>>>>>> > >>> >
>>>>>>> > >>> > What does that _actually_ return? What capabilities does it
>>>>>>> have?
>>>>>>> > >>> >
>>>>>>> > >>> > (I have other thoughts but those are less fundamental and
>>>>>>> can be discussed later)
>>>>>>> > >>> >
>>>>>>> > >>> > -ash
>>>>>>> > >>> >
>>>>>>> > >>> > On Fri, Dec 3 2021 at 18:20:21 +0100, Jarek Potiuk <
>>>>>>> ja...@potiuk.com> wrote:
>>>>>>> > >>> >
>>>>>>> > >>> > Surely - if you think that we need to do some more work to
>>>>>>> get confidence, that's fine. I am sure we can improve it to the level 
>>>>>>> that
>>>>>>> we will not have to do full performance tests, and you are confident in 
>>>>>>> the
>>>>>>> direction. Just to clarify your concerns and make sure we are on the 
>>>>>>> same
>>>>>>> page - as I understand it should: * make an inventory of "actual 
>>>>>>> changes"
>>>>>>> the proposal will involve in the database-low-level code of Airflow * 
>>>>>>> based
>>>>>>> on that either assessment that those changes are unlikely (or likely 
>>>>>>> make a
>>>>>>> performance impact) * if we asses that it is likely to have an impact, 
>>>>>>> some
>>>>>>> at least rudimentary performance tests to prove that this is manageable 
>>>>>>> I
>>>>>>> think that might be a good exercise to do. Does it sound about right? 
>>>>>>> Or do
>>>>>>> you have any concerns about certain architectural decisions taken? No
>>>>>>> problem with Friday, but if we get answers today. I think it will give 
>>>>>>> us
>>>>>>> time to think about it over the weekend and address it next week. J. On
>>>>>>> Fri, Dec 3, 2021 at 5:56 PM Ash Berlin-Taylor <a...@apache.org>
>>>>>>> wrote:
>>>>>>> > >>> >
>>>>>>> > >>> > This is a fundamental change to the architecture with
>>>>>>> significant possible impacts on performance, and likely requires 
>>>>>>> touching a
>>>>>>> large portion of the code base. Sorry, you're going to have to do 
>>>>>>> expand on
>>>>>>> the details first and work out what would actually be involved and what 
>>>>>>> the
>>>>>>> impacts will be: Right now I have serious reservations to this 
>>>>>>> approach, so
>>>>>>> I can't agree on the high level proposal without an actual proposal (The
>>>>>>> current document is, at best, an outline, not an actual proposal.) 
>>>>>>> Sorry to
>>>>>>> be a grinch right before the weekend. Ash On Thu, Dec 2 2021 at 22:47:34
>>>>>>> +0100, Jarek Potiuk <ja...@potiuk.com> wrote: Oh yeah - good point
>>>>>>> and we spoke about performance testing/implications. Performance is
>>>>>>> something we were discussing as the next step when we get general "OK" 
>>>>>>> in
>>>>>>> the direction - we just want to make sure that there are no "huge" 
>>>>>>> blockers
>>>>>>> in the way this is proposed and explain any doubts first, so that the
>>>>>>> investment in performance part makes sense. We do not want to spend a 
>>>>>>> lot
>>>>>>> of time on getting the tests done and detailed inventory of methods/ API
>>>>>>> calls to get - only to find out that this is generally "bad direction".
>>>>>>> Just to clarify again - we also considered (alternative option) to
>>>>>>> automatically map all the DB methods in the remote calls. But we dropped
>>>>>>> that idea - precisely for the reason of performance, and transaction
>>>>>>> integrity. So we are NOT mapping DB calls into API calls. those will be
>>>>>>> "logical operations" on the database. Generally speaking, most of the 
>>>>>>> API
>>>>>>> calls for the "airflow system-level but executed in worker" calls will 
>>>>>>> be
>>>>>>> rather "coarse" than fine-grained. For example, the aforementioned "mini
>>>>>>> scheduler" - where we want to make a single API call and run the whole 
>>>>>>> of
>>>>>>> it on the DBAPI side. So there - performance impact is very limited 
>>>>>>> IMHO.
>>>>>>> And If we see any other "logic" like that in other parts of the code
>>>>>>> (zombie detection as an example). We plan to make a detailed inventory 
>>>>>>> of
>>>>>>> those once we get general "Looks good" for the direction. For now we did
>>>>>>> some "rough" checking and it seems a plausible approach and quite 
>>>>>>> doable.
>>>>>>> One more note - the "fine-grained" ( "variable" update/retrieval,
>>>>>>> "connection update retrieval") - via REST API will still be used by the
>>>>>>> user's code though (Parsing DAGs, operators, workers and callbacks). We
>>>>>>> also plan to make sure that none of the "Community" operators are using
>>>>>>> "non-blessed" DB calls (we can do it in our CI). So at the end of the
>>>>>>> exercise, all operators, hooks, etc. from the community will be 
>>>>>>> guaranteed
>>>>>>> to only use the DB APIs that are available in the "DB API" module. But
>>>>>>> there I do not expect pretty much any performance penalty as those are 
>>>>>>> very
>>>>>>> fast and rare operations (and good thing there is that we can cache 
>>>>>>> results
>>>>>>> of those in workers/DAG processing). J. On Thu, Dec 2, 2021 at 7:16 PM
>>>>>>> Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote: Ah, my
>>>>>>> bad, I missed that. I'd still like to see discussion of the performance
>>>>>>> impacts, though. On Thu, Dec 2, 2021 at 11:14 AM Ash Berlin-Taylor <
>>>>>>> a...@apache.org> wrote: The scheduler was excluded from the
>>>>>>> components that would use the dbapi - the mini scheduler is the odd one 
>>>>>>> out
>>>>>>> here is it (currently) runs on the work but shares much of the code from
>>>>>>> the scheduling path. -a On 2 December 2021 17:56:40 GMT, Andrew Godwin <
>>>>>>> andrew.god...@astronomer.io.INVALID> wrote: I would also like to
>>>>>>> see some discussion in this AIP about how the data is going to be
>>>>>>> serialised to and from the database instances (obviously Connexion is
>>>>>>> involved, but I presume more transformation code is needed than that) 
>>>>>>> and
>>>>>>> the potential slowdown this would cause. In my experience, a somewhat
>>>>>>> direct ORM mapping like this is going to result in considerably slower
>>>>>>> times for any complex operation that's touching a few hundred rows. Is
>>>>>>> there a reason this is being proposed for the scheduler code, too? In my
>>>>>>> mind, the best approach to multitenancy would be to remove all
>>>>>>> user-supplied code from the scheduler and leave it with direct DB 
>>>>>>> access,
>>>>>>> rather than trying to indirect all scheduler access through another API
>>>>>>> layer. Andrew On Thu, Dec 2, 2021 at 10:29 AM Jarek Potiuk <
>>>>>>> ja...@potiuk.com> wrote: Yeah - I thik Ash you are completely right
>>>>>>> we need some more "detailed" clarification. I believe, I know what you 
>>>>>>> are
>>>>>>> - rightfully - afraid of (re impact on the code), and maybe we have not
>>>>>>> done a good job on explaining it with some of our assumptions we had 
>>>>>>> when
>>>>>>> we worked on it with Mateusz. Simply it was not clear that our aim is to
>>>>>>> absolutely minimise the impact on the "internal DB transactions" done in
>>>>>>> schedulers and workers. The idea is that change will at most result in
>>>>>>> moving an execution of the transactions to another process but not 
>>>>>>> changing
>>>>>>> what the DB transactions do internally. Actually this was one of the 
>>>>>>> reason
>>>>>>> for the "alternative" approach (you can see it in the document) we
>>>>>>> discussed about - hijack "sqlalchemy session" - this is far too low 
>>>>>>> level
>>>>>>> and the aim of the "DB-API" is NOT to replace direct DB calls (Hence we
>>>>>>> need to figure out a better name). The API is there to provide 
>>>>>>> "scheduler
>>>>>>> logic" API and "REST access to Airflow primitives like
>>>>>>> dags/tasks/variables/connections" etc.. As an example (which we briefly
>>>>>>> talked about in slack) the "_run_mini_scheduler_on_child_tasks" case (
>>>>>>> https://github.com/apache/airflow/blob/main/airflow/jobs/local_task_job.py#L225-L274)
>>>>>>> is an example (that we would put in the doc). As we thought of it - 
>>>>>>> this is
>>>>>>> a "single DB-API operation". Those are not Pure REST calls of course, 
>>>>>>> they
>>>>>>> are more RPC-like calls. That is why even initially I thought of 
>>>>>>> separating
>>>>>>> the API completely. But since there are a lot of common "primitive" 
>>>>>>> calls
>>>>>>> that we can re-use, I think having a separate DB-API component which 
>>>>>>> will
>>>>>>> re-use connexion implementation, replacing authentication with the 
>>>>>>> custom
>>>>>>> worker <> DB-API authentication is the way to go. And yes if we agree on
>>>>>>> the general idea, we need to choose the best way on how to best 
>>>>>>> "connect"
>>>>>>> the REST API we have with the RPC-kind of API we need for some cases in
>>>>>>> workers. But we wanted to make sure we are on the same page with the
>>>>>>> direction. And yes it means that DB-API will potentially have to handle
>>>>>>> quite a number of DB operations (and that it has to be replicable and
>>>>>>> scalable as well) - but DB-API will be "stateless" similarly as the
>>>>>>> webserver is, so it will be scalable by definition. And yest performance
>>>>>>> tests will be part of POC - likely even before we finally ask for votes
>>>>>>> there. So in short: * no modification or impact on current scheduler
>>>>>>> behaviour when DB Isolation is disabled * only higher level methods 
>>>>>>> will be
>>>>>>> moved out to DB-API and we will reuse existing "REST" APIS where it 
>>>>>>> makes
>>>>>>> sense * we aim to have "0" changes to the logic of processing - both in 
>>>>>>> Dag
>>>>>>> Processing logic and DB API. We think with this architecture we proposed
>>>>>>> it's perfectly doable I hope this clarifies a bit, and once we agree on
>>>>>>> general direction, we will definitely work on adding more details and
>>>>>>> clarification (we actually already have a lot of that but we just 
>>>>>>> wanted to
>>>>>>> start with explaining the idea and going into more details later when we
>>>>>>> are sure there are no "high-level" blockers from the community. J, On 
>>>>>>> Thu,
>>>>>>> Dec 2, 2021 at 4:46 PM Ash Berlin-Taylor <a...@apache.org> wrote: >
>>>>>>> > I just provided a general idea for the approach - but if you want me 
>>>>>>> > to
>>>>>>> put more examples then I am happy to do that > > > Yes please. > > It is
>>>>>>> too general for me and I can't work out what effect it would actually 
>>>>>>> have
>>>>>>> on the code base, especially how it would look with the config option to
>>>>>>> enable/disable direct db access. > > -ash > > On Thu, Dec 2 2021 at
>>>>>>> 16:36:57 +0100, Mateusz Henc <mh...@google.com.INVALID> wrote: > >
>>>>>>> Hi, > I am sorry if it is not clear enough, let me try to explain it 
>>>>>>> here,
>>>>>>> so maybe it gives more light on the idea. > See my comments below > > On
>>>>>>> Thu, Dec 2, 2021 at 3:39 PM Ash Berlin-Taylor <a...@apache.org>
>>>>>>> wrote: >> >> I'm sorry to say it, but this proposal right just doesn't
>>>>>>> contain enough detail to say what the actual changes to the code would 
>>>>>>> be,
>>>>>>> and what the impact would be >> >> To take the one example you have so 
>>>>>>> far:
>>>>>>> >> >> >> def get_dag_run(self, dag_id, execution_date): >> return
>>>>>>> self.db_client.get_dag_run(dag_id,execution_date) >> >> So form this
>>>>>>> snippet I'm guessing it would be used like this: >> >> dag_run =
>>>>>>> db_client.get_dag_run(dag_id, execution_date) >> >> What type of object 
>>>>>>> is
>>>>>>> returned? > > > As it replaces: > dag_run = session.query(DagRun) >
>>>>>>> .filter(DagRun.dag_id == dag_id, DagRun.execution_date == 
>>>>>>> execution_date) >
>>>>>>> .first() > > then the type of the object will be exactly the same 
>>>>>>> (DagRun)
>>>>>>> . > >> >> >> Do we need one API method per individual query we have in 
>>>>>>> the
>>>>>>> source? > > > No, as explained by the sentence: > > The method may be
>>>>>>> extended, accepting more optional parameters to avoid having too many
>>>>>>> similar implementations. > > >> >> >> Which components would use this 
>>>>>>> new
>>>>>>> mode when it's enabled? > > > You may read: > Airflow Database APi is a 
>>>>>>> new
>>>>>>> independent component of Airflow. It allows isolating some components
>>>>>>> (Worker, DagProcessor and Triggerer) from direct access to DB. > >> >> 
>>>>>>> But
>>>>>>> what you haven't said the first thing about is what _other_ changes 
>>>>>>> would
>>>>>>> be needed in the code. To take a fairly simple example: >> >> dag_run =
>>>>>>> db_client.get_dag_run(dag_id, execution_date) >> dag_run.queued_at =
>>>>>>> timezone.now() >> # How do I save this? >> >> In short, you need to put 
>>>>>>> a
>>>>>>> lot more detail into this before we can even have an idea of the full 
>>>>>>> scope
>>>>>>> of the change this proposal would involve, and what code changes would 
>>>>>>> be
>>>>>>> needed for compnents to work with and without this setting enabled. > > 
>>>>>>> >
>>>>>>> For this particular example - it depends on the intention of the code
>>>>>>> author > - If this should be in transaction - then I would actually
>>>>>>> introduce new method like enqueue_dag_run(...) that would run these two
>>>>>>> steps on Airflow DB API side > - if not then, maybe just the
>>>>>>> "update_dag_run" method accepting the whole "dag_run" object and saving 
>>>>>>> it
>>>>>>> to the DB. > > In general - we could take naive approach, eg replace 
>>>>>>> code:
>>>>>>> > dag_run = session.query(DagRun) > .filter(DagRun.dag_id == dag_id,
>>>>>>> DagRun.execution_date == execution_date) > .first() > with: > if
>>>>>>> self.db_isolation: > dag_run = session.query(DagRun) >
>>>>>>> .filter(DagRun.dag_id == dag_id, DagRun.execution_date == 
>>>>>>> execution_date) >
>>>>>>> .first() > else: > dag_run = db_client.get_dag_run(self, dag_id,
>>>>>>> execution_date) > > The problem is that Airflow DB API would need to 
>>>>>>> have
>>>>>>> the same implementation for the query - so duplicated code. That's why 
>>>>>>> we
>>>>>>> propose moving this code to the DBClient which is also used by the 
>>>>>>> Airflow
>>>>>>> DB API(in DB direct mode). > > I know there are many places where the 
>>>>>>> code
>>>>>>> is much more complicated than a single query, but they must be handled
>>>>>>> one-by-one, during the implementation, otherwise this AIP would be way 
>>>>>>> too
>>>>>>> big. > > I just provided a general idea for the approach - but if you 
>>>>>>> want
>>>>>>> me to put more examples then I am happy to do that > > Best regards, >
>>>>>>> Mateusz Henc > >> >> On Thu, Dec 2 2021 at 14:23:56 +0100, Mateusz Henc
>>>>>>> <mh...@google.com.INVALID> wrote: >> >> Hi, >> I just added a new
>>>>>>> AIP for running some Airflow components in DB-isolation mode, without
>>>>>>> direct access to the Airflow Database, but they will use a new API for 
>>>>>>> thi
>>>>>>> purpose. >> >> PTAL: >>
>>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API
>>>>>>> >> >> Open question: >> I called it "Airflow Database API" - however I 
>>>>>>> >> >> feel
>>>>>>> it could be more than just an access layer for the database. So if you 
>>>>>>> have
>>>>>>> a better name, please let me know, I am happy to change it. >> >> Best
>>>>>>> regards, >> Mateusz Henc
>>>>>>>
>>>>>>

Reply via email to