Hello,

Apologies for spamming the whole listserv. I wanted to share some work I've 
done recently with a wider audience and wasn't sure if there was a better place 
to post.

For background, many scheduling frameworks like Airflow, Dagster, Prefect, etc., 
want you to define your DAGs in Python code. Things have become increasingly 
dynamic over the years; for example, Airflow implemented Dynamic Task Mapping.

I wanted to go in the opposite direction and eliminate Python from the 
equation. Astronomer has dag-factory 
<https://github.com/astronomer/dag-factory> and there is also gusty 
<https://github.com/pipeline-tools/gusty>, but I wanted something that leverages 
the extremely configurable and extensible architecture of Hydra 
<https://hydra.cc/> + Pydantic <https://docs.pydantic.dev/latest/> detailed in 
this blog post 
<https://towardsdatascience.com/configuration-management-for-model-training-experiments-using-pydantic-and-hydra-d14a6ae84c13/>.

So I've written airflow-pydantic 
<https://github.com/airflow-laminar/airflow-pydantic> and airflow-config 
<https://github.com/airflow-laminar/airflow-config>. The former is a collection 
of Pydantic models either wrapping or validating Airflow structures, with 
support for instantiation (i.e. converting to Airflow objects) or rendering 
(producing Python code that creates those objects). The latter is a 
Hydra/Pydantic-based configuration framework which lets you define DAG/task 
configuration in YAML, with support for fully declarative DAGs 
<https://airflow-laminar.github.io/airflow-config/docs/src/examples.html#declarative-dags-dag-factory>.
With this, I am able to fully define DAGs in YAML 
<https://github.com/airflow-laminar/validation-dags/blob/7d65eb9173602640427231861a8c36cf489140fa/validation_dags/config/config.yaml#L199>.
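
To make the "validate, then render" idea concrete, here is a minimal sketch 
using only the standard library. It is not the airflow-pydantic or 
airflow-config API: TaskSpec, DagSpec and dag_from_mapping are hypothetical 
names, plain dataclasses stand in for Pydantic models, a dict stands in for 
parsed YAML, and the rendered code assumes Airflow 2.x-style imports.

```python
from dataclasses import dataclass, field


@dataclass
class TaskSpec:
    """Hypothetical declarative description of a single task."""
    task_id: str
    bash_command: str
    upstream: list[str] = field(default_factory=list)


@dataclass
class DagSpec:
    """Hypothetical declarative description of a whole DAG."""
    dag_id: str
    schedule: str
    tasks: list[TaskSpec]

    def validate(self) -> None:
        # Reject configs that could not be turned into a valid DAG.
        if not self.tasks:
            raise ValueError("a DAG needs at least one task")
        ids = [t.task_id for t in self.tasks]
        if len(set(ids)) != len(ids):
            raise ValueError("task ids must be unique")
        for t in self.tasks:
            if not t.task_id.isidentifier():
                raise ValueError(f"{t.task_id!r} is not a valid identifier")
            for up in t.upstream:
                if up not in ids:
                    raise ValueError(f"unknown upstream task {up!r}")

    def render(self) -> str:
        # Emit Python source text instead of constructing Airflow objects.
        lines = [
            "from airflow import DAG",
            "from airflow.operators.bash import BashOperator",
            "",
            f"with DAG(dag_id={self.dag_id!r}, schedule={self.schedule!r}) as dag:",
        ]
        for t in self.tasks:
            lines.append(
                f"    {t.task_id} = BashOperator("
                f"task_id={t.task_id!r}, bash_command={t.bash_command!r})"
            )
        for t in self.tasks:
            for up in t.upstream:
                lines.append(f"    {up} >> {t.task_id}")
        return "\n".join(lines) + "\n"


def dag_from_mapping(raw: dict) -> DagSpec:
    """Build and validate a DagSpec from a plain mapping (e.g. parsed YAML)."""
    tasks = [TaskSpec(**t) for t in raw["tasks"]]
    spec = DagSpec(dag_id=raw["dag_id"], schedule=raw["schedule"], tasks=tasks)
    spec.validate()
    return spec


if __name__ == "__main__":
    config = {
        "dag_id": "example",
        "schedule": "@daily",
        "tasks": [
            {"task_id": "extract", "bash_command": "echo extract"},
            {"task_id": "load", "bash_command": "echo load", "upstream": ["extract"]},
        ],
    }
    print(dag_from_mapping(config).render())
```

An "instantiate" path would build the Airflow objects directly from the same 
validated spec rather than emitting source text; it is left out here because 
it needs Airflow installed.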
 

I've also written a supporting cast of libraries for some things I needed:

- airflow-ha <https://github.com/airflow-laminar/airflow-ha> allows you to 
write "@continuous" style DAGs in a generic way by looping to retrigger that 
DAG on evaluation of a Python callable. I needed this for AWS MWAA, which sets 
time limits on DAG runs, but it can be useful in other contexts. Here is a 
funny little example 
<https://github.com/airflow-laminar/validation-dags/blob/main/validation_dags/config/config.yaml#L89-L104>
that retriggers a DAG repeatedly, counting down a context variable from run to 
run.

- airflow-supervisor <https://github.com/airflow-laminar/airflow-supervisor> 
integrates Airflow with supervisor <https://supervisord.org/>, which I use for 
"always on" DAGs in contexts where I do not necessarily want to rely on Airflow 
to be my process supervisor, or in contexts where I do not want my worker 
machine and my "always on" process to be the same machine (e.g. use the SSH 
Operator to go to my "always on" machine, start up a process, and have Airflow 
check in periodically with supervisor to see if the process is still running).
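
To illustrate the retrigger loop from the airflow-ha item above, here is a 
small pure-Python simulation of the countdown example. It does not use 
airflow-ha or Airflow at all: run_with_retrigger, countdown and keep_going are 
hypothetical names, and each loop iteration stands in for what would be a 
separate DAG run receiving the previous run's context.

```python
from typing import Callable

Conf = dict[str, int]


def run_with_retrigger(
    run_once: Callable[[Conf], Conf],
    should_retrigger: Callable[[Conf], bool],
    conf: Conf,
    max_runs: int = 100,
) -> list[Conf]:
    """Simulate a DAG that retriggers itself until the callable says stop."""
    history: list[Conf] = []
    for _ in range(max_runs):  # hard cap so a bad callable cannot loop forever
        conf = run_once(conf)  # one simulated DAG run
        history.append(conf)
        if not should_retrigger(conf):  # the "Python callable" decision point
            break
    return history


def countdown(conf: Conf) -> Conf:
    # Each run decrements the value handed over from the previous run.
    return {**conf, "remaining": conf["remaining"] - 1}


def keep_going(conf: Conf) -> bool:
    # Retrigger only while there is still something left to count down.
    return conf["remaining"] > 0


if __name__ == "__main__":
    for run in run_with_retrigger(countdown, keep_going, {"remaining": 3}):
        print(run)
```

Starting from a value of 3, the simulation performs three runs and stops once 
the value reaches 0.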


I wanted to share these in case anyone else is working on something similar or 
finds it interesting, or if anything here might be interesting as a future 
mainline feature of Airflow. Apologies again for spamming the full list; I 
wasn't sure where else to discuss Airflow things. Feel free to ping me 
privately or on any of those GitHub repos.


Tim

tim.paine.nyc

