I thought a bit about it and there is one more thing that I do not really
like about the approach when you specify the task to skip in the dag run
conf.
Currently the "logic" of the dag run conf is exclusively "dag specific" and
only used at "task execution" time (mind, it is not used at "dag
scheduling" time at all). Currently "Airflow Core" does not have to know
anything about it and can pass it to the task transparently, without
parsing it. It's the logic inside the DAG that should "understand" it.
There is not a single other use case where dag_run conf is used and parsed
by the scheduler.
Trying to make "dag_run.conf" be used either by "Airflow Core" or by
"DAG logic", depending on a parameter, is IMHO very wrong because it
mixes independent concerns.
Currently things are simple:
* dag structure is defined by DAG parsing (this is the only thing the
scheduler cares about)
* task execution uses the "static" structure defined by DAG parsing and
uses dag_run conf to implement the logic reacting to different parameters
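To make that split concrete, here is a toy plain-Python sketch of the two
responsibilities (all function names are made up for illustration, nothing
here is real Airflow API - the point is only that conf stays an opaque
blob on the scheduling side and is opened exclusively inside the task):

```python
# Toy model of the separation of concerns described above.
# The "scheduler" side only ever walks the static DAG structure;
# conf is carried along unopened.

def scheduler_schedules(dag_structure, conf):
    # Decides what to run from structure alone - note that `conf`
    # is never parsed or inspected here.
    return list(dag_structure)

def task_b2_execute(conf):
    # Only here, inside the task body, is conf parsed and acted upon.
    if conf.get("skip_heavy_part"):
        return "light result"
    return "heavy result"

conf = {"skip_heavy_part": True}
order = scheduler_schedules(["B1", "B2", "B3"], conf)
result = task_b2_execute(conf)
print(order, result)
```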
When you look at it from that perspective, the proposal is a weird
mixture of both. And if we do it, in the future it **might** prevent us
from doing a number of optimizations. For example, dag_run conf does not
have to be parsed at all while the scheduler does its job; the only place
dag_run conf needs to be parsed is the moment the task is executed.
For me this is a huge '-1' of the solution.
Coming back, XD, to your example - I think what you are describing there is
a particular DAG requirement, not something that Airflow core should take
care of. Also, no matter how the task gets skipped (whether in
pre-execute or by the scheduler), if you want to "skip" B2 but behave as if
it "succeeded", you have exactly the same problems with trigger rules. It
completely does not matter whether you "skip" by throwing an exception or
Airflow "skips" for you. What you are really talking about is not
"skipping" certain tasks but really "pretending they succeeded". This is an
entirely different thing than what we started the discussion with :). We
were talking about "skipping" the task - which in Airflow is a different
state than "success".
And if you really want to "pretend success" you don't even have to (nor
should you) use 'pre_execute' for it. There are better ways already.
If you are using the "modern" way of writing tasks (which I assume is the
case for anyone who does custom work like that) I imagine your B2 task
should be written this way (writing from memory so it might not compile):
@task
def task_b2():
    from airflow.operators.python import get_current_context

    context = get_current_context()
    if context["dag_run"].conf.get("should_task2_pretend_it_succeeded"):
        return
    # do stuff (remember to use hooks instead of operators inside
    # @task-decorated functions to use the Airflow providers)
IMHO - this is sooooooo much more "Modern Airflow way" of approaching the
problem:
* it's very simple and follows the "modern" way of writing Airflow tasks
(see also my talk about it from this week NYC Airflow Meetup
https://photos.app.goo.gl/ycCLpCmQ9fiTDNNDA)
* the logic of "pretending" sits right next to the code that would
otherwise run, so it is very obvious what happens (otherwise you have to
make a mental jump from DAG parameters, through the scheduler, to task
execution)
* it's extremely explicit (https://www.python.org/dev/peps/pep-0020/) -
"explicit is better than implicit"
* dag_run configuration does not have to be understood by scheduler
* your DAG structure does not have to be "complicated", as the task will
end up in "success" state
* this is imperative, not declarative, and the DAG writer decides how
"limited" the conditions are - Airflow does not limit you in any way and
the condition can be arbitrarily complex or simple (and take into account
other parameters, dag_run, logical dates, actual dates of execution and
basically anything else)
From all the comments I saw, I am pretty convinced this is a much better
way to approach it.
BTW. There is indeed one "potential" pro of the "scheduler-based" skip
calculation. It can provide a little optimization - the task does not need
to be run at all for the decision to be made (similar to what we do
with DummyTasks now). But IMHO, this is totally offset by the fact that the
scheduler would have to parse and analyse the dag_run conf to make the
decision - overall, it could even be slower in a number of cases. The
scheduler loop is pretty critical and any extra logic and parsing there
might have a huge, unforeseen performance impact, so we should only add
logic there if we are absolutely sure all the potential performance impact
is well analyzed and understood.
Giorgio Zoppi,
XCom being "small" is also a thing of the past (if you use custom XCom
backends) - see the same talk of mine
(https://photos.app.goo.gl/ycCLpCmQ9fiTDNNDA), but also
https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html?highlight=custom%20xcom#custom-backends
and this nice article from Astronomer
https://www.astronomer.io/guides/custom-xcom-backends
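The core idea of those custom backends is simple: store the big payload
somewhere external and keep only a tiny reference in the Airflow metadata
DB. A real backend subclasses airflow.models.xcom.BaseXCom and overrides
serialize_value / deserialize_value; below is a rough sketch of just that
pair of hooks as plain functions, with a local temp dir standing in for
S3/GCS, so it runs without an Airflow install (all names here are mine,
not Airflow's):

```python
import json
import os
import tempfile
import uuid

# Stand-in for remote object storage (S3/GCS in a real backend).
STORAGE_DIR = tempfile.mkdtemp(prefix="xcom-demo-")
PREFIX = "xcom-ref://"

def serialize_value(value):
    """Write the payload to external storage, return a small reference.

    In a real custom XCom backend this would be
    BaseXCom.serialize_value(); only the returned reference string
    ends up in the Airflow metadata DB.
    """
    path = os.path.join(STORAGE_DIR, str(uuid.uuid4()) + ".json")
    with open(path, "w") as f:
        json.dump(value, f)
    return PREFIX + path

def deserialize_value(reference):
    """Resolve a stored reference back to the original payload."""
    assert reference.startswith(PREFIX)
    with open(reference[len(PREFIX):]) as f:
        return json.load(f)

big_payload = {"rows": list(range(10_000))}
ref = serialize_value(big_payload)
print(deserialize_value(ref) == big_payload)
```

The point being: downstream tasks pull XCom exactly as before, while the
heavy data never touches the metadata DB.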
J.
On Sat, Feb 5, 2022 at 4:22 PM Giorgio Zoppi <[email protected]>
wrote:
> Hey Jurek,
> Just a question about the future development: is the XCom backend
> replaceable now? The real power of Airflow is being the 'de facto' glue
> between different ways of mangling data, just as Python is the glue when
> you need to implement things at a lower level, i.e. Altair Simulation
> software is written for critical parts in C++ and bound at the upper
> level in Python. The same can be said for Tensorflow and numpy. About my
> question: it would be nice to have as an XCom backend a queue like redis
> or a non-structured database to allow scaling, but I am not sure
> if it makes sense since XCom is just for short messages.
> Best Regards,
> Giorgio