qcha41 opened a new issue, #27079: URL: https://github.com/apache/airflow/issues/27079
### Description

To create an XCom value with a BashOperator or a DockerOperator, we can use the `do_xcom_push` option, which pushes the last line of the command's log output to XCom.

It would be useful to add a `parse_json` option that deserializes this last log line when it is a JSON string, before pushing it to XCom. Its attributes could then be accessed in other tasks through the `xcom_pull()` method.

### Use case/motivation

See my StackOverflow post: https://stackoverflow.com/questions/74083466/how-to-deserialize-xcom-strings-in-airflow

Consider a DAG containing two tasks, `DAG: Task A >> Task B` (BashOperators or DockerOperators), that need to communicate through XComs.

- `Task A` writes its information to stdout as a one-line JSON string, for instance `{"key1":1,"key2":3}`. This line can be retrieved from the logs of `Task A`, and therefore from its *return_value* XCom key if `do_xcom_push=True`.
- `Task B` only needs the `key2` value from `Task A`, so the *return_value* XCom of `Task A` must be deserialized to extract this value and pass it directly to `Task B` with the Jinja template `{{ xcom_pull('task_a')['key2'] }}`.

Used as is, this template fails with `jinja2.exceptions.UndefinedError: 'str object' has no attribute 'key2'`, because *return_value* is just a string.

Airflow Variables, by contrast, can already be deserialized in Jinja templates (e.g. `{{ var.json.my_var.path }}`). I would like to do the same with XComs.

**Current workaround**: create a custom operator (inheriting from BashOperator or DockerOperator) and extend its `execute` method so that it:

1. runs the original `execute` method,
2. intercepts the last log line of the task,
3. tries to `json.loads()` it into a Python dictionary,
4. returns the result, which is now a dictionary rather than a string.

The previous Jinja template `{{ xcom_pull('task_a')['key2'] }}` then works in `Task B`, since the XCom value is now a Python dictionary.
```python
import json

from airflow.operators.bash import BashOperator
from airflow.providers.docker.operators.docker import DockerOperator


class BashOperatorExtended(BashOperator):
    def execute(self, context):
        # The parent returns the last line of stdout (pushed as the return_value XCom).
        output = super().execute(context)
        try:
            # Deserialize the line if it is valid JSON.
            output = json.loads(output)
        except (TypeError, ValueError):
            # Not a string (e.g. None) or not valid JSON: keep the raw value.
            pass
        return output


class DockerOperatorExtended(DockerOperator):
    def execute(self, context):
        output = super().execute(context)
        try:
            output = json.loads(output)
        except (TypeError, ValueError):
            pass
        return output
```

But creating a new operator just for this purpose is not really satisfying.

### Related issues

_No response_

### Are you willing to submit a PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
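For illustration, the `parse_json` option proposed in this issue could replace the two near-identical workaround operators with a single helper plus a mixin. This is only a sketch under assumptions: `parse_json_output`, `ParseJsonMixin` and the `FakeBashOperator` stand-in are hypothetical names, not part of Airflow, and the mixin has not been checked against how `BaseOperator` validates constructor arguments. The stand-in replaces `BashOperator` so the sketch runs without Airflow installed.

```python
import json
from typing import Any


def parse_json_output(output: Any) -> Any:
    """Return `output` deserialized from JSON when possible, else unchanged.

    Only strings are parsed; None and other values pass through untouched,
    as does any string that is not valid JSON.
    """
    if not isinstance(output, str):
        return output
    try:
        return json.loads(output)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return output


class ParseJsonMixin:
    """Hypothetical mixin adding a `parse_json` flag to an operator.

    List it before the operator class in the bases so that its execute()
    wraps the operator's own execute().
    """

    def __init__(self, *args: Any, parse_json: bool = False, **kwargs: Any) -> None:
        # Consume parse_json here so the base operator never sees it.
        super().__init__(*args, **kwargs)
        self.parse_json = parse_json

    def execute(self, context: Any) -> Any:
        output = super().execute(context)
        return parse_json_output(output) if self.parse_json else output


# Hypothetical stand-in for BashOperator, so the sketch runs without Airflow.
class FakeBashOperator:
    def __init__(self, *, bash_command: str) -> None:
        self.bash_command = bash_command

    def execute(self, context: Any) -> Any:
        # Pretend the command's last stdout line is the command text itself.
        return self.bash_command


class JsonFakeBashOperator(ParseJsonMixin, FakeBashOperator):
    pass


task_a = JsonFakeBashOperator(bash_command='{"key1":1,"key2":3}', parse_json=True)
print(task_a.execute(context={})["key2"])  # prints 3
```

Keeping `parse_json` off by default leaves the current `do_xcom_push` behaviour unchanged, so existing DAGs that expect a raw string would not be affected.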
