Hi everyone! Starting a conversation here about extending Airflow to support a more functional way of defining DAGs, with better data dependency/lineage clarity in the DAG itself. I believe adding this functional extension would let Airflow support more data-pipeline use cases and broaden its usage, while remaining backwards compatible.
My proposal is to add a __call__ method that dynamically replaces operator attributes before the `pre_execute` and `execute` functions run. Tied to an XComArg, a class that points to a previous task's XCom-pushed value, this allows DAGs to be defined in a more functional, data-driven way. Concretely, the proposal is to:

• Add a __call__ method to BaseOperator that accepts inlets (in my case, XComArgs)
• Add a simple Python decorator to create PythonOperators easily from functions
• Resolve the inlets before any main operator execution on a worker/K8s pod
• Log their values at execution time and make them visible in the Web UI for each task instance
• Set the resolved values as attributes on the operator instance
• Execute the operator and return an XComArg that can later be tied into a new operator as an inlet

I wrote a small extension (~200 lines of code) that implements this. Here's what it would look like with existing operators (and it works with some simple tweaks to the already existing codebase):

```python
get_ip = SimpleHttpOperator(
    task_id='get_ip',
    endpoint='get',
    method='GET',
    xcom_push=True,
)

@operation
def email_subject_generator(raw_json: str) -> str:
    external_ip = json.loads(raw_json)['origin']
    return f"Server connected from {external_ip}"

@operation
def email_body_generator(raw_json: str) -> str:
    return """
    Seems like today your server executing Airflow is connected from
    the external IP {origin}<br>
    <br>
    Have a nice day!
    """.format(**json.loads(raw_json))

send_email = EmailOperator(
    task_id='send_email',
    to="exam...@example.com",
    subject='',
    html_content='',
)

ip_info = get_ip()
subject = email_subject_generator(ip_info)
body = email_body_generator(ip_info)
send_email(subject=subject, html_content=body)
```

As someone wise once said, code is better than words, so here's my experimental code: https://github.com/casassg/corrent and a couple of example DAGs I wrote: https://github.com/casassg/corrent/tree/master/dags

This approach is heavily inspired by alternatives that have appeared lately in the ETL pipeline world (e.g. Dagster) that support more functional ways of defining DAGs. I believe this approach could help the Airflow community grow.

What do you think? Any thoughts on functional DAGs? I would love to hear the pros and cons of this extension. To me, it seems closely linked to the Lineage discussion happening at the same time, as both discussions are about improving data pipelines.

Thanks!
Gerard Casas Saez
Twitter | Cortex | @casassaez
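P.S. For anyone who wants to see the mechanics without opening the repo: below is a tiny standalone sketch of how the pieces could fit together. It reuses the names from the proposal (XComArg, @operation, __call__ resolving inlets before execute), but it is a toy model, not the linked implementation and not real Airflow API; a plain dict stands in for the XCom backend, and FakeEmailOperator is a made-up stand-in operator.

```python
class XComArg:
    """Reference to the XCom value an upstream task will push."""
    def __init__(self, operator):
        self.operator = operator

    def resolve(self, xcom_store):
        # Runs on the worker, after the upstream task has executed.
        return xcom_store[self.operator.task_id]


class BaseOperator:
    def __init__(self, task_id):
        self.task_id = task_id
        self._inlets = {}

    def __call__(self, **inlets):
        # DAG-definition time: record inlets, defer resolution.
        self._inlets.update(inlets)
        return XComArg(self)

    def run(self, xcom_store):
        # Execution time: resolve XComArg inlets, replace the matching
        # attributes, then push the return value as this task's XCom.
        for name, value in self._inlets.items():
            if isinstance(value, XComArg):
                value = value.resolve(xcom_store)
            setattr(self, name, value)
        xcom_store[self.task_id] = self.execute()

    def execute(self):
        raise NotImplementedError


class PythonOperator(BaseOperator):
    def __init__(self, task_id, python_callable, op_args=()):
        super().__init__(task_id)
        self.python_callable = python_callable
        self.op_args = list(op_args)

    def run(self, xcom_store):
        # Positional variant: resolve each XComArg argument, then call.
        resolved = [a.resolve(xcom_store) if isinstance(a, XComArg) else a
                    for a in self.op_args]
        xcom_store[self.task_id] = self.python_callable(*resolved)


def operation(func):
    # Decorator: calling the function wires up a PythonOperator and
    # hands back an XComArg for downstream tasks.
    def factory(*args):
        return XComArg(PythonOperator(func.__name__, func, args))
    return factory


class FakeEmailOperator(BaseOperator):
    def __init__(self, task_id, subject=''):
        super().__init__(task_id)
        self.subject = subject

    def execute(self):
        return f"sent: {self.subject}"


@operation
def shout(text):
    return text.upper()

store = {}  # stand-in for the XCom backend
get_word = PythonOperator('get_word', lambda: 'hello')
send = FakeEmailOperator('send_email')

shouted = shout(XComArg(get_word))  # wiring only; nothing runs yet
send(subject=shouted)

get_word.run(store)
shouted.operator.run(store)
send.run(store)
print(store['send_email'])  # sent: HELLO
```

The key point the sketch shows: calling an operator only records data dependencies, and the XComArgs are resolved into plain values on the worker right before execute().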