turbaszek opened a new issue #16283:
URL: https://github.com/apache/airflow/issues/16283


   **Description**
   
   Make `XComArg` JSON-serializable so that users can use `task_foo.output` in templated fields.
   
   **Use case / motivation**
   
   Let's consider this example from this [Stack Overflow question](https://stackoverflow.com/questions/67856466/airflow-pass-xcom-pull-result-to-triggerdagrunoperator-conf/67857131):
   
   ```py
   import airflow
   from airflow.operators.python import PythonOperator
   from airflow.operators.trigger_dagrun import TriggerDagRunOperator
   from airflow.utils.dates import days_ago
   
   default_args = {
       "depends_on_past": False,
       "start_date": days_ago(1)
   }
   
   
   def run_this_func(**context):
       print(context['params'])
   
   
   with airflow.DAG("bar", default_args=default_args, schedule_interval=None) as dag:
       run_this = PythonOperator(
           task_id="run_this",
           python_callable=run_this_func,
           provide_context=True)
   
   
   with airflow.DAG("foo", default_args=default_args, schedule_interval=None) as dag2:
       def _should_trigger(**_):
           return {'Message': 'Hello World'}
   
   
       task_a = PythonOperator(
           task_id="task_a",
           python_callable=_should_trigger,  # return value is pushed to XCom
           provide_context=True,
       )
   
       trigger_bar_dag = TriggerDagRunOperator(
           task_id="trigger_bar_dag",
           trigger_dag_id="bar",
           conf={"payload": "{{ task_instance.xcom_pull('task_a') }}"},
       )
   
       task_a >> trigger_bar_dag
   ```
   
   This example works as expected. However, with the new TaskFlow API I would like to use the `task_a.output` attribute instead of writing Jinja by hand:
   ```py
   conf={"payload": task_a.output},
   ```
   
   Unfortunately, this results in an error:
   ```
   Broken DAG: [/files/dags/tasks.py] Traceback (most recent call last):
     File "/usr/local/lib/python3.7/json/encoder.py", line 257, in iterencode
       return _iterencode(o, 0)
     File "/usr/local/lib/python3.7/json/encoder.py", line 179, in default
       raise TypeError(f'Object of type {o.__class__.__name__} '
   TypeError: Object of type XComArg is not JSON serializable
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/opt/airflow/airflow/models/baseoperator.py", line 181, in apply_defaults
       result = func(self, *args, **kwargs)
     File "/opt/airflow/airflow/operators/trigger_dagrun.py", line 114, in __init__
       raise AirflowException("conf parameter should be JSON Serializable")
   airflow.exceptions.AirflowException: conf parameter should be JSON Serializable
   ```
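The chained traceback suggests that the operator's constructor tries `json.dumps(conf)` and turns the resulting `TypeError` into an `AirflowException`. Below is a minimal, self-contained sketch of that validation pattern. It assumes that is how the check works. `validate_conf`, `ConfError`, and `Placeholder` are hypothetical stand-ins, not Airflow APIs.

```python
import json


class ConfError(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowException."""


class Placeholder:
    """Hypothetical stand-in for an XComArg: an object json cannot encode."""


def validate_conf(conf):
    # json.dumps raises TypeError for any object it does not know how to encode.
    try:
        json.dumps(conf)
    except TypeError:
        # Raising inside the except block makes Python chain the exceptions
        # ("During handling of the above exception..."), as in the traceback.
        raise ConfError("conf parameter should be JSON Serializable")


# A plain Jinja string is just a str, so it passes.
validate_conf({"payload": "{{ task_instance.xcom_pull('task_a') }}"})

try:
    validate_conf({"payload": Placeholder()})
except ConfError as exc:
    print(exc)  # conf parameter should be JSON Serializable
```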
   
   Of course, those who know that `output` is an `XComArg` may also know that it implements `__str__`, which renders the equivalent `xcom_pull` Jinja template, so one can do:
   ```py
   conf={"payload": str(task_a.output)},
   ```
   
   While this fixes the serialization error, it does not solve the problem from the Stack Overflow question. With `str(task_a.output)`, Airflow does not correctly infer the dependency between `trigger_bar_dag` and `task_a`, which may result in `payload` being `None` (as described in the question).
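A toy model shows why the string workaround loses the dependency. Suppose an operator discovers its upstream tasks by scanning its arguments for XComArg-like objects. Airflow does something along these lines for templated fields, though its details differ. In that case, a plain string produced by `str()` carries no reference back to the task that produces the value. `Task`, `Ref`, and `find_upstream` are hypothetical names for illustration only.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Task:
    task_id: str


@dataclass(frozen=True)
class Ref:
    """Hypothetical XComArg-like reference to another task's return value."""
    task: Task

    def __str__(self):
        # Render as a Jinja template string; the object reference is lost.
        return f"{{{{ task_instance.xcom_pull(task_ids='{self.task.task_id}') }}}}"


def find_upstream(value):
    """Recursively collect the tasks referenced by Ref objects inside value."""
    if isinstance(value, Ref):
        return {value.task}
    if isinstance(value, dict):
        value = value.values()
    if isinstance(value, (list, tuple, set, type({}.values()))):
        found = set()
        for item in value:
            found |= find_upstream(item)
        return found
    # Strings and other scalars carry no task reference.
    return set()


task_a = Task("task_a")
print(find_upstream({"payload": Ref(task_a)}))       # {Task(task_id='task_a')}
print(find_upstream({"payload": str(Ref(task_a))}))  # set()
```

Both `conf` values render to the same template text, but only the first still tells the scheduler which task must run first.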
   
   So I propose making `XComArg` JSON-serializable so that the default JSON backend can work with it.
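One possible shape for the proposal, using only the standard library: a `json.JSONEncoder` subclass whose `default()` hook renders XComArg-like objects as their Jinja template string. Encoding then succeeds, while `conf` itself still holds the original object. `XComRef` and `XComArgAwareEncoder` are hypothetical names. This is not Airflow's implementation, and how such an encoder would be wired into operators is left open.

```python
import json


class XComRef:
    """Hypothetical stand-in for XComArg: points at another task's output."""

    def __init__(self, task_id, key="return_value"):
        self.task_id = task_id
        self.key = key

    def __str__(self):
        # Same idea as XComArg's string form: the equivalent Jinja template.
        return (
            f"{{{{ task_instance.xcom_pull(task_ids='{self.task_id}', "
            f"key='{self.key}') }}}}"
        )


class XComArgAwareEncoder(json.JSONEncoder):
    def default(self, o):
        # Called only for objects json cannot encode natively.
        if isinstance(o, XComRef):
            return str(o)
        # Defer to the base class, which raises TypeError.
        return super().default(o)


conf = {"payload": XComRef("task_a")}
print(json.dumps(conf, cls=XComArgAwareEncoder))
# {"payload": "{{ task_instance.xcom_pull(task_ids='task_a', key='return_value') }}"}
```

The encoder alone only removes the serialization error. Dependency inference would still need to inspect the `XComRef` object in `conf` before it is turned into a string.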
   
   **Are you willing to submit a PR?**
   
   Yes 🚀 
   

