Hello,

I have updated AIP-44 with the findings from the POC. If there are any more
questions/comments, please let me know, otherwise I plan to start voting
tomorrow.


https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API

On Mon, Jul 25, 2022 at 10:18 PM Jarek Potiuk <ja...@potiuk.com> wrote:

> I think the drop will be less than those numbers - especially the high-end
> ones. The DB operations we use are quite "coarse" grained (they span a
> single transaction, not a function call). Also I deliberately chose
> "example dags" as the benchmark playground - the example DAGs we have are
> small. And in this case it means that the "actual" work done is rather
> small. If you start parsing bigger DAGs, the overall number of calls (thus
> RPC overhead) will not grow, but the time needed to parse the DAG will grow
> a lot. This means that the RPC overhead will get way smaller
> (percentage-wise).
>
> Also this is not an "overall" performance drop - this is really a
> "mini-benchmark" (as opposed to "system benchmark" and "micro-benchmark").
> The benchmark only concerns running particular "transactions" - but all
> the other parts remain unaffected - so the numbers I saw are the absolute,
> maximum limits of the performance drop and they will be much smaller in
> reality.
>
> J.
>
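A rough way to see why the percentage shrinks: the sketch below takes the ~290s and ~320s figures for 500 scans quoted further down in this thread, and assumes (as argued above) that the RPC cost per scan stays fixed while the parsing work grows. The scaling factors are hypothetical and only illustrate the arithmetic.

```python
# Illustrative arithmetic only. The per-scan split is derived from the
# ~290 s -> ~320 s figures quoted in this thread; the scaling factors
# below are hypothetical.

def overhead_percent(work_seconds: float, rpc_seconds: float) -> float:
    """Return the RPC overhead as a percentage of the non-RPC work time."""
    return 100.0 * rpc_seconds / work_seconds


SCANS = 500
baseline_per_scan = 290.0 / SCANS        # direct DB access, seconds per scan
rpc_per_scan = (320.0 - 290.0) / SCANS   # extra seconds attributed to RPC

# Assumption: the number of calls (and so the RPC cost) stays constant,
# while the parsing work grows with the size of the DAG files.
for scale in (1, 5, 20):
    pct = overhead_percent(baseline_per_scan * scale, rpc_per_scan)
    print(f"parse work x{scale:>2}: overhead ~{pct:.1f}%")
```

With these assumptions the relative overhead falls in proportion to the growth of the parsing work, which is the effect described above.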
>
> On Mon, Jul 25, 2022 at 9:01 PM Mateusz Henc <mh...@google.com.invalid>
> wrote:
>
>> Hi,
>> Sorry, but I was OOO and offline last week and I was not able to even
>> reply.
>>
>> Thank you Jarek for a very detailed analysis. I think we all expected
>> some performance drop for better security.
>> Do you think the performance drop that you measured is what we should
>> expect in the "final" version? I don't know Airflow well enough to know
>> which DB access is used most (and what object types are required there -
>> small/big or maybe just primitives?).
>>
>> Nonetheless, I am looking forward to starting the voting process!
>>
>> Best regards,
>> Mateusz Henc
>>
>>
>> On Fri, Jul 15, 2022 at 4:48 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>
>>> Two more things:
>>> 1) there is one "caveat" I had to handle - timeout handling in
>>> DagFileProcessor (signals do not play well with threads of GRPC - but that
>>> should be easy to fix)
>>> 2) The way I proposed it (with very localized changes needed) makes it
>>> very easy to split the job among multiple people and make it a true
>>> community effort to complete - no need for major refactorings or changes
>>> across the whole airflow code.
>>>
>>> J.
>>>
>>> On Fri, Jul 15, 2022 at 4:32 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>
>>>> Hello Everyone,
>>>>
>>>> First of all - apologies to those who waited for it, I've been dragged
>>>> in multiple directions, but finally I got some quality time to take a look
>>>> at open questions and implement a POC for AIP-44 as promised before.
>>>>
>>>> TL;DR; I finally came back to the AIP-44 and multi-tenancy and have
>>>> made good progress that hopefully will lead to voting next week. I think I
>>>> have enough of the missing evidence of the impact of the internal API on
>>>> performance. I have also implemented a working POC with one of the Internal
>>>> API calls (processFile) that we will have to implement, and ran a series of
>>>> performance tests with it.
>>>>
>>>> # Current state
>>>> --------------------
>>>>
>>>> The state we left it in a few months ago was (
>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
>>>> ):
>>>> * I've prepared an inventory of methods and the general approach we are
>>>> going to take (and we got consensus there)
>>>> * I've left the decision on the final choice (Thrift vs. GRPC) to a later
>>>> POC (I got some feedback from Beam about gRPC vs. Thrift, based on that I
>>>> started with GRPC and I am actually very happy with it, so I think we can
>>>> leave Thrift out of the picture).
>>>> * We've struggled a bit with the decision - should we leave two paths
>>>> (with GRPC/Direct DB) or one (local GRPC vs. remote GRPC)
>>>>
>>>> The POC is implemented here:
>>>> https://github.com/apache/airflow/pull/25094 and I have the following
>>>> findings:
>>>>
>>>> # Performance impact
>>>> ------------------------------
>>>>
>>>> The performance impact is visible (as expected). It's quite acceptable
>>>> for a distributed environment (in exchange for security), but it's likely
>>>> significant enough to stay with the original idea of having Airflow work in
>>>> two modes: a) secure - with RPC apis, b) standard - with direct DB calls.
>>>>
>>>> On "localhost" with a local DB, serializing and transporting the messages
>>>> between two different processes introduced up to 10% overhead. I saw the
>>>> time to run 500 scans of all the DAGs in our example folders go from ~290s
>>>> to ~320s pretty consistently. Enough of a difference to exclude volatility.
>>>> I tested it in Docker on both ARM and AMD. It seems that in some cases that
>>>> can be partially offset by slightly increased parallelism on
>>>> multi-processing machines run on "bare" metal. I got consistently just a
>>>> 2-3% increase on my linux without docker with the same test harness, but we
>>>> cannot rely on such configurations. I think focusing on Docker-based
>>>> installation without any special assumptions on how your
>>>> networking/localhost is implemented is something we should treat as a
>>>> baseline.
>>>>
>>>> The same experiment repeated with 50 messages but of much bigger size
>>>> (300 Callbacks passed per message) has shown a 30% performance drop. This is
>>>> significant, but I believe most of our messages will be much smaller. Also
>>>> the messages I chose were very special - because in case of Callback we are
>>>> doing rather sophisticated serialization, because we have to account for
>>>> Kubernetes objects that are potentially not serializable, so the test
>>>> involved 300 messages that had to be not only GRPC serialized but also some
>>>> parts of them had to be Airflow-JSON serialized and the whole "executor
>>>> config" had to be pickled/unpickled (for 100 such messages out of 300). Also
>>>> we have some conditional processing there (there is a union of three types
>>>> of callbacks that has to be properly deserialized). And those messages
>>>> could be repeated. This is not happening for most other cases. And we can
>>>> likely optimize it away in the future, but I wanted to see the "worst"
>>>> scenario.
>>>>
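To make the "worst case" described above more concrete, here is a minimal sketch of that kind of processing: a union of three callback types that must be told apart on deserialization, with one field that is pickled because it may not be JSON-serializable. The class names, the "kind" tag and the dict-based envelope are hypothetical stand-ins, not the wire format used in the PR.

```python
import json
import pickle
from dataclasses import dataclass
from typing import Any, Dict, Optional, Union


# Hypothetical stand-ins for the three callback types mentioned in the thread.
@dataclass
class TaskCallback:
    task_id: str
    executor_config: Optional[Dict[str, Any]] = None


@dataclass
class DagCallback:
    dag_id: str


@dataclass
class SlaCallback:
    dag_id: str


Callback = Union[TaskCallback, DagCallback, SlaCallback]


def serialize(cb: Callback) -> Dict[str, str]:
    """Wrap a callback in a tagged envelope (a stand-in for a proto oneof)."""
    if isinstance(cb, TaskCallback):
        payload: Dict[str, Any] = {"task_id": cb.task_id}
        if cb.executor_config is not None:
            # Objects that may not be JSON-serializable are pickled, then
            # hex-encoded so they fit into the JSON payload.
            payload["executor_config"] = pickle.dumps(cb.executor_config).hex()
        return {"kind": "task", "payload": json.dumps(payload)}
    if isinstance(cb, DagCallback):
        return {"kind": "dag", "payload": json.dumps({"dag_id": cb.dag_id})}
    return {"kind": "sla", "payload": json.dumps({"dag_id": cb.dag_id})}


def deserialize(msg: Dict[str, str]) -> Callback:
    """Conditional processing: pick the concrete type based on the tag."""
    payload = json.loads(msg["payload"])
    kind = msg["kind"]
    if kind == "task":
        raw = payload.get("executor_config")
        # Note: unpickling is only safe for data from a trusted peer.
        config = pickle.loads(bytes.fromhex(raw)) if raw is not None else None
        return TaskCallback(task_id=payload["task_id"], executor_config=config)
    if kind == "dag":
        return DagCallback(dag_id=payload["dag_id"])
    if kind == "sla":
        return SlaCallback(dag_id=payload["dag_id"])
    raise ValueError(f"unknown callback kind: {kind!r}")
```

Every message of the "task" kind pays for JSON encoding and a pickle round trip on top of the transport serialization, which is consistent with the heavier overhead reported for this scenario.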
>>>> For the remote client (still local DB for the internal API server) I
>>>> got between 30% and 80% slow-down, but this was on my WiFi network. I am
>>>> sure with a "wired" network, it will be much better, also when a remote DB
>>>> gets into picture the overall percentage overhead will be much smaller. We
>>>> knew this would be a "slower" solution but this is the price for someone
>>>> who wants to have security and isolation.
>>>>
>>>> I have not yet performed the tests with SSL, but I think this will be a
>>>> modest increase. And anyhow the "impact" of 10% is IMHO enough to make a
>>>> decision that we cannot go "GRPC-only" - the path where we will continue
>>>> using Direct DB access should stay (and I know already how to do it
>>>> relatively easily).
>>>>
>>>> # Implementation and maintainability
>>>> -------------------------------------------------
>>>>
>>>> In the PR you will see the way I see the implementation details and how it
>>>> will impact our code in general. I think what I proposed is actually
>>>> rather elegant and easy to maintain, and likely we can improve it somehow
>>>> (happy to brainstorm on some creative ways we could use - for example -
>>>> decorators) to make it "friendlier" but I focused more on explicitness and
>>>> showing the mechanisms involved rather than "fanciness". I found it rather
>>>> easy to implement and it does seem to have some good "easy maintainability"
>>>> properties. The way I propose it boils down to a few "rules":
>>>>
>>>> * For all the data and "Messages" we send, we have a nice "Internal
>>>> API" defined in .proto and protobuf objects generated out of that. The GRPC
>>>> proto nicely defines the structures we are going to send over the network.
>>>> It's very standard and it has really good modern support. For one, we
>>>> automatically - I added pre-commit - generate MyPy type stubs for the
>>>> generated classes and it makes it super easy to both implement the
>>>> mapping and automatically verify its correctness. MyPy nicely catches all
>>>> kinds of mistakes you can make! Autocomplete for the PROTO-generated
>>>> classes works like a charm. I struggled initially without typing, but once
>>>> I configured MyPy stub generation, I got really nice detection of mistakes
>>>> I've made and I was able to progress with the implementation way faster
>>>> than without it. Usually everything magically worked as soon as I fixed all
>>>> MyPy errors.
>>>>
>>>> * All Airflow objects that we get to send over GRPC should get
>>>> from_protobuf/to_protobuf methods. They are usually very simple (just
>>>> passing strings/ints/boolean fields to the constructor), and the structures
>>>> we pass are rather small (see the PR). Also (as mentioned above) I
>>>> implemented a few more complex cases (with more complex serialization) and
>>>> it is easy, readable and pretty well integrated into our code IMHO. This
>>>> introduces a little duplication here and there, but those objects change
>>>> rarely (only when we implement big features like Dynamic Task mapping) and
>>>> MyPy guards us against any mishaps there (now that our code is mostly
>>>> typed).
>>>>
>>>> * We make all "DB Aware" objects also "GRPC aware". The number of
>>>> changes in the actual code/logic is rather small and makes it super easy to
>>>> maintain IMHO. There are two changes:
>>>>
>>>>     1) for any of the objects that are used for database operations (in
>>>> my PR this is DagFileProcessor) we need to initialize it with the
>>>> "use_grpc" flag and pass it a channel that will be used for communication.
>>>>     2) the DB methods we have (inventory in the AIP) will have to be
>>>> refactored slightly. This is what was actually added: the original
>>>> "process_file" method was renamed to "process_file_db", and the "callers" of
>>>> the method are left completely intact.
>>>>
>>>>     def process_file_grpc(
>>>>         self,
>>>>         file_path: str,
>>>>         callback_requests: List[CallbackRequest],
>>>>         pickle_dags: bool = False,
>>>>     ) -> Tuple[int, int]:
>>>>         # Build the protobuf request and send it through the gRPC stub.
>>>>         request = internal_api_pb2.FileProcessorRequest(
>>>>             path=file_path, pickle_dags=pickle_dags
>>>>         )
>>>>         for callback_request in callback_requests:
>>>>             request.callbacks.append(callback_request.to_protobuf())
>>>>         res = self.stub.processFile(request)
>>>>         return res.dagsFound, res.errorsFound
>>>>
>>>>     def process_file(
>>>>         self,
>>>>         file_path: str,
>>>>         callback_requests: List[CallbackRequest],
>>>>         pickle_dags: bool = False,
>>>>     ) -> Tuple[int, int]:
>>>>         # Dispatch to the gRPC or the direct-DB implementation.
>>>>         if self.use_grpc:
>>>>             return self.process_file_grpc(
>>>>                 file_path=file_path,
>>>>                 callback_requests=callback_requests,
>>>>                 pickle_dags=pickle_dags,
>>>>             )
>>>>         return self.process_file_db(
>>>>             file_path=file_path,
>>>>             callback_requests=callback_requests,
>>>>             pickle_dags=pickle_dags,
>>>>         )
>>>>
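The message mentions decorators as one possible way to make this dispatch "friendlier". The sketch below shows what such a decorator could look like; it is not taken from the PR. The decorator name `internal_api_call` and the `FakeProcessor` class are hypothetical, and only the `use_grpc` flag and the `process_file` / `process_file_grpc` naming follow the snippet above.

```python
import functools
from typing import Any, Callable, TypeVar, cast

F = TypeVar("F", bound=Callable[..., Any])


def internal_api_call(grpc_method_name: str) -> Callable[[F], F]:
    """Hypothetical decorator: use the named gRPC method when self.use_grpc is set."""

    def decorator(db_method: F) -> F:
        @functools.wraps(db_method)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            if self.use_grpc:
                # Look up the gRPC variant on the instance and call it.
                return getattr(self, grpc_method_name)(*args, **kwargs)
            # Otherwise run the decorated direct-DB implementation.
            return db_method(self, *args, **kwargs)

        return cast(F, wrapper)

    return decorator


class FakeProcessor:
    """Stand-in for a "DB aware" object; it does no real DB or gRPC work."""

    def __init__(self, use_grpc: bool) -> None:
        self.use_grpc = use_grpc

    def process_file_grpc(self, file_path: str) -> str:
        return f"grpc:{file_path}"

    @internal_api_call("process_file_grpc")
    def process_file(self, file_path: str) -> str:
        return f"db:{file_path}"
```

The trade-off is the one named in the message: the decorator hides the branching, while the explicit `if self.use_grpc` version keeps the mechanism visible at the call site.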
>>>> You can see all the details in the PR
>>>> https://github.com/apache/airflow/pull/25094.
>>>>
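As an illustration of the from_protobuf/to_protobuf rule described above, here is a minimal sketch under stated assumptions. `TaskInstanceMessage` is a plain dataclass standing in for a protobuf-generated class (so the example runs without generated code), and `TaskInstanceInfo` with its fields is hypothetical; the real objects and fields are in the PR.

```python
from dataclasses import dataclass


@dataclass
class TaskInstanceMessage:
    """Stand-in for a protobuf-generated message (hypothetical fields)."""

    dag_id: str = ""
    task_id: str = ""
    try_number: int = 0
    is_mapped: bool = False


class TaskInstanceInfo:
    """Hypothetical Airflow-side object that is sent over the internal API."""

    def __init__(
        self, dag_id: str, task_id: str, try_number: int, is_mapped: bool = False
    ) -> None:
        self.dag_id = dag_id
        self.task_id = task_id
        self.try_number = try_number
        self.is_mapped = is_mapped

    def to_protobuf(self) -> TaskInstanceMessage:
        # Plain field-by-field copy; with generated type stubs a type
        # checker can flag a wrong field name or type here.
        return TaskInstanceMessage(
            dag_id=self.dag_id,
            task_id=self.task_id,
            try_number=self.try_number,
            is_mapped=self.is_mapped,
        )

    @classmethod
    def from_protobuf(cls, msg: TaskInstanceMessage) -> "TaskInstanceInfo":
        # Pass the simple str/int/bool fields straight to the constructor.
        return cls(
            dag_id=msg.dag_id,
            task_id=msg.task_id,
            try_number=msg.try_number,
            is_mapped=msg.is_mapped,
        )
```

The duplication between the object and the message is visible here, which matches the remark that it is small and guarded by type checking.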
>>>> # POC testing harness (self-service :) )
>>>> ----------------------------------------------------
>>>>
>>>> You can also very easily test it yourself:
>>>>
>>>> * airflow internal-api server -> runs internal API server.
>>>> * airflow internal-api test-client --num-repeats 10 --num-callbacks 10
>>>> --use-grpc -> runs file-processing of all example dags 10 times with 10x3
>>>> callbacks sent. Same command without --use-grpc will run using direct DB
>>>> access. The server listens on port "50051", the client connects to
>>>> "localhost:50051" - you can modify the code to use a remote IP/different
>>>> ports (easy to find by searching for localhost:50051).
>>>>
>>>> # Discussion and voting
>>>> --------------------------------
>>>>
>>>> Of course some decisions in the PR can be improved (I focused on
>>>> explicitness of the POC more than anything else). We can discuss some
>>>> changes and improvements to some decisions I made once the PR is in a
>>>> "reviewable state" and I am happy to improve it, but for now I would like
>>>> to focus on answering the two questions:
>>>>
>>>> * Does it look plausible?
>>>> * Does it look like it's almost ready to vote on? (I will update the
>>>> AIP before starting voting, of course).
>>>>
>>>> Let me know what you think.
>>>>
>>>> J.
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Feb 1, 2022 at 3:11 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>
>>>>> Since we have AIP-43 already approved, I think I would love to have
>>>>> more questions and discussions about AIP-44 - The "Airflow Internal API"
>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
>>>>> (as the new name is).
>>>>>
>>>>> For those who would like to get more context - a recording of the
>>>>> meeting where both AIP-43 and AIP-44 scope and proposals were discussed
>>>>> can be found here:
>>>>> https://drive.google.com/file/d/1SMFzazuY1kg4B4r11wNt8EQ_PmTDRKq6/view
>>>>>
>>>>> In the AIP I just made a small clarification regarding some "future"
>>>>> changes - specifically the token security might be nicely handled
>>>>> together with AIP-46 "Add support for docker runtime isolation" that is
>>>>> proposed by Ping.
>>>>>
>>>>> My goal is to gather comments till the end of the week, and if there
>>>>> are no big concerns, I would love to start voting next week.
>>>>>
>>>>> J.
>>>>>
>>>>>
>>>>> On Mon, Jan 3, 2022 at 2:48 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>
>>>>>> Also AIP-44 - which is the DB isolation mode - is much more detailed and
>>>>>> ready for deeper discussion if needed.
>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API
>>>>>>
>>>>>> On Tue, Dec 14, 2021 at 12:07 PM Jarek Potiuk <ja...@potiuk.com>
>>>>>> wrote:
>>>>>> >
>>>>>> > And just to add to that.
>>>>>> >
>>>>>> > Thanks again for the initial comments and pushing us to provide more
>>>>>> > details. That allowed us to discuss and focus on many of the aspects
>>>>>> > that were raised and we have many more answers now:
>>>>>> >
>>>>>> > * first of all - we focused on making sure impact on the existing code
>>>>>> > and "behavior" of Airflow is minimal. In fact there should be
>>>>>> > virtually no change vs. current behavior when db isolation is
>>>>>> > disabled.
>>>>>> > * secondly - we've done a full inventory of what the needed API should
>>>>>> > look like and (not surprisingly) it turned out that the current
>>>>>> > REST-style API is good for part of it but most of the "logic" of
>>>>>> > airflow can be done efficiently when we go to RPC-style API. However
>>>>>> > we propose that the authorization/exposure of the API is the same as
>>>>>> > we use currently in the REST API, this will allow us to reuse a big
>>>>>> > part of the infrastructure we already have
>>>>>> > * thirdly - we took deep into our hearts the comments about having to
>>>>>> > maintain pretty much the same logic in a few different places. That's
>>>>>> > an obvious maintenance problem. The proposal we came up with addresses
>>>>>> > it - we are going to keep the logic of Airflow internals in one place
>>>>>> > only and we will simply smartly route on where the logic will be
>>>>>> > executed
>>>>>> > * regarding the performance impact - we described the deployment
>>>>>> > options that our proposal makes available - we do not want to favor
>>>>>> > one deployment option over another, but we made sure the architecture
>>>>>> > is done in such a way that you can choose which deployment is good for
>>>>>> > you: "no isolation", "partial isolation", "full isolation" - each with
>>>>>> > different performance/resource characteristics - all of them fully
>>>>>> > horizontally scalable and nicely manageable.
>>>>>> >
>>>>>> > We look forward to comments, also for AIP-43 -
>>>>>> > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation
>>>>>> > - AIP-43 is a prerequisite to AIP-44 and they both work together.
>>>>>> >
>>>>>> > We also will think and discuss more follow-up AIPs once we get those
>>>>>> > approved (hopefully ;) ). The multi-tenancy is a "long haul" and while
>>>>>> > those two AIPs are foundational building blocks, there are at least
>>>>>> > a few more follow-up AIPs to be able to say "we're done with
>>>>>> > Multi-tenancy" :).
>>>>>> >
>>>>>> > J.
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > On Tue, Dec 14, 2021 at 9:58 AM Mateusz Henc <mh...@google.com.invalid> wrote:
>>>>>> > >
>>>>>> > > Hi,
>>>>>> > >
>>>>>> > > As promised we (credits to Jarek) updated the AIPs, added more
>>>>>> > > details, did inventory and changed the way API endpoints are generated.
>>>>>> > >
>>>>>> > > We also renamed it to Airflow Internal API - so the url has changed:
>>>>>> > >
>>>>>> > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
>>>>>> > >
>>>>>> > > Please take a look, any comments are highly appreciated.
>>>>>> > >
>>>>>> > > Best regards,
>>>>>> > > Mateusz Henc
>>>>>> > >
>>>>>> > >
>>>>>> > > On Mon, Dec 6, 2021 at 3:09 PM Mateusz Henc <mh...@google.com> wrote:
>>>>>> > >>
>>>>>> > >> Hi,
>>>>>> > >> Thank you Ash for your feedback (in both AIPs)
>>>>>> > >>
>>>>>> > >> We are working to address your concerns. We will update the AIPs in a few days.
>>>>>> > >> I will let you know when it's done.
>>>>>> > >>
>>>>>> > >> Best regards,
>>>>>> > >> Mateusz Henc
>>>>>> > >>
>>>>>> > >>
>>>>>> > >> On Fri, Dec 3, 2021 at 7:27 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>> > >>>
>>>>>> > >>> Cool. Thanks for the guidance.
>>>>>> > >>>
>>>>>> > >>> On Fri, Dec 3, 2021 at 6:37 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>>>>>> > >>> >
>>>>>> > >>> > - make an inventory: It doesn't need to be exhaustive, but a representative sample.
>>>>>> > >>> > - More clearly define _what_ the API calls return -- object type, methods on them etc.
>>>>>> > >>> >
>>>>>> > >>> > From the AIP you have this example:
>>>>>> > >>> >
>>>>>> > >>> >   def get_dag_run(self, dag_id, execution_date):
>>>>>> > >>> >     return self.db_client.get_dag_run(dag_id,execution_date)
>>>>>> > >>> >
>>>>>> > >>> > What does that _actually_ return? What capabilities does it have?
>>>>>> > >>> >
>>>>>> > >>> > (I have other thoughts but those are less fundamental and can be discussed later)
>>>>>> > >>> >
>>>>>> > >>> > -ash
>>>>>> > >>> >
>>>>>> > >>> > On Fri, Dec 3 2021 at 18:20:21 +0100, Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>> > >>> >
>>>>>> > >>> > Surely - if you think that we need to do some more work to get
>>>>>> > >>> > confidence, that's fine. I am sure we can improve it to the level that
>>>>>> > >>> > we will not have to do full performance tests, and you are confident in
>>>>>> > >>> > the direction. Just to clarify your concerns and make sure we are on the
>>>>>> > >>> > same page - as I understand it, we should:
>>>>>> > >>> >
>>>>>> > >>> > * make an inventory of "actual changes" the proposal will involve in the
>>>>>> > >>> > database-low-level code of Airflow
>>>>>> > >>> > * based on that, either an assessment that those changes are unlikely
>>>>>> > >>> > (or likely) to make a performance impact
>>>>>> > >>> > * if we assess that it is likely to have an impact, some at least
>>>>>> > >>> > rudimentary performance tests to prove that this is manageable
>>>>>> > >>> >
>>>>>> > >>> > I think that might be a good exercise to do. Does it sound about right?
>>>>>> > >>> > Or do you have any concerns about certain architectural decisions taken?
>>>>>> > >>> >
>>>>>> > >>> > No problem with Friday, but if we get answers today, I think it will
>>>>>> > >>> > give us time to think about it over the weekend and address it next week.
>>>>>> > >>> >
>>>>>> > >>> > J.
>>>>>> > >>> >
>>>>>> > >>> > On Fri, Dec 3, 2021 at 5:56 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>>>>>> > >>> >
>>>>>> > >>> > This is a fundamental change to the architecture with significant
>>>>>> > >>> > possible impacts on performance, and likely requires touching a large
>>>>>> > >>> > portion of the code base.
>>>>>> > >>> >
>>>>>> > >>> > Sorry, you're going to have to expand on the details first and work out
>>>>>> > >>> > what would actually be involved and what the impacts will be: Right now
>>>>>> > >>> > I have serious reservations to this approach, so I can't agree on the
>>>>>> > >>> > high level proposal without an actual proposal (The current document is,
>>>>>> > >>> > at best, an outline, not an actual proposal.)
>>>>>> > >>> >
>>>>>> > >>> > Sorry to be a grinch right before the weekend.
>>>>>> > >>> >
>>>>>> > >>> > Ash
>>>>>> > >>> >
>>>>>> > >>> > On Thu, Dec 2 2021 at 22:47:34 +0100, Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>> > >>> >
>>>>>> > >>> > Oh yeah - good point
>>>>>> > >>> > and we spoke about performance testing/implications. Performance is
>>>>>> > >>> > something we were discussing as the next step when we get general "OK" in
>>>>>> > >>> > the direction - we just want to make sure that there are no "huge"
>>>>>> > >>> > blockers in the way this is proposed and explain any doubts first, so
>>>>>> > >>> > that the investment in performance part makes sense. We do not want to
>>>>>> > >>> > spend a lot of time on getting the tests done and detailed inventory of
>>>>>> > >>> > methods/ API calls to get - only to find out that this is generally "bad
>>>>>> > >>> > direction".
>>>>>> > >>> >
>>>>>> > >>> > Just to clarify again - we also considered (alternative option) to
>>>>>> > >>> > automatically map all the DB methods in the remote calls. But we dropped
>>>>>> > >>> > that idea - precisely for the reason of performance, and transaction
>>>>>> > >>> > integrity. So we are NOT mapping DB calls into API calls. Those will be
>>>>>> > >>> > "logical operations" on the database. Generally speaking, most of the API
>>>>>> > >>> > calls for the "airflow system-level but executed in worker" calls will be
>>>>>> > >>> > rather "coarse" than fine-grained. For example, the aforementioned "mini
>>>>>> > >>> > scheduler" - where we want to make a single API call and run the whole of
>>>>>> > >>> > it on the DBAPI side. So there - performance impact is very limited IMHO.
>>>>>> > >>> > And if we see any other "logic" like that in other parts of the code
>>>>>> > >>> > (zombie detection as an example). We plan to make a detailed inventory of
>>>>>> > >>> > those once we get general "Looks good" for the direction. For now we did
>>>>>> > >>> > some "rough" checking and it seems a plausible approach and quite doable.
>>>>>> > >>> >
>>>>>> > >>> > One more note - the "fine-grained" ( "variable" update/retrieval,
>>>>>> > >>> > "connection update retrieval") - via REST API will still be used by the
>>>>>> > >>> > user's code though (Parsing DAGs, operators, workers and callbacks). We
>>>>>> > >>> > also plan to make sure that none of the "Community" operators are using
>>>>>> > >>> > "non-blessed" DB calls (we can do it in our CI). So at the end of the
>>>>>> > >>> > exercise, all operators, hooks, etc. from the community will be
>>>>>> > >>> > guaranteed to only use the DB APIs that are available in the "DB API"
>>>>>> > >>> > module. But there I do not expect pretty much any performance penalty as
>>>>>> > >>> > those are very fast and rare operations (and good thing there is that we
>>>>>> > >>> > can cache results of those in workers/DAG processing).
>>>>>> > >>> >
>>>>>> > >>> > J.
>>>>>> > >>> >
>>>>>> > >>> > On Thu, Dec 2, 2021 at 7:16 PM Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote:
>>>>>> > >>> >
>>>>>> > >>> > Ah, my
>>>>>> > >>> > bad, I missed that. I'd still like to see discussion of the performance
>>>>>> > >>> > impacts, though.
>>>>>> > >>> >
>>>>>> > >>> > On Thu, Dec 2, 2021 at 11:14 AM Ash Berlin-Taylor <a...@apache.org> wrote:
>>>>>> > >>> >
>>>>>> > >>> > The scheduler was excluded from the components that would use the dbapi -
>>>>>> > >>> > the mini scheduler is the odd one out here as it (currently) runs on the
>>>>>> > >>> > worker but shares much of the code from the scheduling path.
>>>>>> > >>> >
>>>>>> > >>> > -a
>>>>>> > >>> >
>>>>>> > >>> > On 2 December 2021 17:56:40 GMT, Andrew Godwin <andrew.god...@astronomer.io.INVALID> wrote:
>>>>>> > >>> >
>>>>>> > >>> > I would also like to see some discussion in this AIP about how the data
>>>>>> > >>> > is going to be serialised to and from the database instances (obviously
>>>>>> > >>> > Connexion is involved, but I presume more transformation code is needed
>>>>>> > >>> > than that) and the potential slowdown this would cause. In my experience,
>>>>>> > >>> > a somewhat direct ORM mapping like this is going to result in
>>>>>> > >>> > considerably slower times for any complex operation that's touching a
>>>>>> > >>> > few hundred rows.
>>>>>> > >>> >
>>>>>> > >>> > Is there a reason this is being proposed for the scheduler code, too? In
>>>>>> > >>> > my mind, the best approach to multitenancy would be to remove all
>>>>>> > >>> > user-supplied code from the scheduler and leave it with direct DB access,
>>>>>> > >>> > rather than trying to indirect all scheduler access through another API
>>>>>> > >>> > layer.
>>>>>> > >>> >
>>>>>> > >>> > Andrew
>>>>>> > >>> >
>>>>>> > >>> > On Thu, Dec 2, 2021 at 10:29 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>> > >>> >
>>>>>> > >>> > Yeah - I
>>>>>> > >>> > think, Ash, you are completely right, we need some more "detailed"
>>>>>> > >>> > clarification. I believe, I know what you are - rightfully - afraid of
>>>>>> > >>> > (re impact on the code), and maybe we have not done a good job on
>>>>>> > >>> > explaining it with some of our assumptions we had when we worked on it
>>>>>> > >>> > with Mateusz. Simply it was not clear that our aim is to absolutely
>>>>>> > >>> > minimise the impact on the "internal DB transactions" done in schedulers
>>>>>> > >>> > and workers. The idea is that change will at most result in moving an
>>>>>> > >>> > execution of the transactions to another process but not changing what
>>>>>> > >>> > the DB transactions do internally. Actually this was one of the reasons
>>>>>> > >>> > for the "alternative" approach (you can see it in the document) we
>>>>>> > >>> > discussed - hijack "sqlalchemy session" - this is far too low level and
>>>>>> > >>> > the aim of the "DB-API" is NOT to replace direct DB calls (Hence we need
>>>>>> > >>> > to figure out a better name). The API is there to provide "scheduler
>>>>>> > >>> > logic" API and "REST access to Airflow primitives like
>>>>>> > >>> > dags/tasks/variables/connections" etc.
>>>>>> > >>> >
>>>>>> > >>> > As an example (which we briefly talked about in slack) the
>>>>>> > >>> > "_run_mini_scheduler_on_child_tasks" case (
>>>>>> > >>> > https://github.com/apache/airflow/blob/main/airflow/jobs/local_task_job.py#L225-L274)
>>>>>> > >>> > is an example (that we would put in the doc). As we thought of it - this
>>>>>> > >>> > is a "single DB-API operation". Those are not Pure REST calls of course,
>>>>>> > >>> > they are more RPC-like calls. That is why even initially I thought of
>>>>>> > >>> > separating the API completely. But since there are a lot of common
>>>>>> > >>> > "primitive" calls that we can re-use, I think having a separate DB-API
>>>>>> > >>> > component which will re-use connexion implementation, replacing
>>>>>> > >>> > authentication with the custom worker <> DB-API authentication is the
>>>>>> > >>> > way to go. And yes if we agree on the general idea, we need to choose
>>>>>> > >>> > the best way on how to best "connect" the REST API we have with the
>>>>>> > >>> > RPC-kind of API we need for some cases in workers. But we wanted to make
>>>>>> > >>> > sure we are on the same page with the direction. And yes it means that
>>>>>> > >>> > DB-API will potentially have to handle quite a number of DB operations
>>>>>> > >>> > (and that it has to be replicable and scalable as well) - but DB-API
>>>>>> > >>> > will be "stateless" similarly as the webserver is, so it will be
>>>>>> > >>> > scalable by definition. And yes performance tests will be part of POC -
>>>>>> > >>> > likely even before we finally ask for votes there.
>>>>>> > >>> >
>>>>>> > >>> > So in short:
>>>>>> > >>> >
>>>>>> > >>> > * no modification or impact on current scheduler behaviour when DB
>>>>>> > >>> > Isolation is disabled
>>>>>> > >>> > * only higher level methods will be moved out to DB-API and we will
>>>>>> > >>> > reuse existing "REST" APIS where it makes sense
>>>>>> > >>> > * we aim to have "0" changes to the logic of processing - both in Dag
>>>>>> > >>> > Processing logic and DB API. We think with this architecture we proposed
>>>>>> > >>> > it's perfectly doable
>>>>>> > >>> >
>>>>>> > >>> > I hope this clarifies a bit, and once we agree on general direction, we
>>>>>> > >>> > will definitely work on adding more details and clarification (we
>>>>>> > >>> > actually already have a lot of that but we just wanted to start with
>>>>>> > >>> > explaining the idea and going into more details later when we are sure
>>>>>> > >>> > there are no "high-level" blockers from the community).
>>>>>> > >>> >
>>>>>> > >>> > J.
>>>>>> > >>> >
>>>>>> > >>> > On Thu, Dec 2, 2021 at 4:46 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>>>>>> > >>> >
>>>>>> > >>> > >
>>>>>> > >>> > > I just provided a general idea for the approach - but if you want me to
>>>>>> > >>> > > put more examples then I am happy to do that
>>>>>> > >>> >
>>>>>> > >>> > Yes please.
>>>>>> > >>> >
>>>>>> > >>> > It is too general for me and I can't work out what effect it would
>>>>>> > >>> > actually have on the code base, especially how it would look with the
>>>>>> > >>> > config option to enable/disable direct db access.
>>>>>> > >>> >
>>>>>> > >>> > -ash
>>>>>> > >>> >
>>>>>> > >>> > On Thu, Dec 2 2021 at 16:36:57 +0100, Mateusz Henc <mh...@google.com.INVALID> wrote:
>>>>>> > >>> >
>>>>>> > >>> > > Hi,
>>>>>> > >>> > > I am sorry if it is not clear enough, let me try to explain it here,
>>>>>> > >>> > > so maybe it sheds more light on the idea.
>>>>>> > >>> > > See my comments below
>>>>>> > >>> > >
>>>>>> > >>> > > On Thu, Dec 2, 2021 at 3:39 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>>>>>> > >>> > >>
>>>>>> > >>> > >> I'm sorry to say it, but this proposal right now just doesn't contain
>>>>>> > >>> > >> enough detail to say what the actual changes to the code would be,
>>>>>> > >>> > >> and what the impact would be
>>>>>> > >>> > >>
>>>>>> > >>> > >> To take the one example you have so far:
>>>>>> > >>> > >>
>>>>>> > >>> > >>   def get_dag_run(self, dag_id, execution_date):
>>>>>> > >>> > >>     return self.db_client.get_dag_run(dag_id,execution_date)
>>>>>> > >>> > >>
>>>>>> > >>> > >> So from this snippet I'm guessing it would be used like this:
>>>>>> > >>> > >>
>>>>>> > >>> > >>   dag_run = db_client.get_dag_run(dag_id, execution_date)
>>>>>> > >>> > >>
>>>>>> > >>> > >> What type of object is returned?
>>>>>> > >>> > >
>>>>>> > >>> > > As it replaces:
>>>>>> > >>> > >   dag_run = (
>>>>>> > >>> > >       session.query(DagRun)
>>>>>> > >>> > >       .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
>>>>>> > >>> > >       .first()
>>>>>> > >>> > >   )
>>>>>> > >>> > > then the type of the object will be exactly the same (DagRun).
>>>>>> > >>> > >
>>>>>> > >>> > >> Do we need one API method per individual query we have in the source?
>>>>>> > >>> > >
>>>>>> > >>> > > No, as explained by the sentence:
>>>>>> > >>> > > The method may be extended, accepting more optional parameters to
>>>>>> > >>> > > avoid having too many similar implementations.
>>>>>> > >>> > >
>>>>>> > >>> > >> Which components would use this new mode when it's enabled?
>>>>>> > >>> > >
>>>>>> > >>> > > You may read:
>>>>>> > >>> > > Airflow Database API is a new independent component of Airflow. It
>>>>>> > >>> > > allows isolating some components (Worker, DagProcessor and Triggerer)
>>>>>> > >>> > > from direct access to DB.
>>>>>> > >>> > >
>>>>>> > >>> > >> But what you haven't said the first thing about is what _other_
>>>>>> > >>> > >> changes would be needed in the code. To take a fairly simple example:
>>>>>> > >>> > >>
>>>>>> > >>> > >>   dag_run = db_client.get_dag_run(dag_id, execution_date)
>>>>>> > >>> > >>   dag_run.queued_at = timezone.now()
>>>>>> > >>> > >>   # How do I save this?
>>>>>> > >>> > >>
>>>>>> > >>> > >> In short, you need to put a lot more detail into this before we can
>>>>>> > >>> > >> even have an idea of the full scope of the change this proposal would
>>>>>> > >>> > >> involve, and what code changes would be needed for components to work
>>>>>> > >>> > >> with and without this setting enabled.
>>>>>> > >>> > >
>>>>>> > >>> > > For this particular example - it depends on the intention of the code
>>>>>> > >>> > > author
>>>>>> > >>> > > - If this should be in transaction - then I would actually introduce
>>>>>> > >>> > > new method like enqueue_dag_run(...) that would run these two steps on
>>>>>> > >>> > > Airflow DB API side
>>>>>> > >>> > > - if not then, maybe just the "update_dag_run" method accepting the
>>>>>> > >>> > > whole "dag_run" object and saving it to the DB.
>>>>>> > >>> > >
>>>>>> > >>> > > In general - we could take naive approach, eg replace code:
>>>>>> > >>> > >   dag_run = (
>>>>>> > >>> > >       session.query(DagRun)
>>>>>> > >>> > >       .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
>>>>>> > >>> > >       .first()
>>>>>> > >>> > >   )
>>>>>> > >>> > > with:
>>>>>> > >>> > >   if self.db_isolation:
>>>>>> > >>> > >       dag_run = db_client.get_dag_run(dag_id, execution_date)
>>>>>> > >>> > >   else:
>>>>>> > >>> > >       dag_run = (
>>>>>> > >>> > >           session.query(DagRun)
>>>>>> > >>> > >           .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
>>>>>> > >>> > >           .first()
>>>>>> > >>> > >       )
>>>>>> > >>> > >
>>>>>> > >>> > > The problem is that Airflow DB API would need to have the same
>>>>>> > >>> > > implementation for the query - so duplicated code. That's why we
>>>>>> > >>> > > propose moving this code to the DBClient which is also used by the
>>>>>> > >>> > > Airflow DB API (in DB direct mode).
>>>>>> > >>> > >
>>>>>> > >>> > > I know there are many places where the code is much more complicated
>>>>>> > >>> > > than a single query, but they must be handled one-by-one, during the
>>>>>> > >>> > > implementation, otherwise this AIP would be way too big.
>>>>>> > >>> > >
>>>>>> > >>> > > I just provided a general idea for the approach - but if you want me
>>>>>> > >>> > > to put more examples then I am happy to do that
>>>>>> > >>> > >
>>>>>> > >>> > > Best regards,
>>>>>> > >>> > > Mateusz Henc
>>>>>> > >>> > >
>>>>>> > >>> > >> On Thu, Dec 2 2021 at 14:23:56 +0100, Mateusz Henc <mh...@google.com.INVALID> wrote:
>>>>>> > >>> > >>
>>>>>> > >>> > >> Hi,
>>>>>> > >>> > >> I just added a new AIP for running some Airflow components in
>>>>>> > >>> > >> DB-isolation mode, without direct access to the Airflow Database, but
>>>>>> > >>> > >> they will use a new API for this purpose.
>>>>>> > >>> > >>
>>>>>> > >>> > >> PTAL:
>>>>>> > >>> > >> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API
>>>>>> > >>> > >>
>>>>>> > >>> > >> Open question:
>>>>>> > >>> > >> I called it "Airflow Database API" - however I feel it could be more
>>>>>> > >>> > >> than just an access layer for the database. So if you have a better
>>>>>> > >>> > >> name, please let me know, I am happy to change it.
>>>>>> > >>> > >>
>>>>>> > >>> > >> Best regards,
>>>>>> > >>> > >> Mateusz Henc
>>>>>>
>>>>>
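The earliest messages in this thread propose keeping each query in one place (a DBClient) that is used both directly and by the API server, so the logic is not duplicated. The sketch below illustrates that routing idea only. The in-memory dict standing in for the database, the `FakeInternalApiServer` class and the `remote_call` hook are all hypothetical; only the `DBClient` and `get_dag_run` names come from the thread.

```python
from typing import Any, Callable, Dict, Optional, Tuple

DagRunKey = Tuple[str, str]   # (dag_id, execution_date), simplified to strings
DagRunRow = Dict[str, Any]


def query_dag_run(
    db: Dict[DagRunKey, DagRunRow], dag_id: str, execution_date: str
) -> Optional[DagRunRow]:
    """The single place where the query logic lives (a dict lookup here)."""
    return db.get((dag_id, execution_date))


class DBClient:
    """Runs the query locally, or forwards the call when isolated from the DB."""

    def __init__(
        self,
        db: Optional[Dict[DagRunKey, DagRunRow]] = None,
        remote_call: Optional[Callable[..., Any]] = None,
    ) -> None:
        if (db is None) == (remote_call is None):
            raise ValueError("pass exactly one of db or remote_call")
        self._db = db
        self._remote_call = remote_call

    def get_dag_run(self, dag_id: str, execution_date: str) -> Optional[DagRunRow]:
        if self._remote_call is not None:
            # Isolation mode: no DB access here, ask the API server instead.
            return self._remote_call("get_dag_run", dag_id, execution_date)
        assert self._db is not None
        return query_dag_run(self._db, dag_id, execution_date)


class FakeInternalApiServer:
    """Stand-in for the API server: it reuses DBClient in direct mode."""

    def __init__(self, db: Dict[DagRunKey, DagRunRow]) -> None:
        self._client = DBClient(db=db)

    def handle(self, method: str, *args: str) -> Any:
        return getattr(self._client, method)(*args)


db = {("example_dag", "2021-12-02"): {"state": "queued"}}
server = FakeInternalApiServer(db)
direct = DBClient(db=db)                       # component with DB access
isolated = DBClient(remote_call=server.handle)  # component without DB access
```

Both clients end up in `query_dag_run`, which is the point of the proposal: the caller's code is the same in either mode and the query is implemented once.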
