I just took a look, and it turns out that DebugExecutor works fine with
the triggerer; you just need to have one running.
You could run one in a subprocess. I experimented with refactoring the
subprocess hook for this purpose (so that you can start the subprocess
asynchronously), then ran this DAG with the debug executor, and it worked:
from __future__ import annotations
import os
os.environ["AIRFLOW__CORE__EXECUTOR"] = "DebugExecutor"
from datetime import timedelta
import pendulum
from airflow.models import DAG
from airflow.sensors.time_delta import TimeDeltaSensorAsync
with DAG(
    dag_id="example_sensors",
    schedule="@once",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
    t0a = TimeDeltaSensorAsync(task_id="wait_some_seconds_async", delta=timedelta(seconds=2))
from airflow.hooks.subprocess import SubprocessHook
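# let's get the triggerer running in a subprocess
# (start_process, send_sigterm(process) and process_output come from my
# experimental refactor of SubprocessHook)
hook = SubprocessHook()
triggerer_process = hook.start_process(command=["airflow", "triggerer"], cwd="/tmp")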
# now let's run the dag
dag.clear()
dag.run()
# now the dag has completed, so kill the triggerer
hook.send_sigterm(triggerer_process)
result = hook.process_output(subprocess=triggerer_process)
assert result.exit_code == 0
# now the triggerer has exited and our system test is done
So, something like this could be done.
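
If you don't want to depend on the refactored hook, roughly the same pattern can probably be done with just the standard library. Here is a minimal sketch, assuming plain `subprocess.Popen` is good enough for starting the triggerer; the `background_process` helper and its `timeout` parameter are names I made up for illustration, not anything that exists in Airflow.

```python
import subprocess
from contextlib import contextmanager


@contextmanager
def background_process(command, cwd=None, timeout=30):
    """Start `command` in the background and stop it when the block exits.

    Hypothetical helper for illustration; not part of Airflow.
    """
    process = subprocess.Popen(command, cwd=cwd)
    try:
        yield process
    finally:
        # Ask the process to shut down (SIGTERM on POSIX), and escalate
        # to a hard kill if it has not exited within `timeout` seconds.
        process.terminate()
        try:
            process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()


# Assumed usage with the DAG above (needs an Airflow installation):
#
# with background_process(["airflow", "triggerer"], cwd="/tmp"):
#     dag.clear()
#     dag.run()
```

The context manager makes sure the triggerer is stopped even if `dag.run()` raises. Unlike the hook version, this sketch does not capture the triggerer's output or check its exit code.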