lidalei commented on issue #14672:
URL: https://github.com/apache/airflow/issues/14672#issuecomment-844485176


   tl;dr: set [schedule_after_task_execution](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#schedule-after-task-execution) to False, either in the `[scheduler]` section of your airflow.cfg (recommended) or by setting the environment variable AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION to False.
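
For anyone unsure how the environment variable name relates to the config option: Airflow reads overrides from variables named `AIRFLOW__{SECTION}__{KEY}`. Below is a minimal sketch of that mapping; `airflow_env_var` is a hypothetical helper written for illustration, not part of Airflow. The variable has to be in the environment before the Airflow processes start, so in practice you would set it in your deployment config rather than from Python.

```python
import os


def airflow_env_var(section: str, key: str) -> str:
    """Build the AIRFLOW__{SECTION}__{KEY} name (hypothetical helper, not an Airflow API)."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Name of the override for [scheduler] schedule_after_task_execution.
name = airflow_env_var("scheduler", "schedule_after_task_execution")

# Setting it here only affects this process and any children it spawns.
os.environ[name] = "False"
```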
   
   
   We had the same issue, and by using Sentry to look through the whole stack trace, I found out why.
   
   The buggy code block is 
[taskinstance.py#L1182-L1201](https://github.com/apache/airflow/blob/master/airflow/models/taskinstance.py#L1182-L1201):
   
   ```
           # Recording SUCCESS
           self.end_date = timezone.utcnow()
           self.log.info(
               'Marking task as SUCCESS. '
               'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
               self.dag_id,
               self.task_id,
               self._date_or_empty('execution_date'),
               self._date_or_empty('start_date'),
               self._date_or_empty('end_date'),
           )
           self.set_duration()
           if not test_mode:
               session.add(Log(self.state, self))
               session.merge(self)
   
           session.commit()
   
           if not test_mode:
               self._run_mini_scheduler_on_child_tasks(session)
   ```
   
   Looking at the code: after marking a task SUCCESS and **committing**, if it is not in test mode, the task calls a potentially expensive function, _run_mini_scheduler_on_child_tasks. Meanwhile, [local_task_job.py#L179-L199](https://github.com/apache/airflow/blob/master/airflow/jobs/local_task_job.py#L179-L199) detects the SUCCESS state very soon, and since the task is no longer RUNNING, it terminates the process, which might still be executing _run_mini_scheduler_on_child_tasks:
   
   ```
           if ti.state == State.RUNNING:
               ...
           elif self.task_runner.return_code() is None and hasattr(self.task_runner, 'process'):
               self.log.warning(
                   "State of this instance has been externally set to %s. " "Terminating instance.", ti.state
               )
               self.task_runner.terminate()
   ```
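
To make the race easier to see, here is a standalone simulation. It is only a sketch under my own assumptions: `FakeTask`, `heartbeat` and `simulate` are hypothetical names, a thread stands in for the task process, and an `Event` stands in for SIGTERM. None of this is Airflow code.

```python
import threading
import time


class FakeTask:
    """Stand-in for a task process: commits SUCCESS, then does post-commit work."""

    def __init__(self, post_commit_seconds: float):
        self.state = "running"
        self.post_commit_seconds = post_commit_seconds
        self.terminated = threading.Event()  # plays the role of SIGTERM
        self.finished_cleanly = False

    def run(self) -> None:
        # Stands in for session.commit(): the new state is now visible to others.
        self.state = "success"
        # Stands in for _run_mini_scheduler_on_child_tasks: work after the commit.
        if self.terminated.wait(timeout=self.post_commit_seconds):
            return  # terminated in the middle of the post-commit work
        self.finished_cleanly = True


def heartbeat(task: FakeTask, worker: threading.Thread) -> bool:
    """Mimics the supervising check: state is not RUNNING but the process is alive."""
    if task.state != "running" and worker.is_alive():
        task.terminated.set()
        return True
    return False


def simulate(post_commit_seconds: float, heartbeat_after: float):
    task = FakeTask(post_commit_seconds)
    worker = threading.Thread(target=task.run)
    worker.start()
    time.sleep(heartbeat_after)  # the supervising job checks a bit later
    killed = heartbeat(task, worker)
    worker.join()
    return killed, task.finished_cleanly


slow = simulate(post_commit_seconds=2.0, heartbeat_after=0.2)
fast = simulate(post_commit_seconds=0.0, heartbeat_after=0.2)
```

With slow post-commit work the heartbeat finds a live process in a non-running state and terminates it; with cheap post-commit work the process has already exited by the time the heartbeat looks.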
   
   This is confirmed by the log @saurabhladhe shared (the line numbers differ slightly because the log was produced by Airflow 2.0.1):
   ```
   [2021-03-08 21:46:32,051] {taskinstance.py:1166} INFO - Marking task as SUCCESS. dag_id=canary_dag, task_id=print_the_context, execution_date=20210308T201000, start_date=20210308T214431, end_date=20210308T214632
   
   ...
   
   [2021-03-08 21:46:51,152] {local_task_job.py:188} WARNING - State of this instance has been externally set to success. Terminating instance.
   
   [2021-03-08 21:46:51,153] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 22
   ```
   
   So the mitigation is to make _run_mini_scheduler_on_child_tasks cheap. It is an optimization controlled by [schedule_after_task_execution](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#schedule-after-task-execution) and can be disabled, because the function checks that option first:
   
   ```
       def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
           if conf.getboolean('scheduler', 'schedule_after_task_execution', fallback=True):
   ```
   
   The proper fix would start from changing
   ```
           if not test_mode:
               session.add(Log(self.state, self))
               session.merge(self)
   
           session.commit()
   
           if not test_mode:
               self._run_mini_scheduler_on_child_tasks(session)
   ```
   
   to 
   
   ```
           if not test_mode:
               session.add(Log(self.state, self))
               session.merge(self)
               self._run_mini_scheduler_on_child_tasks(session)
   
           session.commit()
   ```
   
   However, _run_mini_scheduler_on_child_tasks might hit an OperationalError and roll back the session completely ([taskinstance.py#L1248](https://github.com/apache/airflow/blob/858f93cb79384c1034f3fa081bb0cc246582db94/airflow/models/taskinstance.py#L1248)). We would then falsely mark the task as failed, even though only the optimization failed. So I'd leave it to someone who knows the code better to fix it properly. Personally, I'd suggest removing this optimization completely.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

