ziyangRen commented on issue #52276:
URL: https://github.com/apache/airflow/issues/52276#issuecomment-3035648236
@sjyangkevin I think the problem is that, after the dagrun object is
generated, the dag attributes are not updated at the appropriate time. Should
we add a dag update in the task_instance_scheduling_decisions method
(dagrun.py, line 970), like the code below?
```
def _filter_tis_and_exclude_removed(dag: DAG, tis: list[TI]) -> Iterable[TI]:
    """Populate ``ti.task`` while excluding those missing one, marking them as REMOVED."""
    for ti in tis:
        try:
            ti.task = dag.get_task(ti.task_id)
        except TaskNotFound:
            # Try refreshing the dag via DagBag, then retry the lookup
            try:
                from airflow.models.dagbag import DagBag

                self.dag = DagBag().get_dag(self.dag_id)
                ti.task = self.dag.get_task(ti.task_id)
            except TaskNotFound:
                if ti.state != TaskInstanceState.REMOVED:
                    self.log.error("Failed to get task for ti %s. Marking it as removed.", ti)
                    ti.state = TaskInstanceState.REMOVED
                    session.flush()
            else:
                yield ti
        else:
            yield ti
```
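
To make the idea easier to discuss, here is a minimal, self-contained sketch of the same "retry after refreshing the dag" pattern. It does not use Airflow at all: `FakeDag`, `FakeTI`, `REMOVED`, the local `TaskNotFound`, and the `reload_dag` callback are hypothetical stand-ins for the real classes and for the `DagBag` lookup. Compared with the snippet above, it assumes two extra things that may or may not suit the real code: the reload happens at most once per call, and a reload that returns `None` is tolerated.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterable, Iterator, Optional

REMOVED = "removed"  # stand-in for TaskInstanceState.REMOVED


class TaskNotFound(Exception):
    """Stand-in for Airflow's TaskNotFound exception."""


@dataclass
class FakeDag:
    """Hypothetical DAG stand-in: maps task_id to a task object (here a str)."""

    tasks: dict = field(default_factory=dict)

    def get_task(self, task_id: str) -> str:
        try:
            return self.tasks[task_id]
        except KeyError:
            raise TaskNotFound(task_id) from None


@dataclass
class FakeTI:
    """Hypothetical task-instance stand-in."""

    task_id: str
    state: Optional[str] = None
    task: Optional[str] = None


def filter_tis_with_reload(
    dag: FakeDag,
    tis: Iterable[FakeTI],
    reload_dag: Callable[[], Optional[FakeDag]],
) -> Iterator[FakeTI]:
    """Yield TIs whose task exists; reload the dag once before marking any REMOVED."""
    reloaded = False
    for ti in tis:
        try:
            ti.task = dag.get_task(ti.task_id)
        except TaskNotFound:
            if reloaded:
                # The dag was already refreshed, so the task is really gone.
                ti.state = REMOVED
                continue
            reloaded = True
            fresh = reload_dag()
            if fresh is not None:
                dag = fresh  # use the refreshed dag for the remaining TIs too
            try:
                ti.task = dag.get_task(ti.task_id)
            except TaskNotFound:
                ti.state = REMOVED
                continue
        yield ti


# Usage: the stale dag lacks task "b"; the refreshed one has it; "c" exists nowhere.
stale = FakeDag({"a": "task-a"})
fresh = FakeDag({"a": "task-a", "b": "task-b"})
reload_calls = []


def reload_dag() -> FakeDag:
    reload_calls.append(1)
    return fresh


tis = [FakeTI("a"), FakeTI("b"), FakeTI("c")]
kept = [ti.task_id for ti in filter_tis_with_reload(stale, tis, reload_dag)]
```

In this sketch only the task instance for "c" ends up marked as removed, and the reload callback runs a single time even though two lookups fail. Whether a once-per-call reload is cheap enough inside the scheduler loop is a separate question.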