As we wrap up the work on AIP-31 (functional definition), I wanted to bring
another idea here for discussion.
The concept is to parametrize pipelines using a class similar to the XComArg
that we introduced recently. As of 1.10.10, we can use the UI to set the DagRun
configuration on the trigger DAG view using a JSON blob.
Accessing those values is still hard (you need to pull the DagRun from the
current context and then access its conf object). My proposal is to add a new
class that is resolved at execution time, similar to how we resolve XComArgs.

class DAGParam:
    def __init__(self, key: str, default: Any, type: type):
        self.key, self.default, self.type = key, default, type

    def resolve(self, dag_run: DagRun):
        return dag_run.conf[self.key]

# Raw usage:
with DAG(...) as dag:
    param = DAGParam(key='number', default=3, type=int)
    SomeOperator(num=param)

# From DAG object:
with DAG(...) as dag:
    SomeOperator(num=dag.param(key='number', default=3, type=int))

# Decorator approach:
@dag(...)
def my_dag(number: int = 3):
    SomeOperator(num=number)

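To make the resolution step a bit more concrete, here is a rough, self-contained sketch of how it could behave. It does not import Airflow: FakeDagRun is a stand-in I made up for DagRun, and falling back to the default plus coercing to the declared type are assumptions on my side, not something the snippet above does.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class FakeDagRun:
    """Hypothetical stand-in for Airflow's DagRun; only conf matters here."""
    conf: Dict[str, Any] = field(default_factory=dict)


class DAGParam:
    """Placeholder for a DAG-level parameter, resolved when the task runs."""

    def __init__(self, key: str, default: Any = None, type: type = str):
        self.key = key
        self.default = default
        self.type = type

    def resolve(self, dag_run: FakeDagRun) -> Any:
        # Assumption: use the default when the key is missing from conf.
        conf = dag_run.conf or {}
        value = conf.get(self.key, self.default)
        if value is None:
            return None
        # Assumption: coerce to the declared type (conf values from a JSON
        # blob or a form may arrive as strings).
        return self.type(value)


param = DAGParam(key="number", default=3, type=int)
from_conf = param.resolve(FakeDagRun(conf={"number": "7"}))
from_default = param.resolve(FakeDagRun())
```

With this behavior an operator would receive the value from the trigger conf when it is set, and the declared default otherwise.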
Gist: https://gist.github.com/casassg/aa29b4d5d7f07f16630e591e351e570a
This would allow us to discover these params and surface them in the Trigger
DAG UI as a better form, similar to what we currently have at Twitter (see
DagConstructors here or image attached).
Just wanted to drop this here to get people's thoughts!
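As a sketch of what "discovering" the params could look like for the decorator approach, the function signature can be inspected with the standard library. The discover_params helper and the shape of the returned dicts are hypothetical, only meant to illustrate the idea of feeding a form in the UI.

```python
import inspect
from typing import Any, Dict, List


def discover_params(fn) -> List[Dict[str, Any]]:
    """Hypothetical helper: one entry per argument of a DAG function."""
    empty = inspect.Parameter.empty
    fields = []
    for name, p in inspect.signature(fn).parameters.items():
        fields.append({
            "key": name,
            # Arguments without a default or annotation are reported as None.
            "default": None if p.default is empty else p.default,
            "type": None if p.annotation is empty else p.annotation,
        })
    return fields


def my_dag(number: int = 3, label: str = "daily"):
    pass


form_fields = discover_params(my_dag)
```

Each entry carries enough information (key, default, type) to render one input in a trigger form.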
The idea is heavily inspired by Kubeflow PipelinesParams + pipeline decorator.
Gerard Casas Saez
Twitter | Cortex | @casassaez