NicolasBearingPoint commented on issue #50491:
URL: https://github.com/apache/airflow/issues/50491#issuecomment-2873061883

   [UPDATE] 
   I think I found the reason: 
   
   - Airflow calls xcom_pull() in its internal dependency rules to determine
     whether a task was previously "skipped" by a branch.
   - This is part of the mechanism that prevents downstream tasks of a
     BranchPythonOperator from running if the branch skipped them.
   - This explains why the branch XCom is deserialized before your task is
     even scheduled or executed.
   - In other words: the scheduler reads the XCom generated by the branching
     task to determine whether the current task should be skipped, set to
     None, or scheduled.
   - That decision depends only on access to the XCom value stored in the
     Airflow database. As long as the XCom is properly stored and its value
     (even if it's an S3 link) is present in the database, the scheduler or
     worker can still understand the task's state and make the correct
     decisions.
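
As a rough illustration of the skip decision described in the bullets above, here is a simplified, standalone Python model. It does not import Airflow. The function name `should_skip` and the dictionary keys `followed` and `skipped` are assumptions made for this sketch, not a copy of Airflow's dependency code.

```python
from typing import Optional

# Hypothetical key names for this sketch; they are not read from Airflow.
FOLLOWED = "followed"
SKIPPED = "skipped"


def should_skip(task_id: str, branch_xcom: Optional[dict]) -> bool:
    """Return True if the branch decision stored in the XCom excludes task_id."""
    if branch_xcom is None:
        # The upstream task recorded no branch decision, so nothing to enforce.
        return False
    if FOLLOWED in branch_xcom:
        # Only the followed tasks may run; every other downstream task is skipped.
        return task_id not in branch_xcom[FOLLOWED]
    if SKIPPED in branch_xcom:
        # The branch listed the skipped tasks explicitly.
        return task_id in branch_xcom[SKIPPED]
    return False
```

The point of the sketch is that the decision only needs the small dictionary written by the branching task, which is why the XCom is read before the downstream task runs at all.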
   
   So in the `deserialize_value()` function I need to add an early exit that
   skips full deserialization when it is running outside a task context
   (e.g. in the scheduler after a BranchingOperator):
   
   ```
   # Early exit: skip full deserialization when running outside a task context
   # (e.g. in the scheduler after a BranchingOperator).
   if lazy or not os.environ.get("AIRFLOW_CTX_DAG_ID"):
       # Return the raw pointer (e.g. xcom_s3://...) for comparison or logging.
       task_logger.warning(
           "XCom deserialization attempted outside task context "
           "(probably in the scheduler), skipping download and deserialize: %s",
           result,
       )
       return result
   ```
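
The guard above can be tried in isolation with a small standalone sketch. `resolve_xcom`, `in_task_context`, the `download` callback and the example pointer are hypothetical names introduced here for illustration. Only the `AIRFLOW_CTX_DAG_ID` check and the `lazy` flag come from the snippet, and the real `deserialize_value()` of a custom XCom backend may look different.

```python
import logging
import os
from typing import Any, Callable, Mapping

task_logger = logging.getLogger("xcom_backend_sketch")


def in_task_context(environ: Mapping[str, str] = os.environ) -> bool:
    """True when AIRFLOW_CTX_DAG_ID is set to a non-empty value."""
    return bool(environ.get("AIRFLOW_CTX_DAG_ID"))


def resolve_xcom(
    result: str,
    download: Callable[[str], Any],
    lazy: bool = False,
    environ: Mapping[str, str] = os.environ,
) -> Any:
    """Return the raw pointer outside a task context, else the downloaded value."""
    if lazy or not in_task_context(environ):
        # Same early exit as in the snippet: hand back the stored pointer as-is.
        task_logger.warning(
            "XCom deserialization skipped, returning raw pointer: %s", result
        )
        return result
    # Inside a task: follow the pointer (hypothetical download step).
    return download(result)


def fake_download(pointer: str) -> dict:
    """Stand-in for the S3 download, used only to exercise the sketch."""
    return {"loaded_from": pointer}


pointer = "xcom_s3://example-bucket/key.json"  # made-up pointer value
outside_task = resolve_xcom(pointer, fake_download, environ={})
inside_task = resolve_xcom(
    pointer, fake_download, environ={"AIRFLOW_CTX_DAG_ID": "example_dag"}
)
```

Passing `environ` explicitly keeps the check testable without touching the real process environment. In the actual backend the check would read `os.environ` directly, as in the snippet.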


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
