Hi everyone!
Starting a conversation here about extending Airflow to support a more
functional way of defining DAGs, including clearer data dependencies/lineage
on the DAG itself. I believe this functional extension would let Airflow
support more data pipeline use cases and broaden its usage, while remaining
backwards compatible.
My proposal is to include a __call__ function that dynamically replaces the
operator’s attributes before its `pre_execute` and `execute` functions run.
Combined with XComArg, a class that points to a value pushed to XCom by a
previous task, this allows DAGs to be defined in a more functional,
data-oriented way. Basically, my proposal is:
• Add a __call__ function to BaseOperator that accepts inlets (in my case,
XComArgs)
• Resolve those inlets before the main operator execution starts on the
worker/K8s pod
• Log their values at execution time and make them visible in the Web UI for
each task instance
• Set them as attributes on the operator instance
• Execute the operator and return an XComArg that can later be passed to a new
operator as an inlet
• Add a simple Python decorator to create PythonOperators easily from functions
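The steps listed above can be sketched in plain Python without Airflow. Everything in this sketch is hypothetical: `Operator` is a toy stand-in for `BaseOperator`, `XCOM_STORE` is an in-memory dict standing in for the XCom table, `FakeGetIp` fakes the HTTP task with a documentation IP, and `FunctionOperator`/`operation` only approximate what the proposed decorator might do. The actual prototype is the corrent repository and may work differently.

```python
# Hypothetical sketch of the proposed mechanism; not Airflow's or corrent's API.
import functools
import inspect
import json
import logging
from typing import Any, Callable, Dict, List, Tuple

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("functional_dag_sketch")

# Hypothetical in-memory stand-in for the XCom table: (task_id, key) -> value.
XCOM_STORE: Dict[Tuple[str, str], Any] = {}


class XComArg:
    """Points at a value that an upstream task pushes to XCom."""

    def __init__(self, operator: "Operator", key: str = "return_value") -> None:
        self.operator = operator
        self.key = key

    def resolve(self) -> Any:
        return XCOM_STORE[(self.operator.task_id, self.key)]


class Operator:
    """Toy stand-in for BaseOperator with the proposed __call__."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: List["Operator"] = []
        self._inlets: Dict[str, Any] = {}

    def __call__(self, **kwargs: Any) -> XComArg:
        # DAG-definition time: remember the inlets and derive upstream
        # dependencies from every XComArg passed in.
        self._inlets = kwargs
        for value in kwargs.values():
            if isinstance(value, XComArg) and value.operator not in self.upstream:
                self.upstream.append(value.operator)
        return XComArg(self)

    def run(self) -> None:
        # Execution time (on the worker): resolve, log, set as attributes,
        # then execute and push the result for downstream XComArgs.
        for name, value in self._inlets.items():
            if isinstance(value, XComArg):
                value = value.resolve()
            log.info("%s: %s=%r", self.task_id, name, value)
            setattr(self, name, value)
        XCOM_STORE[(self.task_id, "return_value")] = self.execute()

    def execute(self) -> Any:
        raise NotImplementedError


class FunctionOperator(Operator):
    """Wraps a plain function; its parameter names become operator attributes
    (a real version would guard against clashes with existing attributes)."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        super().__init__(task_id=fn.__name__)
        self.fn = fn

    def __call__(self, *args: Any, **kwargs: Any) -> XComArg:
        # Map positional arguments onto parameter names before recording them.
        bound = inspect.signature(self.fn).bind(*args, **kwargs)
        return super().__call__(**bound.arguments)

    def execute(self) -> Any:
        return self.fn(**{name: getattr(self, name) for name in self._inlets})


def operation(fn: Callable[..., Any]) -> Callable[..., XComArg]:
    """Hypothetical decorator: each call of fn builds a new operator node."""

    @functools.wraps(fn)
    def build(*args: Any, **kwargs: Any) -> XComArg:
        return FunctionOperator(fn)(*args, **kwargs)

    return build


# Usage, mirroring the email example with a fake HTTP task.
class FakeGetIp(Operator):
    def execute(self) -> str:
        return '{"origin": "203.0.113.7"}'


@operation
def email_subject_generator(raw_json: str) -> str:
    return f"Server connected from {json.loads(raw_json)['origin']}"


ip_info = FakeGetIp(task_id="get_ip")()
subject = email_subject_generator(ip_info)
for task in (ip_info.operator, subject.operator):  # upstream first
    task.run()
print(subject.resolve())  # Server connected from 203.0.113.7
```

In this sketch, dependencies are recorded when `__call__` runs at DAG-definition time, while values are only resolved inside `run`, so nothing is pulled from XCom until the task actually executes.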
I wrote a small extension (~200 lines of code) that implements it. Here’s what
it would look like with existing operators (it already works after some simple
tweaks to the existing codebase):
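```python
import json

from airflow.operators.email_operator import EmailOperator
from airflow.operators.http_operator import SimpleHttpOperator

# Decorator from the corrent prototype (import path assumed); BaseOperator also
# needs the proposed __call__ for the calls at the bottom to work.
from corrent.decorators import operation

get_ip = SimpleHttpOperator(
    task_id='get_ip', endpoint='get', method='GET', xcom_push=True
)


@operation
def email_subject_generator(raw_json: str) -> str:
    external_ip = json.loads(raw_json)['origin']
    return f"Server connected from {external_ip}"


@operation
def email_body_generator(raw_json: str) -> str:
    return """
    Seems like today your server executing Airflow is connected from the
    external IP {origin}<br>
    <br>
    Have a nice day!
    """.format(**json.loads(raw_json))


send_email = EmailOperator(
    task_id='send_email',
    to="[email protected]",
    subject='',
    html_content=''
)

# Each call wires both the data (XComArg) and the task dependency.
ip_info = get_ip()
subject = email_subject_generator(ip_info)
body = email_body_generator(ip_info)
send_email(subject=subject, html_content=body)
```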
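Because every inlet in the example points at the task that produced it, the DAG’s structure follows from the data flow alone. The sketch below is a hypothetical illustration using only the standard library (`graphlib`, Python 3.9+): the `consumes` mapping is written by hand to mirror the calls above, and from it we derive both the edges one would otherwise declare with `>>` and a valid execution order.

```python
from graphlib import TopologicalSorter

# Hand-written mirror of the example: which upstream outputs each call consumes.
#   subject = email_subject_generator(ip_info)
#   body = email_body_generator(ip_info)
#   send_email(subject=subject, html_content=body)
consumes = {
    "get_ip": set(),
    "email_subject_generator": {"get_ip"},
    "email_body_generator": {"get_ip"},
    "send_email": {"email_subject_generator", "email_body_generator"},
}

# Edges implied by the data flow, i.e. what would otherwise be written by hand
# as get_ip >> [email_subject_generator, email_body_generator] >> send_email.
edges = sorted((up, task) for task, ups in consumes.items() for up in ups)
for up, task in edges:
    print(f"{up} >> {task}")

# Any topological order of this graph is a valid execution order.
order = list(TopologicalSorter(consumes).static_order())
print(order)
```

The two generator tasks have no dependency on each other, so a scheduler is free to run them in parallel; only `get_ip` must come first and `send_email` last.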
As someone wise once said, code is better than words, so here’s my
experimental code: https://github.com/casassg/corrent, plus a couple of example
DAGs I wrote: https://github.com/casassg/corrent/tree/master/dags
This approach is heavily inspired by alternatives that have recently appeared
in the ETL pipeline world (e.g. Dagster) and support more functional ways of
defining DAGs.
I believe this approach could help the Airflow community grow. What do you
think? Any thoughts on functional DAGs? I’d love to hear the pros and cons of
this extension. To me, it seems closely linked to the lineage discussion
happening at the same time, as both are about improving data pipelines.
Thanks!
Gerard Casas Saez
Twitter | Cortex | @casassaez