turbaszek opened a new issue #16283: URL: https://github.com/apache/airflow/issues/16283
**Description**

Make `XComArg` JSON-serializable so users can use `task_foo.output` in templated fields.

**Use case / motivation**

Consider this example from this [Stack Overflow question](https://stackoverflow.com/questions/67856466/airflow-pass-xcom-pull-result-to-triggerdagrunoperator-conf/67857131):

```py
import airflow
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

default_args = {
    "depends_on_past": False,
    "start_date": days_ago(1)
}


def run_this_func(**context):
    print(context['params'])


with airflow.DAG("bar", default_args=default_args, schedule_interval=None) as dag:
    run_this = PythonOperator(
        task_id="run_this",
        python_callable=run_this_func,
        provide_context=True)

with airflow.DAG("foo", default_args=default_args, schedule_interval=None) as dag2:
    def _should_trigger(**_):
        return {'Message': 'Hello World'}

    task_a = PythonOperator(
        task_id="task_a",
        python_callable=run_this_func,
        provide_context=True,
    )

    trigger_bar_dag = TriggerDagRunOperator(
        task_id="trigger_bar_dag",
        trigger_dag_id="bar",
        conf={"payload": "{{ task_instance.xcom_pull('task_a') }}"},
    )

    task_a >> trigger_bar_dag
```

This example works as expected. However, with the new TaskFlow API I would like to use the `task_a.output` attribute instead of writing Jinja by hand:

```py
conf={"payload": task_a.output},
```

Unfortunately, this results in an error:

```
Broken DAG: [/files/dags/tasks.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.7/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type XComArg is not JSON serializable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/airflow/airflow/models/baseoperator.py", line 181, in apply_defaults
    result = func(self, *args, **kwargs)
  File "/opt/airflow/airflow/operators/trigger_dagrun.py", line 114, in __init__
    raise AirflowException("conf parameter should be JSON Serializable")
airflow.exceptions.AirflowException: conf parameter should be JSON Serializable
```

Of course, those who know that `output` is an `XComArg` may also know that it implements a string representation, so one can do:

```py
conf={"payload": str(task_a.output)},
```

While this fixes the serialisation error, it does not solve the problem from the SO question. If we use `str(task_a.output)`, Airflow will not correctly resolve the dependency between `trigger_bar_dag` and `task_a`, which may result in `payload` being `None` (as stated in the SO question).

So what I would like to propose is to make `XComArg` JSON-serializable so that the default JSON backend can work with it.

**Are you willing to submit a PR?**

Yes 🚀

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
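The serialisation half of the proposal can be illustrated without Airflow installed. The following is only a minimal sketch under stated assumptions: `FakeXComArg` is a hypothetical stand-in for the real `XComArg` (its `__str__` is assumed to resemble the Jinja template the real class renders to), and `XComArgEncoder` is a hypothetical name, not an existing Airflow class. It shows the default encoder rejecting the object and a custom `json.JSONEncoder` accepting it by falling back to the string form.

```python
import json


class FakeXComArg:
    """Hypothetical stand-in for Airflow's XComArg (not the real class)."""

    def __init__(self, task_id, key="return_value"):
        self.task_id = task_id
        self.key = key

    def __str__(self):
        # Assumption: the real XComArg renders to a similar Jinja template.
        return (
            "{{ task_instance.xcom_pull("
            f"task_ids='{self.task_id}', key='{self.key}'"
            ") }}"
        )


class XComArgEncoder(json.JSONEncoder):
    """Hypothetical encoder that serializes FakeXComArg via str()."""

    def default(self, o):
        if isinstance(o, FakeXComArg):
            return str(o)
        # Anything else keeps the standard behaviour (raises TypeError).
        return super().default(o)


conf = {"payload": FakeXComArg("task_a")}

# The default encoder fails, mirroring the "not JSON serializable" error.
try:
    json.dumps(conf)
    default_encoder_failed = False
except TypeError:
    default_encoder_failed = True

# The custom encoder emits the template string instead.
encoded = json.dumps(conf, cls=XComArgEncoder)
print(encoded)
```

Note that this covers only the JSON check. As with `str(task_a.output)`, encoding to a string does not by itself register the dependency between the two tasks, so an actual change would presumably need to handle that separately.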
