Hi everyone!

Starting a conversation here about extending Airflow to support a more 
functional way of defining DAGs, with better data dependency/lineage clarity 
on the DAG itself. I believe adding this functional extension would allow 
Airflow to support more data pipeline use cases and broaden its usage, while 
remaining backwards compatible.

My proposal is to add a `__call__` function that dynamically replaces class 
attributes before the `pre_execute` and `execute` functions run. Combined 
with an XComArg, a class that points to a previous task's pushed XCom value, 
this allows DAGs to be defined in a more functional, data-driven way. 
Basically my proposal is:


• Add a `__call__` function to BaseOperator that accepts inlets (in my case, 
XComArgs)
• Add a simple Python decorator to create PythonOperators easily from functions
• Resolve the inlets before executing the main operator logic on a worker/K8s 
pod
• Log their values at execution time and make them visible in the Web UI for 
each task instance
• Set the resolved values as attributes on the operator instance
• Execute the operator and return an XComArg that can later be passed to a new 
operator as an inlet
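To make the mechanics above concrete, here is a minimal, self-contained sketch in plain Python (no Airflow imports). `XComArg` and `BaseOperator` are simplified stand-ins for the real classes, and the `run` method is an assumption standing in for what the worker does when it executes a task; the actual implementation lives in the repo linked below.

```python
class XComArg:
    """Points to the XCom value a task will push at execution time."""
    def __init__(self, operator, key="return_value"):
        self.operator = operator
        self.key = key

    def resolve(self, xcom_store):
        # In Airflow this would be an XCom pull for the task instance;
        # here a plain dict stands in for the XCom backend.
        return xcom_store[(self.operator.task_id, self.key)]


class BaseOperator:
    def __init__(self, task_id):
        self.task_id = task_id
        self._inlets = {}

    def __call__(self, **kwargs):
        # Record inlets (possibly XComArgs) to resolve before execute()
        # runs, and return an XComArg pointing at this task's future value.
        self._inlets = kwargs
        return XComArg(self)

    def execute(self, xcom_store):
        raise NotImplementedError

    def run(self, xcom_store):
        # Resolve inlets and set them as attributes before execute(),
        # then push the return value so downstream tasks can pull it.
        for attr, value in self._inlets.items():
            if isinstance(value, XComArg):
                value = value.resolve(xcom_store)
            setattr(self, attr, value)
        result = self.execute(xcom_store)
        xcom_store[(self.task_id, "return_value")] = result
        return result


class EchoOperator(BaseOperator):
    """Toy operator: uppercases whatever lands in its `text` attribute."""
    def __init__(self, task_id, text=""):
        super().__init__(task_id)
        self.text = text

    def execute(self, xcom_store):
        return self.text.upper()


store = {}
first = EchoOperator("first", text="hello")
out = first()              # XComArg for first's return value
second = EchoOperator("second")
second(text=out)           # wires first's output into second.text
first.run(store)
result = second.run(store)
```

Calling an operator wires the data dependency and the execution order in one step, which is the core of the functional style proposed here.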


I wrote a small extension (~200 lines of code) that implements this. Here's 
what it would look like with existing operators (it works with some simple 
tweaks to the existing codebase):

get_ip = SimpleHttpOperator(
    task_id='get_ip', endpoint='get', method='GET', xcom_push=True
)

@operation
def email_subject_generator(raw_json: str) -> str:
    external_ip = json.loads(raw_json)['origin']
    return f"Server connected from {external_ip}"

@operation
def email_body_generator(raw_json: str) -> str:
    return """
    Seems like today your server executing Airflow is connected from the
    external IP {origin}<br>
    <br>
    Have a nice day!
    """.format(**json.loads(raw_json))

send_email = EmailOperator(
    task_id='send_email',
    to="exam...@example.com",
    subject='',
    html_content=''
)

ip_info = get_ip()
subject = email_subject_generator(ip_info)
body = email_body_generator(ip_info)
send_email(subject=subject, html_content=body)
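For readers curious how a decorator like `@operation` could turn a plain function into a deferred, composable task, here is a hedged, self-contained sketch. The `Task` class and this `operation` are illustrative stand-ins (not the implementation in the linked repo): `Task` plays the role of an operator plus its XComArg, and `.result()` plays the role of execution on a worker.

```python
import functools


class Task:
    """Deferred function call; .result() resolves upstream tasks first."""
    def __init__(self, fn, args, kwargs):
        self.fn, self.args, self.kwargs = fn, args, kwargs
        self._done, self._value = False, None

    def result(self):
        if not self._done:
            # Resolve any upstream Task arguments (the "inlets") before
            # running this task's function, caching the pushed value.
            args = [a.result() if isinstance(a, Task) else a
                    for a in self.args]
            kwargs = {k: v.result() if isinstance(v, Task) else v
                      for k, v in self.kwargs.items()}
            self._value, self._done = self.fn(*args, **kwargs), True
        return self._value


def operation(fn):
    """Wrap a function so calling it builds a Task instead of running it."""
    @functools.wraps(fn)
    def make_task(*args, **kwargs):
        return Task(fn, args, kwargs)
    return make_task


@operation
def add(a, b):
    return a + b

@operation
def double(x):
    return 2 * x

total = double(add(1, 2))  # builds the dependency graph; nothing runs yet
```

The decorated calls only record the graph; evaluation is deferred until a scheduler (here, `.result()`) asks for the value, which mirrors how the DAG above is declared at parse time but executed later.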

As someone wise once said, code speaks louder than words, so here's my 
experimental code: https://github.com/casassg/corrent  and a couple of example 
DAGs I wrote: https://github.com/casassg/corrent/tree/master/dags

This approach is heavily inspired by some alternatives that have lately 
appeared in the ETL pipeline world (e.g. Dagster) that support more functional 
ways of defining DAGs.

I believe this approach could help the Airflow community grow. What do you 
think? Any thoughts on functional DAGs? I'd love to hear the pros and cons of 
this extension. To me, it seems closely linked to the Lineage discussion 
happening at the same time, as both are about improving data pipelines.

Thanks!

Gerard Casas Saez
Twitter | Cortex | @casassaez
