grjones commented on code in PR #32000: URL: https://github.com/apache/airflow/pull/32000#discussion_r1282582823
########## airflow/models/dag.py: ########## @@ -2738,22 +2738,48 @@ def add_logger_if_needed(ti: TaskInstance): ) tasks = self.task_dict + + task_try_numbers: dict[tuple[str, int], int] = collections.defaultdict(int) + self.log.debug("starting dagrun") # Instead of starting a scheduler, we run the minimal loop possible to check # for task readiness and dependency management. This is notably faster # than creating a BackfillJob and allows us to surface logs to the user while dr.state == DagRunState.RUNNING: schedulable_tis, _ = dr.update_state(session=session) - try: - for ti in schedulable_tis: + + for ti in schedulable_tis: + try: add_logger_if_needed(ti) ti.task = tasks[ti.task_id] _run_task(ti, session=session) - except Exception: - self.log.info( - "Task failed. DAG will continue to run until finished and be marked as failed.", - exc_info=True, - ) + except Exception: + if ti.state == TaskInstanceState.UP_FOR_RETRY: + try_number = task_try_numbers[ti.task_id, ti.map_index] + if try_number > ti.max_tries: + ti.set_state(TaskInstanceState.FAILED) + else: + task_try_numbers[ti.task_id, ti.map_index] = try_number + 1 + self.log.info( + "Task failed. DAG will continue to run until finished and be marked as failed.", + exc_info=True, + ) + for ti in dr.get_task_instances(session=session, state=TaskInstanceState.SCHEDULED): Review Comment: This loop only picks up task instances in the `scheduled` state, so it won't handle task instances in the `deferred` state, though. I may not fully understand what this PR is fixing, but it seems to me that any deferrable operator/sensor hangs on `dag.test()`, and those task instances will be in the `deferred` state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org