ziyangRen commented on issue #52276:
URL: https://github.com/apache/airflow/issues/52276#issuecomment-3047901430

   @sjyangkevin After testing, the following code can solve the problem. Next 
week, could you please help verify if this is a reasonable modification?
   ```
   def _filter_tis_and_exclude_removed(dag: DAG, tis: list[TI]) -> Iterable[TI]:
       """Populate ``ti.task`` while excluding those missing one, marking them 
as REMOVED."""
       for ti in tis:
           try:
               ti.task = dag.get_task(ti.task_id)
           except TaskNotFound:
               try:
                   self.log.debug("tasks not found appeared, dag_id= %s, 
task_id=%s"  % (self.dag_id, ti.task_id))
                   from airflow.models.dagbag import DagBag
                   latest_dag = 
DagBag(read_dags_from_db=True).get_dag(self.dag_id, session=session)
                   ti.task = latest_dag.get_task(ti.task_id)
                   yield ti
                   self.log.debug("Task %s found in reloaded DAG." % ti)
               except TaskNotFound:
                   if ti.state != TaskInstanceState.REMOVED:
                       self.log.error("Failed to get task for ti %s. Marking it 
as removed.", ti)
                       ti.state = TaskInstanceState.REMOVED
                       session.flush()
           else:
               yield ti
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to