The scheduler was excluded from the components that would use the DB-API. The mini scheduler is the odd one out here, as it (currently) runs on the worker but shares much of its code with the scheduling path.
-a

On 2 December 2021 17:56:40 GMT, Andrew Godwin <andrew.god...@astronomer.io.INVALID> wrote:
>I would also like to see some discussion in this AIP about how the data is going to be serialised to and from the database instances (obviously Connexion is involved, but I presume more transformation code is needed than that) and the potential slowdown this would cause.
>
>In my experience, a somewhat direct ORM mapping like this is going to result in considerably slower times for any complex operation that's touching a few hundred rows.
>
>Is there a reason this is being proposed for the scheduler code, too? In my mind, the best approach to multitenancy would be to remove all user-supplied code from the scheduler and leave it with direct DB access, rather than trying to indirect all scheduler access through another API layer.
>
>Andrew
>
>On Thu, Dec 2, 2021 at 10:29 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>
>> Yeah, I think Ash, you are completely right: we need some more "detailed" clarification.
>>
>> I believe I know what you are (rightfully) afraid of regarding the impact on the code, and maybe we have not done a good job of explaining it, given some of the assumptions we had when we worked on it with Mateusz.
>>
>> Simply put, it was not clear that our aim is to absolutely minimise the impact on the "internal DB transactions" done in schedulers and workers. The idea is that the change will at most result in moving the execution of those transactions to another process, without changing what the DB transactions do internally. Actually, this was one of the reasons we discussed the "alternative" approach (you can see it in the document) of hijacking the "sqlalchemy session". That is far too low level: the aim of the "DB-API" is NOT to replace direct DB calls (hence we need to figure out a better name). The API is there to provide a "scheduler logic" API and "REST access to Airflow primitives like dags/tasks/variables/connections", etc.
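The distinction Jarek draws between replacing the SQLAlchemy session and exposing higher-level "scheduler logic" calls can be sketched as an RPC-style endpoint. The worker sends one JSON request naming an operation, and the API process runs all of that operation's queries inside a single database transaction, so only the parameters and the final result are serialised. This is a minimal sketch under stated assumptions, not the AIP's design: `OPERATIONS`, `rpc_operation`, `schedule_tasks` and `handle_rpc` are hypothetical names, the `task_instance` table is heavily simplified, and an in-memory sqlite3 database stands in for the Airflow metadata database.

```python
import json
import sqlite3

# Hypothetical registry of coarse-grained "scheduler logic" operations.
OPERATIONS = {}


def rpc_operation(func):
    """Register a function so the API can invoke it by name."""
    OPERATIONS[func.__name__] = func
    return func


@rpc_operation
def schedule_tasks(conn, dag_id, run_id, task_ids):
    """Hypothetical operation: mark the given tasks of one run as 'scheduled'
    if they have no state yet. Several statements, one transaction (see handle_rpc)."""
    updated = 0
    for task_id in task_ids:
        cur = conn.execute(
            "UPDATE task_instance SET state = 'scheduled' "
            "WHERE dag_id = ? AND run_id = ? AND task_id = ? AND state IS NULL",
            (dag_id, run_id, task_id),
        )
        updated += cur.rowcount
    return {"scheduled": updated}


def handle_rpc(conn, payload):
    """API side: decode a JSON request, run the operation atomically, encode the result."""
    request = json.loads(payload)
    operation = OPERATIONS[request["method"]]
    with conn:  # sqlite3: commit on success, roll back if the operation raises
        result = operation(conn, **request["params"])
    return json.dumps(result)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance (dag_id TEXT, run_id TEXT, task_id TEXT, state TEXT)"
    )
    conn.executemany(
        "INSERT INTO task_instance VALUES ('etl', 'run_1', ?, NULL)",
        [("extract",), ("transform",), ("load",)],
    )
    conn.commit()
    # Worker side: one request for the whole operation instead of one per row.
    request = json.dumps(
        {"method": "schedule_tasks",
         "params": {"dag_id": "etl", "run_id": "run_1", "task_ids": ["transform", "load"]}}
    )
    print(handle_rpc(conn, request))  # prints {"scheduled": 2}
```

Only the task ids go in and a single count comes back; no row objects cross the boundary. That is one way to limit the serialisation cost Andrew raises, though whether it is enough would still have to be confirmed by the performance tests Jarek mentions.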
>>
>> As an example (which we briefly talked about in Slack), the "_run_mini_scheduler_on_child_tasks" case
>> (https://github.com/apache/airflow/blob/main/airflow/jobs/local_task_job.py#L225-L274)
>> is one we would put in the doc. As we thought of it, this is a "single DB-API operation". Those are not pure REST calls, of course; they are more RPC-like calls. That is why I initially even thought of separating the API completely. But since there are a lot of common "primitive" calls that we can re-use, I think having a separate DB-API component which re-uses the Connexion implementation, replacing authentication with custom worker <> DB-API authentication, is the way to go. And yes, if we agree on the general idea, we need to choose how best to "connect" the REST API we have with the RPC-kind of API we need for some cases in workers. But we wanted to make sure we are on the same page with the direction. And yes, it means that the DB-API will potentially have to handle quite a number of DB operations (and that it has to be replicable and scalable as well), but the DB-API will be "stateless" just as the webserver is, so it will be scalable by definition. And yes, performance tests will be part of the POC, likely even before we finally ask for votes there.
>>
>> So in short:
>> * no modification or impact on current scheduler behaviour when DB Isolation is disabled
>> * only higher-level methods will be moved out to the DB-API, and we will reuse existing "REST" APIs where it makes sense
>> * we aim to have "0" changes to the logic of processing - both in Dag Processing logic and DB API. We think that with the architecture we proposed, it's perfectly doable.
>>
>> I hope this clarifies things a bit. Once we agree on the general direction, we will definitely work on adding more details and clarification (we actually already have a lot of that, but we wanted to start by explaining the idea and go into more detail later, once we are sure there are no "high-level" blockers from the community).
>>
>> J,
>>
>> On Thu, Dec 2, 2021 at 4:46 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>> >
>> > I just provided a general idea for the approach - but if you want me to put more examples then I am happy to do that
>> >
>> >
>> > Yes please.
>> >
>> > It is too general for me and I can't work out what effect it would actually have on the code base, especially how it would look with the config option to enable/disable direct db access.
>> >
>> > -ash
>> >
>> > On Thu, Dec 2 2021 at 16:36:57 +0100, Mateusz Henc <mh...@google.com.INVALID> wrote:
>> >
>> > Hi,
>> > I am sorry if it is not clear enough; let me try to explain it here, so maybe it sheds more light on the idea.
>> > See my comments below.
>> >
>> > On Thu, Dec 2, 2021 at 3:39 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>> >>
>> >> I'm sorry to say it, but this proposal right now just doesn't contain enough detail to say what the actual changes to the code would be, and what the impact would be.
>> >>
>> >> To take the one example you have so far:
>> >>
>> >>     def get_dag_run(self, dag_id, execution_date):
>> >>         return self.db_client.get_dag_run(dag_id, execution_date)
>> >>
>> >> So from this snippet I'm guessing it would be used like this:
>> >>
>> >>     dag_run = db_client.get_dag_run(dag_id, execution_date)
>> >>
>> >> What type of object is returned?
>> >
>> > As it replaces:
>> >     dag_run = (
>> >         session.query(DagRun)
>> >         .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
>> >         .first()
>> >     )
>> >
>> > then the type of the object will be exactly the same (DagRun).
>> >
>> >>
>> >> Do we need one API method per individual query we have in the source?
>> >
>> > No, as explained by the sentence:
>> >
>> > The method may be extended, accepting more optional parameters to avoid having too many similar implementations.
>> >
>> >>
>> >> Which components would use this new mode when it's enabled?
>> >
>> > You may read:
>> > Airflow Database API is a new independent component of Airflow. It allows isolating some components (Worker, DagProcessor and Triggerer) from direct access to the DB.
>> >
>> >>
>> >> But what you haven't said the first thing about is what _other_ changes would be needed in the code. To take a fairly simple example:
>> >>
>> >>     dag_run = db_client.get_dag_run(dag_id, execution_date)
>> >>     dag_run.queued_at = timezone.now()
>> >>     # How do I save this?
>> >>
>> >> In short, you need to put a lot more detail into this before we can even have an idea of the full scope of the change this proposal would involve, and what code changes would be needed for components to work with and without this setting enabled.
>> >
>> > For this particular example, it depends on the intention of the code author:
>> > - If this should be in a transaction, then I would actually introduce a new method like enqueue_dag_run(...) that would run these two steps on the Airflow DB API side.
>> > - If not, then maybe just an "update_dag_run" method accepting the whole "dag_run" object and saving it to the DB.
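The two options Mateusz describes for Ash's `queued_at` example can be contrasted in a short sketch. `enqueue_dag_run` and `update_dag_run` are the names used in the thread, but their signatures and bodies here are assumptions. `DagRunRecord` is a hypothetical plain dataclass standing in for the `DagRun` model, and an in-memory sqlite3 table with a few columns stands in for the real `dag_run` table.

```python
import sqlite3
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class DagRunRecord:
    """Hypothetical plain-data stand-in for the DagRun row sent over the API."""
    dag_id: str
    execution_date: str
    queued_at: Optional[str] = None


def get_dag_run(conn, dag_id, execution_date):
    row = conn.execute(
        "SELECT dag_id, execution_date, queued_at FROM dag_run "
        "WHERE dag_id = ? AND execution_date = ?",
        (dag_id, execution_date),
    ).fetchone()
    return DagRunRecord(*row) if row else None


def enqueue_dag_run(conn, dag_id, execution_date, now=None):
    """Option 1: set queued_at and read the row back, both on the API side in one transaction."""
    queued_at = (now or datetime.now(timezone.utc)).isoformat()
    with conn:  # the UPDATE opens the transaction; the read-back runs before the commit
        conn.execute(
            "UPDATE dag_run SET queued_at = ? WHERE dag_id = ? AND execution_date = ?",
            (queued_at, dag_id, execution_date),
        )
        return get_dag_run(conn, dag_id, execution_date)


def update_dag_run(conn, run):
    """Option 2: the client edited its own copy and sends the whole object back."""
    with conn:
        conn.execute(
            "UPDATE dag_run SET queued_at = ? WHERE dag_id = ? AND execution_date = ?",
            (run.queued_at, run.dag_id, run.execution_date),
        )


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, queued_at TEXT)")
    conn.execute("INSERT INTO dag_run VALUES ('etl', '2021-12-02', NULL)")
    conn.commit()

    # Option 1: a single call, atomic on the API side.
    print(enqueue_dag_run(conn, "etl", "2021-12-02"))

    # Option 2: Ash's snippet, with an explicit save at the end.
    dag_run = get_dag_run(conn, "etl", "2021-12-02")
    dag_run.queued_at = datetime.now(timezone.utc).isoformat()
    update_dag_run(conn, dag_run)
```

The first option keeps the update and the read-back in one transaction next to the database. The second is simpler, but the client's copy may be stale by the time it is written back, so it suits updates that do not need to be atomic with the earlier read.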
>> >
>> > In general, we could take a naive approach, e.g. replace the code:
>> >     dag_run = (
>> >         session.query(DagRun)
>> >         .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
>> >         .first()
>> >     )
>> > with:
>> >     if not self.db_isolation:
>> >         dag_run = (
>> >             session.query(DagRun)
>> >             .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
>> >             .first()
>> >         )
>> >     else:
>> >         dag_run = db_client.get_dag_run(dag_id, execution_date)
>> >
>> > The problem is that the Airflow DB API would need to have the same implementation of the query, so the code would be duplicated. That's why we propose moving this code to the DBClient, which is also used by the Airflow DB API (in DB direct mode).
>> >
>> > I know there are many places where the code is much more complicated than a single query, but they must be handled one by one during the implementation; otherwise this AIP would be way too big.
>> >
>> > I just provided a general idea for the approach - but if you want me to put more examples then I am happy to do that
>> >
>> > Best regards,
>> > Mateusz Henc
>> >
>> >>
>> >> On Thu, Dec 2 2021 at 14:23:56 +0100, Mateusz Henc <mh...@google.com.INVALID> wrote:
>> >>
>> >> Hi,
>> >> I just added a new AIP for running some Airflow components in DB-isolation mode, without direct access to the Airflow Database; instead they will use a new API for this purpose.
>> >>
>> >> PTAL:
>> >> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API
>> >>
>> >> Open question:
>> >> I called it "Airflow Database API", however I feel it could be more than just an access layer for the database. So if you have a better name, please let me know; I am happy to change it.
>> >>
>> >> Best regards,
>> >> Mateusz Henc
>>
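The DBClient idea could look roughly like the sketch below: the query lives in one place and is used both by components in direct mode and by the API server, so it is not duplicated. The class names, the JSON request shape and the `send` callable (standing in for an HTTP call to the API) are hypothetical. Results are plain dicts rather than `DagRun` objects, and sqlite3 again stands in for the metadata database.

```python
import json
import sqlite3
from typing import Callable, Optional, Protocol


class DbClient(Protocol):
    def get_dag_run(self, dag_id: str, execution_date: str) -> Optional[dict]: ...


class DirectDbClient:
    """Holds the query once; used by components in direct mode and by the API server."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn

    def get_dag_run(self, dag_id, execution_date):
        row = self.conn.execute(
            "SELECT dag_id, execution_date, queued_at FROM dag_run "
            "WHERE dag_id = ? AND execution_date = ?",
            (dag_id, execution_date),
        ).fetchone()
        if row is None:
            return None
        return {"dag_id": row[0], "execution_date": row[1], "queued_at": row[2]}


class ApiDbClient:
    """Same interface, but each call is forwarded to the API as a JSON request."""

    def __init__(self, send: Callable[[str], str]):
        self.send = send  # hypothetical transport, e.g. an HTTP POST in a real deployment

    def get_dag_run(self, dag_id, execution_date):
        request = {"method": "get_dag_run",
                   "params": {"dag_id": dag_id, "execution_date": execution_date}}
        return json.loads(self.send(json.dumps(request)))


ALLOWED_METHODS = {"get_dag_run"}


def make_api_server(direct: DirectDbClient) -> Callable[[str], str]:
    """API side: decode the request and delegate to the very same DirectDbClient."""

    def handle(payload: str) -> str:
        request = json.loads(payload)
        if request["method"] not in ALLOWED_METHODS:
            raise ValueError(f"unknown method {request['method']!r}")
        method = getattr(direct, request["method"])
        return json.dumps(method(**request["params"]))

    return handle


def queued_at_of(client: DbClient, dag_id: str, execution_date: str) -> Optional[str]:
    """Component code: identical whichever client it was given."""
    dag_run = client.get_dag_run(dag_id, execution_date)
    return dag_run["queued_at"] if dag_run else None


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, queued_at TEXT)")
    conn.execute(
        "INSERT INTO dag_run VALUES ('etl', '2021-12-02', '2021-12-02T17:00:00+00:00')"
    )
    conn.commit()
    direct = DirectDbClient(conn)
    # In this single-process demo the "server" runs in-process; in a real deployment
    # the isolated component would only hold the transport, never the connection.
    for client in (direct, ApiDbClient(make_api_server(direct))):
        print(type(client).__name__, queued_at_of(client, "etl", "2021-12-02"))
```

Calling code only sees the `get_dag_run` interface, so the `db_isolation` branch collapses into the choice of client made at start-up. Returning real `DagRun` objects in API mode would need an extra conversion step on the client, which is exactly the serialisation cost Andrew asks the AIP to discuss.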