qcha41 opened a new issue, #27079:
URL: https://github.com/apache/airflow/issues/27079

   ### Description
   
   To create an XCom value with a BashOperator or a DockerOperator, we can use the `do_xcom_push` option, which pushes the last line of the command's output (logs) to XCom.
   
   It would be useful to provide a `parse_json` option that deserializes this last output line, if it is a JSON string, before pushing it as an XCom. This would make it possible to access its attributes in downstream tasks through the `xcom_pull()` method.
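A minimal sketch of how such an option might behave, assuming the parse is attempted only on string values and silently falls back to the raw string when the line is not valid JSON. `parse_json_output` is a hypothetical helper name, not an existing Airflow API:

```python
import json
from typing import Any


def parse_json_output(value: Any) -> Any:
    """Hypothetical helper mirroring the proposed `parse_json` option.

    Returns the decoded JSON value when `value` is a valid JSON string,
    otherwise returns `value` unchanged.
    """
    if not isinstance(value, str):
        # e.g. None when the command printed nothing
        return value
    try:
        return json.loads(value)
    except ValueError:
        # json.JSONDecodeError subclasses ValueError: not JSON, keep the raw line
        return value


print(parse_json_output('{"key1":1,"key2":3}'))  # {'key1': 1, 'key2': 3}
print(parse_json_output("done"))  # done
```

Note that this sketch decodes any valid JSON, so a last line of `42` would become the integer `42`; a stricter variant could keep the result only when it is a `dict`.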
   
   ### Use case/motivation
   
   See my Stack Overflow post: https://stackoverflow.com/questions/74083466/how-to-deserialize-xcom-strings-in-airflow
   
   Consider a DAG containing two tasks: `DAG: Task A >> Task B` (BashOperators 
or DockerOperators). They need to communicate through XComs.
   
   - `Task A` outputs its information as one-line JSON on stdout, which then appears in the logs of `Task A` and therefore in its *return_value* XCom key when `do_xcom_push=True`. For instance: `{"key1":1,"key2":3}`
   
   - `Task B` only needs the `key2` value from `Task A`, so we need to deserialize the *return_value* XCom of `Task A` to extract only this value and pass it directly to `Task B`, using the Jinja template `{{ ti.xcom_pull(task_ids='task_a')['key2'] }}`. Using it as-is results in `jinja2.exceptions.UndefinedError: 'str object' has no attribute 'key2'`, because *return_value* is just a string.
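The mismatch can be reproduced in plain Python, outside Jinja. This sketch assumes `Task A` runs a Python script (the issue does not say what produces the output) and prints its payload as compact one-line JSON. Since `do_xcom_push` keeps only the last line, pretty-printed JSON would leave just the closing `}` in the XCom. `return_value` stands in for the pulled XCom, which is a plain `str`:

```python
import json

# Task A: print the result as compact one-line JSON so that it is the last
# stdout line picked up by do_xcom_push
payload = {"key1": 1, "key2": 3}
last_line = json.dumps(payload, separators=(",", ":"))
print(last_line)  # {"key1":1,"key2":3}

# With indent=2 the output spans several lines; only "}" would be the last one
print(json.dumps(payload, indent=2).splitlines()[-1])  # }

# Task B today: the return_value XCom is that line, i.e. a plain str
return_value = last_line
try:
    return_value["key2"]  # what the template effectively attempts
except TypeError as exc:
    # Jinja reports this failed lookup as an UndefinedError instead
    print(f"lookup failed: {exc}")

# Decoding first yields a dict, so the key lookup succeeds
print(json.loads(return_value)["key2"])  # 3
```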
   
   For comparison, Airflow Variables can already be deserialized in Jinja templates (e.g. `{{ var.json.my_var.path }}`). I would like to be able to do the same with XComs.
   
   **Current workaround**: 
   
   We can create a custom operator (subclassing BashOperator or DockerOperator) and extend its `execute` method to:
   
   1. call the original `execute` method;
   2. intercept the last log line of the task (the value returned by the original `execute`);
   3. try to parse it into a Python dictionary with `json.loads()`;
   4. return the result (now a dictionary rather than a string).
   
   The previous Jinja template `{{ ti.xcom_pull(task_ids='task_a')['key2'] }}` now works in `Task B`, since the XCom value is now a Python dictionary.
   
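   ```python
   import json

   from airflow.operators.bash import BashOperator
   from airflow.providers.docker.operators.docker import DockerOperator


   class BashOperatorExtended(BashOperator):
       def execute(self, context):
           # The parent returns the last line of stdout (the return_value XCom)
           output = super().execute(context)
           try:
               # Decode it if it is a JSON string, e.g. '{"key1":1,"key2":3}'
               output = json.loads(output)
           except (TypeError, ValueError):
               # Not JSON (or None): keep the original value
               pass
           return output


   class DockerOperatorExtended(DockerOperator):
       def execute(self, context):
           # The parent returns the last line of the container logs
           output = super().execute(context)
           try:
               output = json.loads(output)
           except (TypeError, ValueError):
               pass
           return output
   ```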
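One way to avoid writing the same override twice is a small mixin listed before the operator class in the bases, so its `execute` runs first and delegates to the operator through `super()`. This is a self-contained sketch: `ParseJsonOutputMixin` and `FakeOperator` are hypothetical names, and `FakeOperator` only stands in for an Airflow operator so the example runs without Airflow installed:

```python
import json
from typing import Any


class ParseJsonOutputMixin:
    """Hypothetical mixin: JSON-decode the value returned by the next execute() in the MRO."""

    def execute(self, context: Any) -> Any:
        output = super().execute(context)
        try:
            return json.loads(output)
        except (TypeError, ValueError):
            # Not a JSON string (or None): keep the original return value
            return output


class FakeOperator:
    """Stand-in for BashOperator/DockerOperator: returns the last output line."""

    def __init__(self, last_line: Any) -> None:
        self.last_line = last_line

    def execute(self, context: Any) -> Any:
        return self.last_line


class FakeOperatorExtended(ParseJsonOutputMixin, FakeOperator):
    pass


print(FakeOperatorExtended('{"key1":1,"key2":3}').execute({}))  # {'key1': 1, 'key2': 3}
print(FakeOperatorExtended("done").execute({}))  # done
```

With Airflow installed, the same mixin would be combined as `class BashOperatorExtended(ParseJsonOutputMixin, BashOperator): pass`, and likewise for `DockerOperator`. It still requires one subclass per operator, which a built-in `parse_json` option would make unnecessary.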
   
   However, creating a new operator just for this purpose is not really satisfying.
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

