Ah, my bad, I missed that. I'd still like to see discussion of the performance impacts, though.
On Thu, Dec 2, 2021 at 11:14 AM Ash Berlin-Taylor <a...@apache.org> wrote:

> The scheduler was excluded from the components that would use the dbapi -
> the mini scheduler is the odd one out here as it (currently) runs on the
> worker but shares much of the code from the scheduling path.
>
> -a
>
> On 2 December 2021 17:56:40 GMT, Andrew Godwin
> <andrew.god...@astronomer.io.INVALID> wrote:
>>
>> I would also like to see some discussion in this AIP about how the data
>> is going to be serialised to and from the database instances (obviously
>> Connexion is involved, but I presume more transformation code is needed
>> than that) and the potential slowdown this would cause.
>>
>> In my experience, a somewhat direct ORM mapping like this is going to
>> result in considerably slower times for any complex operation that's
>> touching a few hundred rows.
>>
>> Is there a reason this is being proposed for the scheduler code, too? In
>> my mind, the best approach to multitenancy would be to remove all
>> user-supplied code from the scheduler and leave it with direct DB access,
>> rather than trying to indirect all scheduler access through another API
>> layer.
>>
>> Andrew
>>
>> On Thu, Dec 2, 2021 at 10:29 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>>
>>> Yeah - I think Ash you are completely right, we need some more
>>> "detailed" clarification.
>>>
>>> I believe I know what you are - rightfully - afraid of (re impact on
>>> the code), and maybe we have not done a good job of explaining some of
>>> the assumptions we had when we worked on it with Mateusz.
>>>
>>> Simply, it was not clear that our aim is to absolutely minimise the
>>> impact on the "internal DB transactions" done in schedulers and
>>> workers. The idea is that the change will at most result in moving the
>>> execution of the transactions to another process, not in changing what
>>> the DB transactions do internally. Actually, this was one of the
>>> reasons for the "alternative" approach (you can see it in the document)
>>> we discussed - hijacking the "sqlalchemy session" - which is far too
>>> low-level, and the aim of the "DB-API" is NOT to replace direct DB
>>> calls (hence we need to figure out a better name). The API is there to
>>> provide a "scheduler logic" API and "REST access to Airflow primitives
>>> like dags/tasks/variables/connections" etc.
>>>
>>> As an example (which we briefly talked about in Slack), the
>>> "_run_mini_scheduler_on_child_tasks" case
>>> (https://github.com/apache/airflow/blob/main/airflow/jobs/local_task_job.py#L225-L274)
>>> is an example (that we would put in the doc). As we thought of it,
>>> this is a "single DB-API operation". Those are not pure REST calls of
>>> course, they are more RPC-like calls. That is why even initially I
>>> thought of separating the API completely. But since there are a lot of
>>> common "primitive" calls that we can re-use, I think having a separate
>>> DB-API component which will re-use the Connexion implementation,
>>> replacing authentication with the custom worker <> DB-API
>>> authentication, is the way to go. And yes, if we agree on the general
>>> idea, we need to choose how to best "connect" the REST API we have
>>> with the RPC-kind of API we need for some cases in workers. But we
>>> wanted to make sure we are on the same page with the direction.
>>>
>>> And yes, it means that the DB-API will potentially have to handle
>>> quite a number of DB operations (and that it has to be replicable and
>>> scalable as well) - but the DB-API will be "stateless", similarly to
>>> the webserver, so it will be scalable by definition. And yes,
>>> performance tests will be part of the POC - likely even before we
>>> finally ask for votes.
>>>
>>> So in short:
>>> * no modification or impact on current scheduler behaviour when DB
>>> isolation is disabled
>>> * only higher-level methods will be moved out to the DB-API, and we
>>> will reuse existing "REST" APIs where it makes sense
>>> * we aim to have "0" changes to the logic of processing - both in the
>>> DAG processing logic and the DB-API. We think that with the
>>> architecture we proposed it's perfectly doable
>>>
>>> I hope this clarifies things a bit. Once we agree on the general
>>> direction, we will definitely work on adding more details and
>>> clarification (we actually already have a lot of that, but we wanted
>>> to start by explaining the idea and go into more detail later, when we
>>> are sure there are no "high-level" blockers from the community).
>>>
>>> J,
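For illustration, the RPC-like "single DB-API operation" Jarek describes might look roughly like the sketch below. Every name in it (DbApiClient, the /rpc/... endpoint, the token field) is hypothetical - AIP-44 does not define this surface yet; the point is only that the worker makes one authenticated call and the whole transaction runs inside the DB-API process:

    # Hedged sketch only - class, endpoint and field names are assumptions.
    from dataclasses import dataclass

    import requests


    @dataclass
    class DbApiClient:
        base_url: str
        token: str  # worker <-> DB-API authentication credential

        def _post(self, operation: str, payload: dict) -> dict:
            # One HTTP round trip per logical operation, not per SQL query.
            resp = requests.post(
                f"{self.base_url}/rpc/{operation}",
                json=payload,
                headers={"Authorization": f"Bearer {self.token}"},
            )
            resp.raise_for_status()
            return resp.json()

        def run_mini_scheduler_on_child_tasks(
            self, dag_id: str, run_id: str, task_id: str
        ) -> dict:
            # The mini-scheduler logic (and its DB transaction) executes
            # inside the DB-API process; the worker only sees the result.
            return self._post(
                "run_mini_scheduler_on_child_tasks",
                {"dag_id": dag_id, "run_id": run_id, "task_id": task_id},
            )

Under this shape the DB-API stays stateless, as described above: each call carries everything the server needs, so replicas can be added freely.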
>>>
>>> On Thu, Dec 2, 2021 at 4:46 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>>> >
>>> > I just provided a general idea for the approach - but if you want me
>>> > to put more examples then I am happy to do that
>>> >
>>> > Yes please.
>>> >
>>> > It is too general for me and I can't work out what effect it would
>>> > actually have on the code base, especially how it would look with the
>>> > config option to enable/disable direct DB access.
>>> >
>>> > -ash
>>> >
>>> > On Thu, Dec 2 2021 at 16:36:57 +0100, Mateusz Henc
>>> > <mh...@google.com.INVALID> wrote:
>>> >
>>> > Hi,
>>> > I am sorry if it is not clear enough, let me try to explain it here,
>>> > so maybe it sheds more light on the idea.
>>> > See my comments below.
>>> >
>>> > On Thu, Dec 2, 2021 at 3:39 PM Ash Berlin-Taylor <a...@apache.org>
>>> > wrote:
>>> >>
>>> >> I'm sorry to say it, but this proposal just doesn't contain enough
>>> >> detail to say what the actual changes to the code would be, and what
>>> >> the impact would be.
>>> >>
>>> >> To take the one example you have so far:
>>> >>
>>> >> def get_dag_run(self, dag_id, execution_date):
>>> >>     return self.db_client.get_dag_run(dag_id, execution_date)
>>> >>
>>> >> So from this snippet I'm guessing it would be used like this:
>>> >>
>>> >> dag_run = db_client.get_dag_run(dag_id, execution_date)
>>> >>
>>> >> What type of object is returned?
>>> >
>>> > As it replaces:
>>> >
>>> > dag_run = (
>>> >     session.query(DagRun)
>>> >     .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
>>> >     .first()
>>> > )
>>> >
>>> > then the type of the object will be exactly the same (DagRun).
>>> >
>>> >> Do we need one API method per individual query we have in the source?
>>> >
>>> > No, as explained by the sentence:
>>> >
>>> > The method may be extended, accepting more optional parameters to
>>> > avoid having too many similar implementations.
>>> >
>>> >> Which components would use this new mode when it's enabled?
>>> >
>>> > You may read:
>>> > The Airflow Database API is a new independent component of Airflow. It
>>> > allows isolating some components (Worker, DagProcessor and Triggerer)
>>> > from direct access to the DB.
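Returning "exactly the same (DagRun)" implies a deserialisation step on the client side - the "more transformation code" Andrew asked about. Purely as a sketch, assuming a JSON wire format and a /dagRuns endpoint (neither is specified by AIP-44; only DagRun and timezone.parse are real Airflow pieces), it could look like:

    # Hedged sketch only - wire format and endpoint are assumptions.
    import requests

    from airflow.models.dagrun import DagRun
    from airflow.utils import timezone


    class DbClient:
        def __init__(self, base_url: str):
            self.base_url = base_url

        def get_dag_run(self, dag_id: str, execution_date: str) -> DagRun:
            resp = requests.get(
                f"{self.base_url}/dagRuns",
                params={"dag_id": dag_id, "execution_date": execution_date},
            )
            resp.raise_for_status()
            data = resp.json()
            # Re-hydrate a detached DagRun so callers get the same type
            # they would have received from session.query(DagRun)...first().
            return DagRun(
                dag_id=data["dag_id"],
                run_id=data["run_id"],
                execution_date=timezone.parse(data["execution_date"]),
            )

Each such ORM round trip (serialise, transfer, re-hydrate) adds per-row overhead, which is why the performance tests mentioned above matter for operations touching many rows.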
>>> >> But what you haven't said the first thing about is what _other_
>>> >> changes would be needed in the code. To take a fairly simple example:
>>> >>
>>> >> dag_run = db_client.get_dag_run(dag_id, execution_date)
>>> >> dag_run.queued_at = timezone.now()
>>> >> # How do I save this?
>>> >>
>>> >> In short, you need to put a lot more detail into this before we can
>>> >> even have an idea of the full scope of the change this proposal would
>>> >> involve, and what code changes would be needed for components to work
>>> >> with and without this setting enabled.
>>> >
>>> > For this particular example - it depends on the intention of the code
>>> > author:
>>> > - if this should be in a transaction, then I would actually introduce
>>> > a new method like enqueue_dag_run(...) that would run these two steps
>>> > on the Airflow DB API side
>>> > - if not, then maybe just an "update_dag_run" method accepting the
>>> > whole "dag_run" object and saving it to the DB.
>>> >
>>> > In general, we could take a naive approach, e.g. replace the code:
>>> >
>>> > dag_run = (
>>> >     session.query(DagRun)
>>> >     .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
>>> >     .first()
>>> > )
>>> >
>>> > with:
>>> >
>>> > if self.db_isolation:
>>> >     dag_run = self.db_client.get_dag_run(dag_id, execution_date)
>>> > else:
>>> >     dag_run = (
>>> >         session.query(DagRun)
>>> >         .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
>>> >         .first()
>>> >     )
>>> >
>>> > The problem is that the Airflow DB API would need to have the same
>>> > implementation for the query - so duplicated code. That's why we
>>> > propose moving this code to the DBClient, which is also used by the
>>> > Airflow DB API (in DB-direct mode).
>>> >
>>> > I know there are many places where the code is much more complicated
>>> > than a single query, but they must be handled one by one during the
>>> > implementation, otherwise this AIP would be way too big.
>>> >
>>> > I just provided a general idea for the approach - but if you want me
>>> > to put more examples then I am happy to do that.
>>> >
>>> > Best regards,
>>> > Mateusz Henc
>>> >
>>> >> On Thu, Dec 2 2021 at 14:23:56 +0100, Mateusz Henc
>>> >> <mh...@google.com.INVALID> wrote:
>>> >>
>>> >> Hi,
>>> >> I just added a new AIP for running some Airflow components in
>>> >> DB-isolation mode, without direct access to the Airflow database -
>>> >> instead they will use a new API for this purpose.
>>> >>
>>> >> PTAL:
>>> >> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API
>>> >>
>>> >> Open question:
>>> >> I called it "Airflow Database API" - however I feel it could be more
>>> >> than just an access layer for the database. So if you have a better
>>> >> name, please let me know, I am happy to change it.
>>> >>
>>> >> Best regards,
>>> >> Mateusz Henc
>>> >>
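To make the de-duplication idea concrete: a minimal sketch of the shared client Mateusz mentions, assuming the class and method names (DirectDbClient, enqueue_dag_run) which are not part of the AIP - only the query, provide_session and timezone helpers are real Airflow pieces:

    # Hedged sketch only - class and method names are assumptions.
    from airflow.models.dagrun import DagRun
    from airflow.utils import timezone
    from airflow.utils.session import provide_session


    class DirectDbClient:
        """Runs the queries in-process. Used directly when DB isolation is
        disabled, and inside the Airflow DB API server when it is enabled,
        so the SQL lives in exactly one place."""

        @provide_session
        def get_dag_run(self, dag_id, execution_date, session=None):
            return (
                session.query(DagRun)
                .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
                .first()
            )

        @provide_session
        def enqueue_dag_run(self, dag_id, execution_date, session=None):
            # Ash's two-step example as one transaction: the fetch and the
            # update commit together when the session closes.
            dag_run = self.get_dag_run(dag_id, execution_date, session=session)
            dag_run.queued_at = timezone.utcnow()
            session.merge(dag_run)

A hypothetical worker-side RemoteDbClient exposing the same two method signatures over HTTP would then reduce the db_isolation branch to a pure transport switch, with no duplicated query logic.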