It is possible, but this is similar to XCom support before XComArg, only allow 
string templatized fields + difficult to specify from the DAG.

My proposal is more focused on making it more functionally defined  for the 
functional DAG approach.
        dag = DAG(
            dag_id='airflow_tutorial_2',
            default_args=default_args,
            schedule_interval=None,
        )

        @dag.task
        def update_user(user_id: int)
                print(f’Hello {user_id}!’)

        user_id = dag.param(‘user_id’, default=1)
        update_user(user_id)

Or even use decorators to define a DAG:
        @dag(default_args=default_args, schedule_interval=None)
        def airflow_tutorial_2(user_id: int = 1)

                @task
                def update_user(user_id: int)
                        print(f’Hello {user_id}!’)

                update_user(user_id)


Not really proposing anything new, mostly just a DAG definition change (similar 
to AIP-31). I learned about param and trigger UI/CLI last week, so not I’m not 
sure about how much implementation it may need. It may be just a cosmetic 
change on DAG definition.

Also wondering, is the idea to surface those dag.params in the Trigger Run UI 
instead of asking the user to paste a JSON blob to overwrite them?

Gerard Casas Saez
Twitter | Cortex | @casassaez
On Jun 15, 2020, 5:02 PM -0600, Kaxil Naik <kaxiln...@gmail.com>, wrote:
> Isn't it already possible using params (
> https://github.com/apache/airflow/blob/master/airflow/models/dag.py#L138-L141
> )?
>
> Sample Usage:
> https://gist.github.com/kaxil/335d90da8821a4e515046ff0f470fc97#file-airflow_params_usage_2-py
>
> Currently, we allowing passing params in the DAG and overriding the params
> using dagrun_conf via CLI or UI:
>
> Code:
>
> -
> https://github.com/apache/airflow/blob/3de68501b7a76dce24bfd8a8b4659eedcf7ac29c/airflow/models/taskinstance.py#L1335-L1336
> -
> https://github.com/apache/airflow/blob/3de68501b7a76dce24bfd8a8b4659eedcf7ac29c/airflow/models/taskinstance.py#L1454-L1456
>
>
> Or am I missing something?
>
> Regards,
> Kaxil
>
> On Mon, Jun 15, 2020 at 11:48 PM Gerard Casas Saez
> <gcasass...@twitter.com.invalid> wrote:
>
> > I do not think we should support RunTimeParams to modify the topology (at
> > least at the beginning).
> >
> > Modify the topology involves quite a bit more of deeper changes. Even
> > though it may be useful, I believe the value/time tradeoff, is high, so
> > focusing on enabling parametrization on fixed topology is definitely an
> > easier step to focus on and will probs bring enough value.
> >
> > Curious what are other people thoughts on this?
> >
> > Gerard Casas Saez
> > Twitter | Cortex | @casassaez
> > On Jun 12, 2020, 10:00 AM -0600, Dan Davydov <ddavy...@twitter.com.invalid>,
> > wrote:
> > > I think this is a great idea! One thing that I think we should figure out
> > > before implementing is how to do so alongside DAG serialization, i.e.
> > > letting these params modify DAG topology might make it hard to store
> > > serialized representations for the Airflow services to consume and
> > render,
> > > though that may be more of a statement about the dagrun configuration and
> > > orthogonal to the change proposed here.
> > >
> > > On Thu, Jun 11, 2020 at 7:58 PM Gerard Casas Saez
> > > <gcasass...@twitter.com.invalid> wrote:
> > >
> > > > As we wrap the work on AIP-31 (functional definition), I wanted to
> > bring
> > > > another idea here for discussion.
> > > >
> > > > The concept is to parametrize pipelines using a similar class than
> > XComArg
> > > > that we introduced recently. As of 1.10.10, we can use the UI to set
> > the
> > > > DagRun configuration on the trigger DAG view using a json blob.
> > > >
> > > > Accessing those is still hard (you need to pull DagRun from current
> > > > context and then access the conf object). My proposal would be to add
> > a new
> > > > class that is resolved on execution similar to how we resolve XComArgs.
> > > >
> > > > class DAGParam(key:str, defaul:Any, type:type):
> > > >
> > > >
> > > > def resolve(dag_run: DagRun):
> > > >
> > > > return dag_run.conf[self.key]
> > > >
> > > >
> > > > # Raw usage:
> > > >
> > > >
> > > > with DAG(...) as dag:
> > > >
> > > > param = DAGParam(key='number', default=3, type=int)
> > > >
> > > > SomeOperator(num=param)
> > > >
> > > >
> > > > # From DAG object
> > > >
> > > >
> > > > with DAG(...) as dag:
> > > >
> > > > SomeOperator(num=dag.param(key='number', default=3, type=int))
> > > >
> > > >
> > > > # Decorator approach:
> > > >
> > > >
> > > > @dag(...)
> > > >
> > > > def my_dag(number:int=3):
> > > >
> > > > SomeOperator(num=number)
> > > >
> > > >
> > > > Gist: https://gist.github.com/casassg/aa29b4d5d7f07f16630e591e351e570a
> > > >
> > > > This would allow us to discover this params and surface them in the
> > Trigger
> > > > DAG UI
> > > > <https://%20
> > https://airflow.apache.org/blog/airflow-1.10.10/#allow-passing-dagrun-conf-when-triggering-dags-via-ui>
> > as
> > > > better form similar to what we currently have at Twitter (see
> > > > DagConstructors here
> > > > <
> > https://blog.twitter.com/engineering/en_us/topics/insights/2018/ml-workflows.html>
> > or
> > > > image attached)
> > > >
> > > > Just wanted to drop this here to get people thoughts!
> > > >
> > > > The idea is heavily inspired by Kubeflow PipelinesParams + pipeline
> > > > decorator.
> > > >
> > > > Gerard Casas Saez
> > > > Twitter | Cortex | @casassaez <https://twitter.com/casassaez>
> > > >
> >

Reply via email to