The scheduler was excluded from the components that would use the dbapi - the 
mini scheduler is the odd one out here is it (currently) runs on the work but 
shares much of the code from the scheduling path.

-a

On 2 December 2021 17:56:40 GMT, Andrew Godwin 
<andrew.god...@astronomer.io.INVALID> wrote:
>I would also like to see some discussion in this AIP about how the data is
>going to be serialised to and from the database instances (obviously
>Connexion is involved, but I presume more transformation code is needed
>than that) and the potential slowdown this would cause.
>
>In my experience, a somewhat direct ORM mapping like this is going to
>result in considerably slower times for any complex operation that's
>touching a few hundred rows.
>
>Is there a reason this is being proposed for the scheduler code, too? In my
>mind, the best approach to multitenancy would be to remove all
>user-supplied code from the scheduler and leave it with direct DB access,
>rather than trying to indirect all scheduler access through another API
>layer.
>
>Andrew
>
>On Thu, Dec 2, 2021 at 10:29 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>
>> Yeah - I thik Ash you are completely right we need some more
>> "detailed" clarification.
>>
>> I believe, I know what you are - rightfully - afraid of (re impact on
>> the code), and maybe we have not done a good job on explaining it with
>> some of our assumptions we had when we worked on it with Mateusz.
>>
>> Simply it was not clear that our aim is to absolutely minimise the
>> impact on the "internal DB transactions" done in schedulers and
>> workers. The idea is that change will at most result in moving an
>> execution of the transactions to another process but not changing what
>> the DB transactions do internally. Actually this was one of the reason
>> for the "alternative" approach (you can see it in the document) we
>> discussed about - hijack "sqlalchemy session" - this is far too low
>> level and the aim of the "DB-API" is NOT to replace direct DB calls
>> (Hence we need to figure out a better name). The API is there to
>> provide "scheduler logic" API and "REST access to Airflow primitives
>> like dags/tasks/variables/connections" etc..
>>
>> As an example (which we briefly talked about in slack) the
>> "_run_mini_scheduler_on_child_tasks" case
>> (
>> https://github.com/apache/airflow/blob/main/airflow/jobs/local_task_job.py#L225-L274
>> )
>> is an example (that we would put in the doc). As we thought of it -
>> this is a "single DB-API operation". Those are not Pure REST calls of
>> course, they are more RPC-like calls. That is why even initially I
>> thought of separating the API completely. But since there are a lot of
>> common "primitive" calls that we can re-use, I think having a separate
>> DB-API component which will re-use connexion implementation, replacing
>> authentication with the custom worker <> DB-API authentication is the
>> way to go. And yes if we agree on the general idea, we need to choose
>> the best way on how to best "connect" the REST API we have with the
>> RPC-kind of API we need for some cases in workers. But we wanted to
>> make sure we are on the same page with the direction. And yes it means
>> that DB-API will potentially have to handle quite a number of DB
>> operations (and that it has to be replicable and scalable as well) -
>> but DB-API will be "stateless" similarly as the webserver is, so it
>> will be scalable by definition. And yest performance tests will be
>> part of POC - likely even before we finally ask for votes there.
>>
>> So in short:
>> * no modification or impact on current scheduler behaviour when DB
>> Isolation is disabled
>> * only higher level methods will be moved out to  DB-API and we will
>> reuse existing "REST" APIS where it makes sense
>> * we aim to have "0" changes to the logic of processing - both in Dag
>> Processing logic and DB API. We think with this architecture we
>> proposed it's perfectly doable
>>
>> I hope this clarifies a bit, and once we agree on general direction,
>> we will definitely work on adding more details and clarification (we
>> actually already have a lot of that but we just wanted to start with
>> explaining the idea and going into more details later when we are sure
>> there are no "high-level" blockers from the community.
>>
>> J,
>>
>> On Thu, Dec 2, 2021 at 4:46 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>> >
>> > I just provided a general idea for the approach - but if you want me to
>> put more examples then I am happy to do that
>> >
>> >
>> > Yes please.
>> >
>> > It is too general for me and I can't work out what effect it would
>> actually have on the code base, especially how it would look with the
>> config option to enable/disable direct db access.
>> >
>> > -ash
>> >
>> > On Thu, Dec 2 2021 at 16:36:57 +0100, Mateusz Henc
>> <mh...@google.com.INVALID> wrote:
>> >
>> > Hi,
>> > I am sorry if it is not clear enough, let me try to explain it here, so
>> maybe it gives more light on the idea.
>> > See my comments below
>> >
>> > On Thu, Dec 2, 2021 at 3:39 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>> >>
>> >> I'm sorry to say it, but this proposal right just doesn't contain
>> enough detail to say what the actual changes to the code would be, and what
>> the impact would be
>> >>
>> >> To take the one example you have so far:
>> >>
>> >>
>> >>   def get_dag_run(self, dag_id, execution_date):
>> >>     return self.db_client.get_dag_run(dag_id,execution_date)
>> >>
>> >> So form this snippet I'm guessing it would be used like this:
>> >>
>> >>     dag_run = db_client.get_dag_run(dag_id, execution_date)
>> >>
>> >> What type of object is returned?
>> >
>> >
>> > As it replaces:
>> > dag_run = session.query(DagRun)
>> >   .filter(DagRun.dag_id == dag_id, DagRun.execution_date ==
>> execution_date)
>> >   .first()
>> >
>> > then the type of the object will be exactly the same (DagRun) .
>> >
>> >>
>> >>
>> >> Do we need one API method per individual query we have in the source?
>> >
>> >
>> > No, as explained by the sentence:
>> >
>> > The method may be extended, accepting more optional parameters to avoid
>> having too many similar implementations.
>> >
>> >
>> >>
>> >>
>> >> Which components would use this new mode when it's enabled?
>> >
>> >
>> > You may read:
>> > Airflow Database APi is a new independent component of Airflow. It
>> allows isolating some components (Worker, DagProcessor and Triggerer) from
>> direct access to DB.
>> >
>> >>
>> >> But what you haven't said the first thing about is what _other_ changes
>> would be needed in the code. To take a fairly simple example:
>> >>
>> >>     dag_run = db_client.get_dag_run(dag_id, execution_date)
>> >>     dag_run.queued_at = timezone.now()
>> >>     # How do I save this?
>> >>
>> >> In short, you need to put a lot more detail into this before we can
>> even have an idea of the full scope of the change this proposal would
>> involve, and what code changes would be needed for compnents to work with
>> and without this setting enabled.
>> >
>> >
>> > For this particular example - it depends on the intention of the code
>> author
>> > - If this should be in transaction - then I would actually introduce new
>> method like enqueue_dag_run(...) that would run these two steps on Airflow
>> DB API side
>> > - if not then, maybe just the "update_dag_run" method accepting the
>> whole "dag_run" object and saving it to the DB.
>> >
>> > In general - we could take naive approach, eg replace code:
>> > dag_run = session.query(DagRun)
>> >   .filter(DagRun.dag_id == dag_id, DagRun.execution_date ==
>> execution_date)
>> >   .first()
>> > with:
>> > if self.db_isolation:
>> >   dag_run = session.query(DagRun)
>> >     .filter(DagRun.dag_id == dag_id, DagRun.execution_date ==
>> execution_date)
>> >     .first()
>> > else:
>> >   dag_run = db_client.get_dag_run(self, dag_id, execution_date)
>> >
>> > The problem is that Airflow DB API would need to have the same
>> implementation for the query  - so duplicated code. That's why we propose
>> moving this code to the DBClient which is also used by the Airflow DB
>> API(in DB direct mode).
>> >
>> > I know there are many places where the code is much more complicated
>> than a single query, but they must be handled one-by-one, during the
>> implementation, otherwise this AIP would be way too big.
>> >
>> > I just provided a general idea for the approach - but if you want me to
>> put more examples then I am happy to do that
>> >
>> > Best regards,
>> > Mateusz Henc
>> >
>> >>
>> >> On Thu, Dec 2 2021 at 14:23:56 +0100, Mateusz Henc
>> <mh...@google.com.INVALID> wrote:
>> >>
>> >> Hi,
>> >> I just added a new AIP for running some Airflow components in
>> DB-isolation mode, without direct access to the Airflow Database, but they
>> will use a new API for thi purpose.
>> >>
>> >> PTAL:
>> >>
>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API
>> >>
>> >> Open question:
>> >> I called it "Airflow Database API" - however I feel it could be more
>> than just an access layer for the database. So if you have a better name,
>> please let me know, I am happy to change it.
>> >>
>> >> Best regards,
>> >> Mateusz Henc
>>

Reply via email to