lidalei commented on issue #14672: URL: https://github.com/apache/airflow/issues/14672#issuecomment-844485176
tl;dr: set [`schedule_after_task_execution`](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#schedule-after-task-execution) to `False`, either by updating your `airflow.cfg` (recommended) or by setting the environment variable `AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION` to `False`.

We had the same issue, and with the help of Sentry to look through the whole stack trace, I found out why. The buggy code block is [taskinstance.py#L1182-L1201](https://github.com/apache/airflow/blob/master/airflow/models/taskinstance.py#L1182-L1201):

```
# Recording SUCCESS
self.end_date = timezone.utcnow()
self.log.info(
    'Marking task as SUCCESS. '
    'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
    self.dag_id,
    self.task_id,
    self._date_or_empty('execution_date'),
    self._date_or_empty('start_date'),
    self._date_or_empty('end_date'),
)
self.set_duration()
if not test_mode:
    session.add(Log(self.state, self))
    session.merge(self)
session.commit()

if not test_mode:
    self._run_mini_scheduler_on_child_tasks(session)
```

Looking at the code: after marking the task SUCCESS and **committing**, if it is not test mode, it calls the potentially expensive function `_run_mini_scheduler_on_child_tasks`. Meanwhile, [local_task_job.py#L179-L199](https://github.com/apache/airflow/blob/master/airflow/jobs/local_task_job.py#L179-L199) will detect the SUCCESS state very soon, and since the task is no longer in the RUNNING state, it terminates the process, which might still be executing `_run_mini_scheduler_on_child_tasks`:

```
if ti.state == State.RUNNING:
    ...
elif self.task_runner.return_code() is None and hasattr(self.task_runner, 'process'):
    self.log.warning(
        "State of this instance has been externally set to %s. "
        "Terminating instance.",
        ti.state,
    )
    self.task_runner.terminate()
```

This is confirmed by the log @saurabhladhe shared (the line numbers diverge a bit because the log was produced by Airflow 2.0.1):

```
[2021-03-08 21:46:32,051] {taskinstance.py:1166} INFO - Marking task as SUCCESS.
dag_id=canary_dag, task_id=print_the_context, execution_date=20210308T201000, start_date=20210308T214431, end_date=20210308T214632
...
[2021-03-08 21:46:51,152] {local_task_job.py:188} WARNING - State of this instance has been externally set to success. Terminating instance.
[2021-03-08 21:46:51,153] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 22
```

So the mitigation is to make `_run_mini_scheduler_on_child_tasks` cheap. It is an optimization controlled by [`schedule_after_task_execution`](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#schedule-after-task-execution) and can be disabled:

```
def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
    if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True):
```

A proper fix would start by changing

```
if not test_mode:
    session.add(Log(self.state, self))
    session.merge(self)
session.commit()

if not test_mode:
    self._run_mini_scheduler_on_child_tasks(session)
```

to

```
if not test_mode:
    session.add(Log(self.state, self))
    session.merge(self)
    self._run_mini_scheduler_on_child_tasks(session)
session.commit()
```

However, `_run_mini_scheduler_on_child_tasks` might get an `OperationalError` and roll back the session completely ([taskinstance.py#L1248](https://github.com/apache/airflow/blob/858f93cb79384c1034f3fa081bb0cc246582db94/airflow/models/taskinstance.py#L1248)), in which case we would falsely mark the task as failed, even though only the optimization failed. So I'd leave it to someone who knows the code better to fix it properly. Personally, I'd suggest removing this optimization completely.
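Concretely, the mitigation from the tl;dr looks like this in `airflow.cfg` (the section and option names are from the configuration reference linked above):

```
[scheduler]
schedule_after_task_execution = False
```

or, equivalently, set the environment variable `AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=False` in the environment of your Airflow workers.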
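To make the race easier to see, here is a toy sketch in plain Python (not Airflow code): the "task process" commits SUCCESS first and only then does the mini-scheduler-style work, while the "heartbeat" treats "SUCCESS but the process is still alive" as an externally set state. Events are used in place of real timing so the interleaving is deterministic.

```python
import threading

state = "running"               # stands in for the task_instance row in the DB
committed = threading.Event()   # the commit makes the new state visible
terminated = threading.Event()  # set when the heartbeat decides to terminate
events = []

def task_process():
    global state
    state = "success"
    committed.set()             # like session.commit(): SUCCESS is now visible
    terminated.wait()           # the heartbeat wins the race while the
                                # mini-scheduler work is still in progress
    events.append("mini-scheduler still running when terminated")

def heartbeat():
    committed.wait()
    if state != "running":          # the process has not exited yet, so this
        events.append("terminate")  # looks like an externally set state
        terminated.set()            # -> task_runner.terminate() / SIGTERM

threads = [threading.Thread(target=task_process), threading.Thread(target=heartbeat)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(events)  # -> ['terminate', 'mini-scheduler still running when terminated']
```

The point of the sketch is only the ordering: once SUCCESS is committed, any further work in the task process races against the heartbeat's termination check.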
