ziyangRen commented on issue #52276:
URL: https://github.com/apache/airflow/issues/52276#issuecomment-3035648236

   @sjyangkevin I think the problem is that once the dagrun object has been 
created, its dag attributes are not refreshed at the right time. Should we 
refresh the dag inside the task_instance_scheduling_decisions method, as in 
the code below (dagrun.py, line 970)?
   
   ```
   def _filter_tis_and_exclude_removed(dag: DAG, tis: list[TI]) -> Iterable[TI]:
       """Populate ``ti.task`` while excluding those missing one, marking them as REMOVED."""
       for ti in tis:
           try:
               ti.task = dag.get_task(ti.task_id)
           except TaskNotFound:
               # Try refreshing the dag via DagBag, then retry
               try:
                   from airflow.models.dagbag import DagBag
                   self.dag = DagBag().get_dag(self.dag_id)
                   ti.task = self.dag.get_task(ti.task_id)
               except TaskNotFound:
                   if ti.state != TaskInstanceState.REMOVED:
                       self.log.error("Failed to get task for ti %s. Marking it as removed.", ti)
                       ti.state = TaskInstanceState.REMOVED
                       session.flush()
               else:
                   yield ti
           else:
               yield ti
   ```
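
   To make the idea easier to try outside the scheduler, here is a minimal, 
self-contained sketch of the same "reload the dag, then retry" pattern. It does 
not import Airflow: `FakeDag`, `FakeTI`, `filter_tis_with_refresh` and the 
`reload_dag` callable are hypothetical stand-ins, and `TaskNotFound` is a local 
placeholder for the Airflow exception of the same name. The sketch assumes the 
reload may return nothing, and it reloads at most once per call, which differs 
from the snippet above.

   ```python
   from __future__ import annotations

   from collections.abc import Callable, Iterable, Iterator
   from dataclasses import dataclass, field


   class TaskNotFound(Exception):
       """Local placeholder for Airflow's TaskNotFound (assumption)."""


   @dataclass
   class FakeDag:
       """Hypothetical stand-in for a DAG: maps task_id to a task object."""

       tasks: dict[str, str] = field(default_factory=dict)

       def get_task(self, task_id: str) -> str:
           try:
               return self.tasks[task_id]
           except KeyError:
               raise TaskNotFound(task_id) from None


   @dataclass
   class FakeTI:
       """Hypothetical stand-in for a task instance."""

       task_id: str
       state: str | None = None
       task: str | None = None


   def filter_tis_with_refresh(
       dag: FakeDag,
       tis: Iterable[FakeTI],
       reload_dag: Callable[[], FakeDag | None],
   ) -> Iterator[FakeTI]:
       """Yield tis whose task exists, reloading the dag once on the first miss."""
       refreshed = False
       for ti in tis:
           try:
               ti.task = dag.get_task(ti.task_id)
           except TaskNotFound:
               if not refreshed:
                   refreshed = True
                   fresh = reload_dag()
                   if fresh is not None:
                       # Use the refreshed dag for this and all later lookups.
                       dag = fresh
                       try:
                           ti.task = dag.get_task(ti.task_id)
                       except TaskNotFound:
                           pass
                       else:
                           yield ti
                           continue
               # Still missing after the (single) refresh: mark as removed.
               ti.state = "removed"
           else:
               yield ti
   ```

   Passing the reload in as a callable keeps its cost visible to the caller and 
makes the behaviour easy to exercise with a stale and a fresh `FakeDag`.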

