NicolasBearingPoint commented on issue #50491:
URL: https://github.com/apache/airflow/issues/50491#issuecomment-2873061883
[UPDATE]
I think I found the reason:
- Airflow calls `xcom_pull()` in its internal dependency rules to determine whether a task was previously skipped by a branch.
- This is part of the mechanism that prevents tasks downstream of a `BranchPythonOperator` from running when the branch did not follow them.
- This explains why the branch XCom is deserialized before the downstream task is even scheduled or executed.
- In other words, the scheduler reads the XCom produced by the branching task to decide whether the current task should be skipped, left with no state (`None`), or scheduled.
- Deserialization only needs the value stored in the Airflow metadata database. As long as the XCom is stored properly and its value (even if it is just an S3 link) is present in the database, the scheduler or worker can still work out the task's state and make the correct decision.
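The decision described in the list above can be sketched as a toy model. This is a minimal illustration, not Airflow's real dependency code: the in-memory `store`, the key name `"branch_decision"`, and the helpers `xcom_pull()` and `decide_state()` are all hypothetical stand-ins for the XCom table and the scheduler-side check.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical stand-in for XCom rows in the metadata database,
# keyed by (task_id, key). Not Airflow's actual schema or key names.
XComStore = Dict[Tuple[str, str], Any]


def xcom_pull(store: XComStore, task_id: str, key: str) -> Any:
    """Return the stored value for (task_id, key), or None if absent."""
    return store.get((task_id, key))


def decide_state(store: XComStore, branch_task_id: str, task_id: str) -> str:
    """Toy version of the check for a task directly downstream of a branch.

    Returns "scheduled" if the branch followed this task, "skipped" if it
    did not, and "none" if the branch has not recorded a decision yet.
    """
    decision = xcom_pull(store, branch_task_id, "branch_decision")
    if decision is None:
        return "none"  # branch has not run yet; leave the task without a state
    followed: List[str] = decision["followed"]
    return "scheduled" if task_id in followed else "skipped"
```

For example, with `store = {("branch", "branch_decision"): {"followed": ["task_a"]}}`, `task_a` is scheduled and any sibling such as `task_b` is skipped. The point of the sketch is that the check has to read the branch's stored value, which is why the XCom backend's deserialization runs on the scheduler side.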
So in the `deserialize_value()` function I need to add an early exit that skips full deserialization when it is running outside a task context (e.g. in the scheduler after a branching operator):
```python
import os

# Early exit: skip full deserialization when running outside a task context
# (e.g. the scheduler evaluating tasks downstream of a branching operator).
# `lazy`, `task_logger` and `result` come from the enclosing deserialize_value().
if lazy or not os.environ.get("AIRFLOW_CTX_DAG_ID"):
    # Return the raw pointer (e.g. xcom_s3://...) for comparison or logging
    task_logger.warning(
        "XCom deserialization attempted outside task context (probably in the "
        "scheduler), skipping download and deserialization: %s",
        result,
    )
    return result
```
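To check the early-exit logic in isolation, here is a self-contained sketch of a simplified deserializer built around it. Everything except the `AIRFLOW_CTX_DAG_ID` check and the `xcom_s3://` pointer format is hypothetical: `deserialize_value()` here is a plain function rather than a method on a real XCom backend class, and the `download` callable stands in for the actual S3 fetch and decode.

```python
import os
from typing import Any, Callable

# Pointer format taken from the comment above; the prefix check is an assumption
# about how this particular backend distinguishes offloaded values.
POINTER_PREFIX = "xcom_s3://"


def deserialize_value(
    result: Any,
    download: Callable[[str], Any],
    lazy: bool = False,
) -> Any:
    """Hypothetical simplified deserializer for an object-storage XCom backend.

    `result` is the value stored in the metadata database; if it is a pointer
    (e.g. "xcom_s3://..."), `download` fetches and decodes the real payload.
    """
    # Early exit outside a task context: AIRFLOW_CTX_DAG_ID is normally only
    # exported while a task is executing, so its absence suggests the caller
    # is the scheduler (e.g. evaluating a branch's downstream tasks).
    if lazy or not os.environ.get("AIRFLOW_CTX_DAG_ID"):
        return result
    # Inside a task: resolve pointers, pass any other stored value through as-is.
    if isinstance(result, str) and result.startswith(POINTER_PREFIX):
        return download(result)
    return result
```

Injecting `download` keeps the sketch testable without S3 access. Note the consequence of the early exit: any caller outside a task receives the pointer string itself, not the decoded payload.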