I just provided a general idea for the approach - but if you want me
to put more examples then I am happy to do that
Yes please.
It is /too/ general for me and I can't work out what effect it would
actually have on the code base, especially how it would look with the
config option to enable/disable direct db access.
-ash
On Thu, Dec 2 2021 at 16:36:57 +0100, Mateusz Henc
<mh...@google.com.INVALID> wrote:
Hi,
I am sorry if it is not clear enough. Let me try to explain it here,
which may shed more light on the idea.
See my comments below.
On Thu, Dec 2, 2021 at 3:39 PM Ash Berlin-Taylor <a...@apache.org
<mailto:a...@apache.org>> wrote:
I'm sorry to say it, but this proposal right now just doesn't contain
enough detail to say what the actual changes to the code would be,
and what the impact would be.
To take the one example you have so far:
    def get_dag_run(self, dag_id, execution_date):
        return self.db_client.get_dag_run(dag_id, execution_date)
So from this snippet I'm guessing it would be used like this:
    dag_run = db_client.get_dag_run(dag_id, execution_date)
What type of object is returned?
As it replaces:
    dag_run = (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
        .first()
    )
then the type of the object will be exactly the same (DagRun).
Do we need one API method per individual query we have in the source?
No, as explained by the sentence:
*The method may be extended*, accepting more optional parameters to
avoid having too many similar implementations.
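To illustrate what extending a method with optional parameters could look like, here is a minimal sketch. The names get_dag_runs, DAG_RUNS and the simplified DagRun class are hypothetical and not taken from the AIP, and an in-memory list stands in for the real database:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class DagRun:
    # Simplified stand-in for Airflow's DagRun model (assumption).
    dag_id: str
    execution_date: datetime
    state: str = "queued"


# Hypothetical in-memory "table" standing in for the Airflow database.
DAG_RUNS: List[DagRun] = [
    DagRun("etl", datetime(2021, 12, 1), "success"),
    DagRun("etl", datetime(2021, 12, 2), "running"),
    DagRun("report", datetime(2021, 12, 2), "queued"),
]


def get_dag_runs(
    dag_id: str,
    execution_date: Optional[datetime] = None,
    state: Optional[str] = None,
) -> List[DagRun]:
    """Return matching runs; each optional argument narrows the result."""
    runs = [r for r in DAG_RUNS if r.dag_id == dag_id]
    if execution_date is not None:
        runs = [r for r in runs if r.execution_date == execution_date]
    if state is not None:
        runs = [r for r in runs if r.state == state]
    return runs
```

One method with optional filters can then serve several call sites that would otherwise each need their own near-identical query method.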
Which components would use this new mode when it's enabled?
You may read:
Airflow Database API is a new independent component of Airflow. It
allows isolating some components (*Worker, DagProcessor and
Triggerer*) from direct access to DB.
But what you haven't said the first thing about is what _other_
changes would be needed in the code. To take a fairly simple example:
    dag_run = db_client.get_dag_run(dag_id, execution_date)
    dag_run.queued_at = timezone.now()
    # How do I save this?
In short, you need to put a lot more detail into this before we can
even have an idea of the full scope of the change this proposal
would involve, and what code changes would be needed for components
to work with and without this setting enabled.
For this particular example, it depends on the intention of the code
author:
- If this should happen in a transaction, then I would introduce a
new method like enqueue_dag_run(...) that would run these two steps
on the Airflow DB API side.
- If not, then maybe just an "update_dag_run" method accepting the
whole "dag_run" object and saving it to the DB.
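The two options could be sketched roughly as follows. This is only an illustration under stated assumptions: the table layout and the function signatures are hypothetical, and sqlite3 stands in for the real Airflow database and ORM session.

```python
import sqlite3
from datetime import datetime, timezone


def make_db() -> sqlite3.Connection:
    # Hypothetical, heavily simplified dag_run table (assumption).
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, "
        "state TEXT, queued_at TEXT, PRIMARY KEY (dag_id, execution_date))"
    )
    return conn


def enqueue_dag_run(conn: sqlite3.Connection, dag_id: str, execution_date: str) -> bool:
    """Option 1: look up and update the run in one server-side transaction."""
    with conn:  # commits on success, rolls back if an exception is raised
        row = conn.execute(
            "SELECT state FROM dag_run WHERE dag_id = ? AND execution_date = ?",
            (dag_id, execution_date),
        ).fetchone()
        if row is None:
            return False
        conn.execute(
            "UPDATE dag_run SET state = 'queued', queued_at = ? "
            "WHERE dag_id = ? AND execution_date = ?",
            (datetime.now(timezone.utc).isoformat(), dag_id, execution_date),
        )
    return True


def update_dag_run(conn: sqlite3.Connection, dag_run: dict) -> None:
    """Option 2: the caller sends the whole object and it is written back."""
    with conn:
        conn.execute(
            "UPDATE dag_run SET state = :state, queued_at = :queued_at "
            "WHERE dag_id = :dag_id AND execution_date = :execution_date",
            dag_run,
        )
```

With the first option the read and the write cannot be interleaved with another client's update; with the second, the caller modifies a local copy and the last write wins.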
In general, we could take a naive approach, e.g. replace this code:
    dag_run = (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
        .first()
    )
with:
    if self.db_isolation:
        # No direct DB access: go through the Airflow DB API client.
        dag_run = db_client.get_dag_run(dag_id, execution_date)
    else:
        dag_run = (
            session.query(DagRun)
            .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
            .first()
        )
The problem is that the Airflow DB API would need to have the same
implementation for the query, which means duplicated code. That's why
we propose moving this code to the DBClient, which is also used by the
Airflow DB API (in DB direct mode).
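One possible shape for this, as a rough sketch only: the class names DirectDBClient, ApiDBClient, DbApiServer and the factory make_db_client are hypothetical, a dict stands in for the database, and an in-process call stands in for the network request to the Airflow DB API.

```python
from typing import Dict, Optional, Tuple

Key = Tuple[str, str]  # (dag_id, execution_date)


class DirectDBClient:
    """Holds the query logic once; talks to the database directly."""

    def __init__(self, table: Dict[Key, dict]):
        self._table = table  # in-memory stand-in for the real DB session

    def get_dag_run(self, dag_id: str, execution_date: str) -> Optional[dict]:
        return self._table.get((dag_id, execution_date))


class DbApiServer:
    """Stand-in for the Airflow DB API: reuses the direct client's code."""

    def __init__(self, client: DirectDBClient):
        self._client = client

    def handle(self, method: str, **params):
        # Dispatch the requested method to the shared implementation.
        return getattr(self._client, method)(**params)


class ApiDBClient:
    """Same interface as DirectDBClient, but forwards calls to the API."""

    def __init__(self, server: DbApiServer):
        self._server = server  # a real client would make a network call

    def get_dag_run(self, dag_id: str, execution_date: str) -> Optional[dict]:
        return self._server.handle(
            "get_dag_run", dag_id=dag_id, execution_date=execution_date
        )


def make_db_client(db_isolation: bool, table: Dict[Key, dict]):
    """Pick the client once, so call sites need no if/else on the mode."""
    direct = DirectDBClient(table)
    return ApiDBClient(DbApiServer(direct)) if db_isolation else direct
```

Calling code then uses db_client.get_dag_run(...) in both modes, and the query itself exists in only one place.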
I know there are many places where the code is much more complicated
than a single query, but they must be handled one-by-one, during the
implementation, otherwise this AIP would be way too big.
I just provided a general idea for the approach - but if you want me
to put more examples then I am happy to do that
Best regards,
Mateusz Henc
On Thu, Dec 2 2021 at 14:23:56 +0100, Mateusz Henc
<mh...@google.com.INVALID> wrote:
Hi,
I just added a new AIP for running some Airflow components in
DB-isolation mode, without direct access to the Airflow Database;
they will use a new API for this purpose instead.
PTAL:
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Database+API>
*Open question:*
I called it "Airflow Database API" - however I feel it could be
more than just an access layer for the database. So if you have a
better name, please let me know, I am happy to change it.
Best regards,
Mateusz Henc