Yeah - I think, Ash, you are completely right that we need some more "detailed" clarification.

I believe I know what you are - rightfully - afraid of (re impact on the code), and maybe we have not done a good job of explaining some of the assumptions we had when we worked on it with Mateusz. Simply put: it was not clear that our aim is to absolutely minimise the impact on the "internal DB transactions" done in schedulers and workers. The idea is that the change will at most move execution of the transactions to another process, but not change what the DB transactions do internally. Actually, this was one of the reasons for the "alternative" approach we discussed (you can see it in the document) - hijacking the "sqlalchemy session" - but that one is far too low-level. The aim of the "DB-API" is NOT to replace direct DB calls (hence we need to figure out a better name). The API is there to provide a "scheduler logic" API and "REST access to Airflow primitives like dags/tasks/variables/connections" etc.
As an example (which we briefly talked about on Slack), the "_run_mini_scheduler_on_child_tasks" case (<https://github.com/apache/airflow/blob/main/airflow/jobs/local_task_job.py#L225-L274>) is one that we would put in the doc. As we thought of it, this is a "single DB-API operation". Those are of course not pure REST calls; they are more RPC-like calls. That is why initially I even thought of separating the API completely. But since there are a lot of common "primitive" calls that we can re-use, I think the way to go is a separate DB-API component which will re-use the connexion implementation, replacing authentication with the custom worker <> DB-API authentication.
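To make this concrete, here is a minimal sketch of what such a single RPC-like operation could look like from the worker side (the DBApiClient class, the /rpc/run_mini_scheduler endpoint and the payload fields are hypothetical illustrations, not the final design):

# Hypothetical worker-side client: the whole mini-scheduler
# transaction runs server-side, behind one RPC-like call.
import requests

class DBApiClient:
    def __init__(self, base_url: str, token: str):
        self.base_url = base_url
        self.token = token  # custom worker <> DB-API authentication

    def run_mini_scheduler_on_child_tasks(self, dag_id, run_id, task_id):
        # one logical DB-API operation; the worker never opens a DB
        # session, it only sees the serialized result
        response = requests.post(
            f"{self.base_url}/rpc/run_mini_scheduler",
            json={"dag_id": dag_id, "run_id": run_id, "task_id": task_id},
            headers={"Authorization": f"Bearer {self.token}"},
        )
        response.raise_for_status()
        return response.json()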
And yes, if we agree on the general idea, we need to choose the best way to "connect" the REST API we have with the RPC-kind of API we need for some cases in workers. But first we wanted to make sure we are on the same page with the direction. And yes, it means that the DB-API will potentially have to handle quite a number of DB operations (and that it has to be replicable and scalable as well) - but the DB-API will be "stateless", similarly to the webserver, so it will be scalable by definition. And yes, performance tests will be part of the POC - likely even before we finally ask for votes on it.
So in short:
* no modification or impact on current scheduler behaviour when DB Isolation is disabled
* only higher-level methods will be moved out to the DB-API, and we will reuse existing "REST" APIs where it makes sense
* we aim to have "0" changes to the logic of processing - both in the Dag Processing logic and the DB-API. We think that with the architecture we proposed this is perfectly doable
I hope this clarifies things a bit. Once we agree on the general direction, we will definitely work on adding more details and clarification (we actually already have a lot of that, but we wanted to start with explaining the idea and go into more details later, when we are sure there are no "high-level" blockers from the community).
J,
On Thu, Dec 2, 2021 at 4:46 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>
> I just provided a general idea for the approach - but if you want me to put more examples then I am happy to do that
>
>
> Yes please.
>
> It is too general for me and I can't work out what effect it would actually have on the code base, especially how it would look with the config option to enable/disable direct db access.
>
> -ash
>
> On Thu, Dec 2 2021 at 16:36:57 +0100, Mateusz Henc <mh...@google.com.INVALID> wrote:
>
> Hi,
> I am sorry if it is not clear enough - let me try to explain it here, so maybe it sheds more light on the idea.
> See my comments below
>
> On Thu, Dec 2, 2021 at 3:39 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>>
>> I'm sorry to say it, but this proposal just doesn't contain enough detail to say what the actual changes to the code would be, and what the impact would be.
>>
>> To take the one example you have so far:
>>
>>
>> def get_dag_run(self, dag_id, execution_date):
>>     return self.db_client.get_dag_run(dag_id, execution_date)
>>
>> So from this snippet I'm guessing it would be used like this:
>>
>> dag_run = db_client.get_dag_run(dag_id, execution_date)
>>
>> What type of object is returned?
>
>
> As it replaces:
> dag_run = (
>     session.query(DagRun)
>     .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
>     .first()
> )
>
> then the type of the object will be exactly the same (DagRun).
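> To illustrate (a hypothetical sketch - the endpoint path and the deserialization details are assumptions, not the final design), the client-side get_dag_run could rebuild that object from the API response:
>
> import requests
>
> from airflow.models.dagrun import DagRun
> from airflow.utils import timezone
>
> def get_dag_run(self, dag_id, execution_date):
>     # used only in DB-isolation mode; direct mode keeps running
>     # the session.query above
>     resp = requests.get(
>         f"{self.base_url}/dag_runs/{dag_id}",
>         params={"execution_date": execution_date.isoformat()},
>     )
>     resp.raise_for_status()
>     data = resp.json()
>     # callers get the same type as today, so they don't change
>     return DagRun(
>         dag_id=data["dag_id"],
>         run_id=data["run_id"],
>         execution_date=timezone.parse(data["execution_date"]),
>     )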
>
>>
>>
>> Do we need one API method per individual query we have in the source?
>
>
> No, as explained by the sentence:
>
> The method may be extended, accepting more optional parameters to avoid having too many similar implementations.
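> For example (a hypothetical sketch of one such extended method - the exact parameter set is an assumption):
>
> from airflow.models.dagrun import DagRun
>
> def get_dag_run(self, session, dag_id, execution_date=None, run_id=None, state=None):
>     # one parameterized method instead of many near-duplicate queries
>     query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
>     if execution_date is not None:
>         query = query.filter(DagRun.execution_date == execution_date)
>     if run_id is not None:
>         query = query.filter(DagRun.run_id == run_id)
>     if state is not None:
>         query = query.filter(DagRun.state == state)
>     return query.first()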
>
>
>>
>>
>> Which components would use this new mode when it's enabled?
>
>
> You may read:
> Airflow Database API is a new independent component of Airflow. It allows isolating some components (Worker, DagProcessor and Triggerer) from direct access to the DB.
>
>>
>> But what you haven't said the first thing about is what _other_ changes would be needed in the code. To take a fairly simple example:
>>
>> dag_run = db_client.get_dag_run(dag_id, execution_date)
>> dag_run.queued_at = timezone.now()
>> # How do I save this?
>>
>> In short, you need to put a lot more detail into this before we can even have an idea of the full scope of the change this proposal would involve, and what code changes would be needed for components to work with and without this setting enabled.
>
>
> For this particular example - it depends on the intention of the code author:
> - if this should be in a transaction, then I would actually introduce a new method like enqueue_dag_run(...) that would run these two steps on the Airflow DB API side (see the sketch below)
> - if not, then maybe just an "update_dag_run" method accepting the whole "dag_run" object and saving it to the DB.
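> For the first option, a minimal sketch of what such a transactional method could look like on the Airflow DB API side (the name and shape of enqueue_dag_run are assumptions):
>
> from airflow.models.dagrun import DagRun
> from airflow.utils import timezone
> from airflow.utils.session import provide_session
>
> @provide_session
> def enqueue_dag_run(dag_id, execution_date, session=None):
>     # both steps share one session, so they are committed (or
>     # rolled back) as a single transaction
>     dag_run = (
>         session.query(DagRun)
>         .filter(DagRun.dag_id == dag_id,
>                 DagRun.execution_date == execution_date)
>         .first()
>     )
>     dag_run.queued_at = timezone.utcnow()
>     session.merge(dag_run)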
>
> In general, we could take a naive approach, e.g. replace this code:
>
> dag_run = (
>     session.query(DagRun)
>     .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
>     .first()
> )
>
> with:
>
> if self.db_isolation:
>     dag_run = db_client.get_dag_run(dag_id, execution_date)
> else:
>     dag_run = (
>         session.query(DagRun)
>         .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
>         .first()
>     )
>
> The problem is that the Airflow DB API would then need to have the same implementation of the query - so duplicated code. That's why we propose moving this code to the DBClient, which is also used by the Airflow DB API (in DB-direct mode).
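> Roughly, the pattern we have in mind looks like this (a sketch with hypothetical names - the HTTP call itself is elided, and the config option name in the comment is an assumption):
>
> from airflow.models.dagrun import DagRun
>
> class DBClient:
>     def __init__(self, db_isolation: bool, api_url=None):
>         # the mode switch could be read from config at start-up, e.g.
>         # conf.getboolean("core", "database_access_isolation", fallback=False)
>         self.db_isolation = db_isolation
>         self.api_url = api_url
>
>     def get_dag_run(self, dag_id, execution_date, session=None):
>         if self.db_isolation:
>             # isolation mode: same logic, executed by the Airflow
>             # DB API server over HTTP
>             return self._http_get_dag_run(dag_id, execution_date)
>         # direct mode (also how the Airflow DB API itself runs it):
>         # the single shared implementation of the query
>         return (
>             session.query(DagRun)
>             .filter(DagRun.dag_id == dag_id,
>                     DagRun.execution_date == execution_date)
>             .first()
>         )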
>
> I know there are many places where the code is much more complicated than a single query, but they must be handled one-by-one during the implementation - otherwise this AIP would be way too big.
>
> I just provided a general idea for the approach - but if you want me to put more examples then I am happy to do that
>
> Best regards,
> Mateusz Henc
>
>>
>> On Thu, Dec 2 2021 at 14:23:56 +0100, Mateusz Henc <mh...@google.com.INVALID> wrote:
>>
>> Hi,
>> I just added a new AIP for running some Airflow components in DB-isolation mode, without direct access to the Airflow Database - instead they will use a new API for this purpose.
>>
>> PTAL:
>>
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API>
>>
>> Open question:
>> I called it the "Airflow Database API" - however, I feel it could be more than just an access layer for the database. So if you have a better name, please let me know - I am happy to change it.
>>
>> Best regards,
>> Mateusz Henc