As we wrap up the work on AIP-31 (functional definition), I wanted to bring 
another idea here for discussion.

The concept is to parametrize pipelines using a class similar to the XComArg 
we introduced recently. As of 1.10.10, we can use the UI to set the DagRun 
configuration on the trigger DAG view using a JSON blob.
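
For reference, reading that configuration inside a task today looks roughly 
like the sketch below. It assumes a callable run by a PythonOperator created 
with provide_context=True; read_number, DEFAULT_NUMBER and the stand-in 
context are hypothetical names used only for illustration.

```python
from types import SimpleNamespace

DEFAULT_NUMBER = 3  # hypothetical default, mirroring the examples in this thread


def read_number(**context):
    """Callable meant for a PythonOperator created with provide_context=True."""
    dag_run = context["dag_run"]  # step 1: pull the DagRun out of the context
    conf = dag_run.conf or {}     # step 2: conf may be None if no JSON was given
    return int(conf.get("number", DEFAULT_NUMBER))


# Stand-in for the context Airflow would pass in; this is not a real DagRun.
fake_context = {"dag_run": SimpleNamespace(conf={"number": 5})}
print(read_number(**fake_context))
```

Every task that needs the value has to repeat these steps, including the 
default handling.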

Accessing those values is still hard (you need to pull DagRun from the current 
context and then access the conf object). My proposal would be to add a new 
class that is resolved at execution time, similar to how we resolve XComArgs:
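
        from typing import Any

        from airflow.models import DagRun

        class DAGParam:
            def __init__(self, key: str, default: Any, type: type):
                self.key = key
                self.default = default
                self.type = type

            def resolve(self, dag_run: DagRun):
                # Look the value up in the conf of the DagRun being executed
                return dag_run.conf[self.key]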

        # Raw usage:

        with DAG(...) as dag:
            param = DAGParam(key='number', default=3, type=int)
            SomeOperator(num=param)

        # From DAG object:

        with DAG(...) as dag:
            SomeOperator(num=dag.param(key='number', default=3, type=int))

        # Decorator approach:

        @dag(...)
        def my_dag(number: int = 3):
            SomeOperator(num=number)
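
A minimal, Airflow-free sketch of how resolution at execution time could 
work, under a few assumptions that are not part of the proposal itself: a 
missing key falls back to the default, the value is coerced with the declared 
type, and a hypothetical helper (resolve_kwargs) swaps placeholders for values 
just before the operator runs. The SimpleNamespace objects stand in for real 
DagRuns.

```python
from types import SimpleNamespace
from typing import Any

_MISSING = object()  # sentinel so that None can still be a legitimate default


class DAGParam:
    """Hypothetical placeholder that is swapped for a real value at run time."""

    def __init__(self, key: str, default: Any = _MISSING, type: type = str):
        self.key = key
        self.default = default
        self.type = type

    def resolve(self, dag_run) -> Any:
        conf = dag_run.conf or {}  # conf may be None when nothing was supplied
        if self.key in conf:
            return self.type(conf[self.key])  # assumption: coerce to declared type
        if self.default is _MISSING:
            raise KeyError(f"DagRun conf has no value for param {self.key!r}")
        return self.default


def resolve_kwargs(kwargs: dict, dag_run) -> dict:
    """Replace every DAGParam among an operator's arguments with its value."""
    return {
        name: value.resolve(dag_run) if isinstance(value, DAGParam) else value
        for name, value in kwargs.items()
    }


operator_kwargs = {"num": DAGParam(key="number", default=3, type=int), "label": "demo"}
triggered = SimpleNamespace(conf={"number": "7"})  # stand-in: run triggered with JSON
scheduled = SimpleNamespace(conf=None)             # stand-in: run without any conf

print(resolve_kwargs(operator_kwargs, triggered))
print(resolve_kwargs(operator_kwargs, scheduled))
```

The operator itself never sees a DAGParam, only the resolved value, which is 
the same contract XComArg gives.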

Gist: https://gist.github.com/casassg/aa29b4d5d7f07f16630e591e351e570a

This would allow us to discover these params and surface them in the Trigger 
DAG UI as a better form, similar to what we currently have at Twitter (see 
DagConstructors here or image attached).
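
As a rough illustration of the discovery step for the decorator approach, the 
parameters can be read off the function signature with the standard inspect 
module. The describe_params helper, the shape of the field dictionaries and 
the extra dry_run parameter are assumptions made for this sketch; nothing 
here is an existing Airflow API.

```python
import inspect
from typing import Any, Dict, List


def describe_params(dag_fn) -> List[Dict[str, Any]]:
    """Turn a DAG function's signature into field specs a trigger form could render."""
    fields = []
    for name, param in inspect.signature(dag_fn).parameters.items():
        required = param.default is inspect.Parameter.empty
        annotation = (
            None if param.annotation is inspect.Parameter.empty else param.annotation
        )
        fields.append({
            "key": name,
            "type": getattr(annotation, "__name__", None),  # e.g. "int", or None
            "default": None if required else param.default,
            "required": required,
        })
    return fields


# Same shape as the decorator example above; dry_run is a hypothetical extra.
def my_dag(number: int = 3, dry_run: bool = False):
    ...


for field in describe_params(my_dag):
    print(field)
```

A UI could render one input per field, pre-filled with the default, and send 
the collected values as the DagRun conf.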

Just wanted to drop this here to get people's thoughts!

The idea is heavily inspired by Kubeflow Pipelines' PipelineParam + pipeline decorator.

Gerard Casas Saez
Twitter | Cortex | @casassaez
