+1 for the idea Tomek
On Tue, Jun 16, 2020 at 1:39 AM Kaxil Naik <kaxiln...@gmail.com> wrote: > Oh yes that sounds good, +1 to the idea as long as it can return a JSON > serializable object I am fine with it. > > On Tue, Jun 16, 2020 at 12:29 AM Gerard Casas Saez > <gcasass...@twitter.com.invalid> wrote: > > > By XCom support before XComArg I meant as XCom parameters for operators. > > You needed to use {{contex[‘ti’].xcom_pull(…)}} instead of using XComArg > > objects like you can do as latest master. > > > > Gerard Casas Saez > > Twitter | Cortex | @casassaez > > On Jun 15, 2020, 5:02 PM -0600, Kaxil Naik <kaxiln...@gmail.com>, wrote: > > > Isn't it already possible using params ( > > > > > > https://github.com/apache/airflow/blob/master/airflow/models/dag.py#L138-L141 > > > )? > > > > > > Sample Usage: > > > > > > https://gist.github.com/kaxil/335d90da8821a4e515046ff0f470fc97#file-airflow_params_usage_2-py > > > > > > Currently, we allowing passing params in the DAG and overriding the > > params > > > using dagrun_conf via CLI or UI: > > > > > > Code: > > > > > > - > > > > > > https://github.com/apache/airflow/blob/3de68501b7a76dce24bfd8a8b4659eedcf7ac29c/airflow/models/taskinstance.py#L1335-L1336 > > > - > > > > > > https://github.com/apache/airflow/blob/3de68501b7a76dce24bfd8a8b4659eedcf7ac29c/airflow/models/taskinstance.py#L1454-L1456 > > > > > > > > > Or am I missing something? > > > > > > Regards, > > > Kaxil > > > > > > On Mon, Jun 15, 2020 at 11:48 PM Gerard Casas Saez > > > <gcasass...@twitter.com.invalid> wrote: > > > > > > > I do not think we should support RunTimeParams to modify the topology > > (at > > > > least at the beginning). > > > > > > > > Modify the topology involves quite a bit more of deeper changes. Even > > > > though it may be useful, I believe the value/time tradeoff, is high, > so > > > > focusing on enabling parametrization on fixed topology is definitely > an > > > > easier step to focus on and will probs bring enough value. > > > > > > > > Curious what are other people thoughts on this? > > > > > > > > Gerard Casas Saez > > > > Twitter | Cortex | @casassaez > > > > On Jun 12, 2020, 10:00 AM -0600, Dan Davydov > > <ddavy...@twitter.com.invalid>, > > > > wrote: > > > > > I think this is a great idea! One thing that I think we should > > figure out > > > > > before implementing is how to do so alongside DAG serialization, > i.e. > > > > > letting these params modify DAG topology might make it hard to > store > > > > > serialized representations for the Airflow services to consume and > > > > render, > > > > > though that may be more of a statement about the dagrun > > configuration and > > > > > orthogonal to the change proposed here. > > > > > > > > > > On Thu, Jun 11, 2020 at 7:58 PM Gerard Casas Saez > > > > > <gcasass...@twitter.com.invalid> wrote: > > > > > > > > > > > As we wrap the work on AIP-31 (functional definition), I wanted > to > > > > bring > > > > > > another idea here for discussion. > > > > > > > > > > > > The concept is to parametrize pipelines using a similar class > than > > > > XComArg > > > > > > that we introduced recently. As of 1.10.10, we can use the UI to > > set > > > > the > > > > > > DagRun configuration on the trigger DAG view using a json blob. > > > > > > > > > > > > Accessing those is still hard (you need to pull DagRun from > current > > > > > > context and then access the conf object). My proposal would be to > > add > > > > a new > > > > > > class that is resolved on execution similar to how we resolve > > XComArgs. > > > > > > > > > > > > class DAGParam(key:str, defaul:Any, type:type): > > > > > > > > > > > > > > > > > > def resolve(dag_run: DagRun): > > > > > > > > > > > > return dag_run.conf[self.key] > > > > > > > > > > > > > > > > > > # Raw usage: > > > > > > > > > > > > > > > > > > with DAG(...) as dag: > > > > > > > > > > > > param = DAGParam(key='number', default=3, type=int) > > > > > > > > > > > > SomeOperator(num=param) > > > > > > > > > > > > > > > > > > # From DAG object > > > > > > > > > > > > > > > > > > with DAG(...) as dag: > > > > > > > > > > > > SomeOperator(num=dag.param(key='number', default=3, type=int)) > > > > > > > > > > > > > > > > > > # Decorator approach: > > > > > > > > > > > > > > > > > > @dag(...) > > > > > > > > > > > > def my_dag(number:int=3): > > > > > > > > > > > > SomeOperator(num=number) > > > > > > > > > > > > > > > > > > Gist: > > https://gist.github.com/casassg/aa29b4d5d7f07f16630e591e351e570a > > > > > > > > > > > > This would allow us to discover this params and surface them in > the > > > > Trigger > > > > > > DAG UI > > > > > > <https://%20 > > > > > > > https://airflow.apache.org/blog/airflow-1.10.10/#allow-passing-dagrun-conf-when-triggering-dags-via-ui > > > > > > > as > > > > > > better form similar to what we currently have at Twitter (see > > > > > > DagConstructors here > > > > > > < > > > > > > > https://blog.twitter.com/engineering/en_us/topics/insights/2018/ml-workflows.html > > > > > > > or > > > > > > image attached) > > > > > > > > > > > > Just wanted to drop this here to get people thoughts! > > > > > > > > > > > > The idea is heavily inspired by Kubeflow PipelinesParams + > pipeline > > > > > > decorator. > > > > > > > > > > > > Gerard Casas Saez > > > > > > Twitter | Cortex | @casassaez <https://twitter.com/casassaez> > > > > > > > > > > > > >