Ah, my bad, I missed that. I'd still like to see discussion of the
performance impacts, though.

On Thu, Dec 2, 2021 at 11:14 AM Ash Berlin-Taylor <a...@apache.org> wrote:

> The scheduler was excluded from the components that would use the dbapi -
> the mini scheduler is the odd one out here is it (currently) runs on the
> work but shares much of the code from the scheduling path.
>
> -a
>
> On 2 December 2021 17:56:40 GMT, Andrew Godwin
> <andrew.god...@astronomer.io.INVALID> wrote:
>>
>> I would also like to see some discussion in this AIP about how the data
>> is going to be serialised to and from the database instances (obviously
>> Connexion is involved, but I presume more transformation code is needed
>> than that) and the potential slowdown this would cause.
>>
>> In my experience, a somewhat direct ORM mapping like this is going to
>> result in considerably slower times for any complex operation that's
>> touching a few hundred rows.
>>
>> Is there a reason this is being proposed for the scheduler code, too? In
>> my mind, the best approach to multitenancy would be to remove all
>> user-supplied code from the scheduler and leave it with direct DB access,
>> rather than trying to indirect all scheduler access through another API
>> layer.
>>
>> Andrew
>>
>> On Thu, Dec 2, 2021 at 10:29 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>>
>>> Yeah - I thik Ash you are completely right we need some more
>>> "detailed" clarification.
>>>
>>> I believe, I know what you are - rightfully - afraid of (re impact on
>>> the code), and maybe we have not done a good job on explaining it with
>>> some of our assumptions we had when we worked on it with Mateusz.
>>>
>>> Simply it was not clear that our aim is to absolutely minimise the
>>> impact on the "internal DB transactions" done in schedulers and
>>> workers. The idea is that change will at most result in moving an
>>> execution of the transactions to another process but not changing what
>>> the DB transactions do internally. Actually this was one of the reason
>>> for the "alternative" approach (you can see it in the document) we
>>> discussed about - hijack "sqlalchemy session" - this is far too low
>>> level and the aim of the "DB-API" is NOT to replace direct DB calls
>>> (Hence we need to figure out a better name). The API is there to
>>> provide "scheduler logic" API and "REST access to Airflow primitives
>>> like dags/tasks/variables/connections" etc..
>>>
>>> As an example (which we briefly talked about in slack) the
>>> "_run_mini_scheduler_on_child_tasks" case
>>> (
>>> https://github.com/apache/airflow/blob/main/airflow/jobs/local_task_job.py#L225-L274
>>> )
>>> is an example (that we would put in the doc). As we thought of it -
>>> this is a "single DB-API operation". Those are not Pure REST calls of
>>> course, they are more RPC-like calls. That is why even initially I
>>> thought of separating the API completely. But since there are a lot of
>>> common "primitive" calls that we can re-use, I think having a separate
>>> DB-API component which will re-use connexion implementation, replacing
>>> authentication with the custom worker <> DB-API authentication is the
>>> way to go. And yes if we agree on the general idea, we need to choose
>>> the best way on how to best "connect" the REST API we have with the
>>> RPC-kind of API we need for some cases in workers. But we wanted to
>>> make sure we are on the same page with the direction. And yes it means
>>> that DB-API will potentially have to handle quite a number of DB
>>> operations (and that it has to be replicable and scalable as well) -
>>> but DB-API will be "stateless" similarly as the webserver is, so it
>>> will be scalable by definition. And yest performance tests will be
>>> part of POC - likely even before we finally ask for votes there.
>>>
>>> So in short:
>>> * no modification or impact on current scheduler behaviour when DB
>>> Isolation is disabled
>>> * only higher level methods will be moved out to  DB-API and we will
>>> reuse existing "REST" APIS where it makes sense
>>> * we aim to have "0" changes to the logic of processing - both in Dag
>>> Processing logic and DB API. We think with this architecture we
>>> proposed it's perfectly doable
>>>
>>> I hope this clarifies a bit, and once we agree on general direction,
>>> we will definitely work on adding more details and clarification (we
>>> actually already have a lot of that but we just wanted to start with
>>> explaining the idea and going into more details later when we are sure
>>> there are no "high-level" blockers from the community.
>>>
>>> J,
>>>
>>> On Thu, Dec 2, 2021 at 4:46 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>>> >
>>> > I just provided a general idea for the approach - but if you want me
>>> to put more examples then I am happy to do that
>>> >
>>> >
>>> > Yes please.
>>> >
>>> > It is too general for me and I can't work out what effect it would
>>> actually have on the code base, especially how it would look with the
>>> config option to enable/disable direct db access.
>>> >
>>> > -ash
>>> >
>>> > On Thu, Dec 2 2021 at 16:36:57 +0100, Mateusz Henc
>>> <mh...@google.com.INVALID> wrote:
>>> >
>>> > Hi,
>>> > I am sorry if it is not clear enough, let me try to explain it here,
>>> so maybe it gives more light on the idea.
>>> > See my comments below
>>> >
>>> > On Thu, Dec 2, 2021 at 3:39 PM Ash Berlin-Taylor <a...@apache.org>
>>> wrote:
>>> >>
>>> >> I'm sorry to say it, but this proposal right just doesn't contain
>>> enough detail to say what the actual changes to the code would be, and what
>>> the impact would be
>>> >>
>>> >> To take the one example you have so far:
>>> >>
>>> >>
>>> >>   def get_dag_run(self, dag_id, execution_date):
>>> >>     return self.db_client.get_dag_run(dag_id,execution_date)
>>> >>
>>> >> So form this snippet I'm guessing it would be used like this:
>>> >>
>>> >>     dag_run = db_client.get_dag_run(dag_id, execution_date)
>>> >>
>>> >> What type of object is returned?
>>> >
>>> >
>>> > As it replaces:
>>> > dag_run = session.query(DagRun)
>>> >   .filter(DagRun.dag_id == dag_id, DagRun.execution_date ==
>>> execution_date)
>>> >   .first()
>>> >
>>> > then the type of the object will be exactly the same (DagRun) .
>>> >
>>> >>
>>> >>
>>> >> Do we need one API method per individual query we have in the source?
>>> >
>>> >
>>> > No, as explained by the sentence:
>>> >
>>> > The method may be extended, accepting more optional parameters to
>>> avoid having too many similar implementations.
>>> >
>>> >
>>> >>
>>> >>
>>> >> Which components would use this new mode when it's enabled?
>>> >
>>> >
>>> > You may read:
>>> > Airflow Database APi is a new independent component of Airflow. It
>>> allows isolating some components (Worker, DagProcessor and Triggerer) from
>>> direct access to DB.
>>> >
>>> >>
>>> >> But what you haven't said the first thing about is what _other_
>>> changes would be needed in the code. To take a fairly simple example:
>>> >>
>>> >>     dag_run = db_client.get_dag_run(dag_id, execution_date)
>>> >>     dag_run.queued_at = timezone.now()
>>> >>     # How do I save this?
>>> >>
>>> >> In short, you need to put a lot more detail into this before we can
>>> even have an idea of the full scope of the change this proposal would
>>> involve, and what code changes would be needed for compnents to work with
>>> and without this setting enabled.
>>> >
>>> >
>>> > For this particular example - it depends on the intention of the code
>>> author
>>> > - If this should be in transaction - then I would actually introduce
>>> new method like enqueue_dag_run(...) that would run these two steps on
>>> Airflow DB API side
>>> > - if not then, maybe just the "update_dag_run" method accepting the
>>> whole "dag_run" object and saving it to the DB.
>>> >
>>> > In general - we could take naive approach, eg replace code:
>>> > dag_run = session.query(DagRun)
>>> >   .filter(DagRun.dag_id == dag_id, DagRun.execution_date ==
>>> execution_date)
>>> >   .first()
>>> > with:
>>> > if self.db_isolation:
>>> >   dag_run = session.query(DagRun)
>>> >     .filter(DagRun.dag_id == dag_id, DagRun.execution_date ==
>>> execution_date)
>>> >     .first()
>>> > else:
>>> >   dag_run = db_client.get_dag_run(self, dag_id, execution_date)
>>> >
>>> > The problem is that Airflow DB API would need to have the same
>>> implementation for the query  - so duplicated code. That's why we propose
>>> moving this code to the DBClient which is also used by the Airflow DB
>>> API(in DB direct mode).
>>> >
>>> > I know there are many places where the code is much more complicated
>>> than a single query, but they must be handled one-by-one, during the
>>> implementation, otherwise this AIP would be way too big.
>>> >
>>> > I just provided a general idea for the approach - but if you want me
>>> to put more examples then I am happy to do that
>>> >
>>> > Best regards,
>>> > Mateusz Henc
>>> >
>>> >>
>>> >> On Thu, Dec 2 2021 at 14:23:56 +0100, Mateusz Henc
>>> <mh...@google.com.INVALID> wrote:
>>> >>
>>> >> Hi,
>>> >> I just added a new AIP for running some Airflow components in
>>> DB-isolation mode, without direct access to the Airflow Database, but they
>>> will use a new API for thi purpose.
>>> >>
>>> >> PTAL:
>>> >>
>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API
>>> >>
>>> >> Open question:
>>> >> I called it "Airflow Database API" - however I feel it could be more
>>> than just an access layer for the database. So if you have a better name,
>>> please let me know, I am happy to change it.
>>> >>
>>> >> Best regards,
>>> >> Mateusz Henc
>>>
>>

Reply via email to