Actually, this is even more complex than I thought, as I found another case.
I thought about it too, but this would only be available in 2.4+, and a ton
of people could benefit from it even now.
However, I have a thought. I was waiting for something like this to appear
to have a very good reason to implement the "apache-airflow-future" package,
and this may be the right opportunity. We've discussed it in the past: it
could be a package containing some of the tools, utils, etc. that were added
in Airflow 2.3/2.4 and later, very similar to Python's __future__.
In this case we could simply implement the variable check as
"airflow.utils.dag_parsing_context.get_parsing_context()" AND, at the same
time, make it possible to do "import airflow.__future__" (or similar), which
would simply monkeypatch "get_parsing_context" with this (complex)
proctitle-based implementation.
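A rough sketch of the monkeypatching step, assuming the "future" module
simply installs a function into the target module at import time and leaves
any native implementation alone. install_backport is a hypothetical helper,
not an existing Airflow API.

```python
import importlib
import sys
import types
from typing import Callable


def install_backport(module_name: str, attr: str, func: Callable) -> types.ModuleType:
    """Expose func as <module_name>.<attr> unless that attribute already exists."""
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        # Older version: the module does not exist at all, so register an empty
        # one in sys.modules; later imports of that name resolve to it.
        module = types.ModuleType(module_name)
        sys.modules[module_name] = module
    if not hasattr(module, attr):
        setattr(module, attr, func)  # the actual monkeypatch
    return module
```

In the proposed package, a hypothetical airflow/__future__.py would call
install_backport("airflow.utils.dag_parsing_context", "get_parsing_context",
<proctitle-based function>) when imported, so newer Airflow versions keep
their native helper and older ones get the backport.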
There are already a number of tools/utils like that which are available only
in 2.2 and 2.3, and a "future" package would make it simple for users of
older versions to use them.
BTW, this is the current version (<exploding head>):
import ast
import sys

import setproctitle

from airflow.models.dag import DAG

current_dag = None
if len(sys.argv) > 3 and sys.argv[1] == "tasks":
    # task executed by starting a new Python interpreter:
    # "airflow tasks run <dag_id> <task_id> ..."
    current_dag = sys.argv[3]
else:
    try:
        PROCTITLE_SUPERVISOR_PREFIX = "airflow task supervisor: "
        PROCTITLE_TASK_RUNNER_PREFIX = "airflow task runner: "
        proctitle = str(setproctitle.getproctitle())
        if proctitle.startswith(PROCTITLE_SUPERVISOR_PREFIX):
            # task executed via forked process in celery; the title holds
            # the repr() of the command list
            args_string = proctitle[len(PROCTITLE_SUPERVISOR_PREFIX) :]
            args = ast.literal_eval(args_string)
            if len(args) > 3 and args[1] == "tasks":
                current_dag = args[3]
        elif proctitle.startswith(PROCTITLE_TASK_RUNNER_PREFIX):
            # task executed via forked process in standard_task_runner;
            # the space-separated title starts with the dag_id
            args = proctitle[len(PROCTITLE_TASK_RUNNER_PREFIX) :].split(" ")
            if len(args) > 0:
                current_dag = args[0]
    except Exception:
        # detection failed: fall back to generating all DAGs
        pass

# list_of_things is provided by the DAG author
for thing in list_of_things:
    dag_id = f"generated_dag_{thing}"
    if current_dag is not None and current_dag != dag_id:
        continue  # skip generation of non-selected DAG
    dag = DAG(dag_id=dag_id, ...)
    globals()[dag_id] = dag
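The same detection can be pulled into pure functions so it can be unit
tested without Airflow or setproctitle installed. A sketch under that
assumption: the prefixes are copied from the code above, while the function
names are hypothetical. One small difference: an empty task-runner title
yields None here instead of "".

```python
import ast
from typing import List, Optional

SUPERVISOR_PREFIX = "airflow task supervisor: "
TASK_RUNNER_PREFIX = "airflow task runner: "


def dag_id_from_argv(argv: List[str]) -> Optional[str]:
    # "airflow tasks <subcommand> <dag_id> ..." -> argv[3] is the DAG id
    if len(argv) > 3 and argv[1] == "tasks":
        return argv[3]
    return None


def dag_id_from_proctitle(proctitle: str) -> Optional[str]:
    if proctitle.startswith(SUPERVISOR_PREFIX):
        # celery fork: the title embeds the repr() of the command list
        try:
            args = ast.literal_eval(proctitle[len(SUPERVISOR_PREFIX):])
        except (ValueError, SyntaxError):
            return None
        if isinstance(args, (list, tuple)):
            return dag_id_from_argv(list(args))
        return None
    if proctitle.startswith(TASK_RUNNER_PREFIX):
        # standard_task_runner fork: space-separated, dag_id first
        parts = proctitle[len(TASK_RUNNER_PREFIX):].split(" ")
        return parts[0] or None
    return None


def detect_current_dag(argv: List[str], proctitle: str) -> Optional[str]:
    # argv wins (new interpreter); otherwise inspect the process title
    return dag_id_from_argv(argv) or dag_id_from_proctitle(proctitle)
```

In the DAG file this would be called as
detect_current_dag(sys.argv, str(setproctitle.getproctitle())).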
J.
On Tue, Jul 19, 2022 at 4:59 PM Jed Cunningham wrote:
> I think we should only document the "DAG author friendly" approach (env
> var or otherwise) and hold the existing PR until it is ready. My 2c.
>