hussein-awala commented on issue #8792:
URL: https://github.com/apache/airflow/issues/8792#issuecomment-1915863325

   As a workaround, we can use the new callbacks class:
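   ```python
   from kubernetes.client import CoreV1Api, models as k8s

   from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback
   from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager, PodPhase


   class XComCallbacks(KubernetesPodOperatorCallback):
       @staticmethod
       def on_pod_completion(*, pod: k8s.V1Pod, client: CoreV1Api, mode: str, **kwargs) -> None: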
           from airflow.models.xcom import XCom
   
           def _construct_run_id(run_id: str):
               """re-construct the run_id from the safe label"""
               new_run_id = run_id.split("T")[0]
               rest = run_id.split("T")[1]
               new_run_id += "T"
               new_run_id += rest[:2] + ":" + rest[2:4] + ":" + rest[4:6]
               new_run_id += rest[6:13] + "+" + rest[13:15] + ":" + rest[15:17]
               return new_run_id
   
           # Only pull the sidecar result when the pod did not succeed
           if (pod.status.phase if hasattr(pod, "status") else None) != PodPhase.SUCCEEDED:
               pod_manager = PodManager(kube_client=client)
               pod_manager.await_xcom_sidecar_container_start(pod=pod)
               result = pod_manager.extract_xcom(pod=pod)
               XCom.set(
                   key="failure_result",
                   value=result,
                   task_id=pod.metadata.labels["task_id"],
                   dag_id=pod.metadata.labels["dag_id"],
                   run_id=_construct_run_id(pod.metadata.labels["run_id"]),
               ) 
   ```
   And for deferrable mode, we can implement the same logic in `on_operator_resuming`.
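
The slicing in `_construct_run_id` only works when the label contains microseconds and a positive UTC offset. Below is a minimal standalone sketch of the same reconstruction that also handles labels without microseconds. The helper name `reconstruct_run_id` and the regular expression are hypothetical, and the sketch assumes the label was produced by dropping `:` and `+` from an ISO-style run_id and was not truncated.

```python
import re

# Assumed label shape: "<prefix><YYYY-MM-DD>T<HHMMSS>[.<ffffff>]<hhmm>",
# i.e. an ISO timestamp whose ":" and "+" characters were stripped.
_LABEL_RE = re.compile(
    r"^(?P<head>.*\d{4}-\d{2}-\d{2})T"
    r"(?P<h>\d{2})(?P<m>\d{2})(?P<s>\d{2})"
    r"(?P<frac>\.\d{6})?"
    r"(?P<oh>\d{2})(?P<om>\d{2})$"
)


def reconstruct_run_id(label: str) -> str:
    """Rebuild a run_id from its label form (hypothetical helper)."""
    match = _LABEL_RE.match(label)
    if match is None:
        # Labels with a negative offset or another shape are not handled here.
        raise ValueError(f"unsupported run_id label: {label!r}")
    frac = match["frac"] or ""
    return (
        f"{match['head']}T{match['h']}:{match['m']}:{match['s']}"
        f"{frac}+{match['oh']}:{match['om']}"
    )


print(reconstruct_run_id("manual__2024-01-30T123456.1234560000"))
print(reconstruct_run_id("scheduled__2024-01-30T0000000000"))
```

Raising on an unexpected label, rather than returning a guess, keeps the XCom from being written under a run_id that does not exist.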
   
   Happy to find a new use case for this feature 😄 


