James Meickle created AIRFLOW-3418:
--------------------------------------

             Summary: Task stuck in running state, unable to clear
                 Key: AIRFLOW-3418
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3418
             Project: Apache Airflow
          Issue Type: Bug
          Components: worker
    Affects Versions: 1.10.1
            Reporter: James Meickle


One of our tasks (a custom operator that sleep-waits until the NYSE market
close) got stuck in the "running" state in the metadata DB without making any
progress. This is what it looked like in the logs:

{{[2018-11-29 06:03:57,643] \{{models.py:1355}} INFO - Dependencies not met for <TaskInstance: reconciliation_filemover.after_close 2018-11-28T00:00:00+00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.}}
{{[2018-11-29 06:03:57,644] \{{models.py:1355}} INFO - Dependencies not met for <TaskInstance: reconciliation_filemover.after_close 2018-11-28T00:00:00+00:00 [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is already running, it started on 2018-11-29 00:01:10.876344+00:00.}}
{{[2018-11-29 06:03:57,646] \{{logging_mixin.py:95}} INFO - [2018-11-29 06:03:57,646] \{{jobs.py:2614}} INFO - Task is not able to be run}}
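The first two lines are two separate dependency checks failing against the same stuck row, and the third is the job giving up. As a rough plain-Python model of that gating, for illustration only: {{TaskInstanceRow}}, {{RUNNABLE_STATES}}, {{failed_dependencies}} and {{try_start}} are hypothetical names, and the set of runnable states is an assumption, not Airflow 1.10.1's actual list. It shows why a row left in "running" turns away every later attempt until its state is changed.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional

# Assumption: states from which a new attempt may start. This is a
# simplification for illustration, not Airflow 1.10.1's actual list.
RUNNABLE_STATES = {None, "scheduled", "queued", "up_for_retry"}


@dataclass
class TaskInstanceRow:
    """Hypothetical stand-in for a task_instance row in the metadata DB."""
    task_id: str
    state: Optional[str]
    start_date: Optional[datetime] = None


def failed_dependencies(ti: TaskInstanceRow) -> List[str]:
    """Return the names of the checks that block a new attempt."""
    failed = []
    # Models 'Task Instance State': the row must be in a runnable state.
    if ti.state not in RUNNABLE_STATES:
        failed.append("Task Instance State")
    # Models 'Task Instance Not Already Running'.
    if ti.state == "running":
        failed.append("Task Instance Not Already Running")
    return failed


def try_start(ti: TaskInstanceRow) -> bool:
    """Print each failed check and report whether the attempt may proceed."""
    failed = failed_dependencies(ti)
    for name in failed:
        print("Dependencies not met for {}, dependency '{}' FAILED".format(ti.task_id, name))
    if failed:
        print("Task is not able to be run")
        return False
    return True


if __name__ == "__main__":
    # The stuck row from the log: still 'running' hours after it started.
    stuck = TaskInstanceRow(
        "after_close", "running",
        datetime(2018, 11, 29, 0, 1, 10, tzinfo=timezone.utc),
    )
    try_start(stuck)                                          # rejected
    try_start(TaskInstanceRow("after_close", "up_for_retry"))  # allowed
```

In this model nothing moves the row out of "running" by itself, so every later attempt fails both checks until the state is changed externally.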

When we tried to "clear" the task in the web UI, the request failed with this
traceback:

{{Traceback (most recent call last):}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", line 1982, in wsgi_app}}
{{ response = self.full_dispatch_request()}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", line 1614, in full_dispatch_request}}
{{ rv = self.handle_user_exception(e)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", line 1517, in handle_user_exception}}
{{ reraise(exc_type, exc_value, tb)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/_compat.py", line 33, in reraise}}
{{ raise value}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", line 1612, in full_dispatch_request}}
{{ rv = self.dispatch_request()}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", line 1598, in dispatch_request}}
{{ return self.view_functions[rule.endpoint](**req.view_args)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask_appbuilder/security/decorators.py", line 26, in wraps}}
{{ return f(self, *args, **kwargs)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/www_rbac/decorators.py", line 55, in wrapper}}
{{ return f(*args, **kwargs)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/www_rbac/views.py", line 837, in clear}}
{{ include_upstream=upstream)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 4011, in sub_dag}}
{{ dag = copy.deepcopy(self)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 166, in deepcopy}}
{{ y = copier(memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 3996, in __deepcopy__}}
{{ setattr(result, k, copy.deepcopy(v, memo))}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy}}
{{ y = copier(x, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict}}
{{ y[deepcopy(key, memo)] = deepcopy(value, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 166, in deepcopy}}
{{ y = copier(memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 2740, in __deepcopy__}}
{{ setattr(result, k, copy.deepcopy(v, memo))}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, in deepcopy}}
{{ y = _reconstruct(x, rv, 1, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, in _reconstruct}}
{{ state = deepcopy(state, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy}}
{{ y = copier(x, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict}}
{{ y[deepcopy(key, memo)] = deepcopy(value, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, in deepcopy}}
{{ y = _reconstruct(x, rv, 1, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, in _reconstruct}}
{{ state = deepcopy(state, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy}}
{{ y = copier(x, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict}}
{{ y[deepcopy(key, memo)] = deepcopy(value, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, in deepcopy}}
{{ y = _reconstruct(x, rv, 1, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, in _reconstruct}}
{{ state = deepcopy(state, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy}}
{{ y = copier(x, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict}}
{{ y[deepcopy(key, memo)] = deepcopy(value, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy}}
{{ y = copier(x, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict}}
{{ y[deepcopy(key, memo)] = deepcopy(value, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, in deepcopy}}
{{ y = _reconstruct(x, rv, 1, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, in _reconstruct}}
{{ state = deepcopy(state, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy}}
{{ y = copier(x, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict}}
{{ y[deepcopy(key, memo)] = deepcopy(value, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy}}
{{ y = copier(x, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 218, in _deepcopy_list}}
{{ y.append(deepcopy(a, memo))}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, in deepcopy}}
{{ y = _reconstruct(x, rv, 1, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, in _reconstruct}}
{{ state = deepcopy(state, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy}}
{{ y = copier(x, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict}}
{{ y[deepcopy(key, memo)] = deepcopy(value, memo)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 174, in deepcopy}}
{{ rv = reductor(4)}}
{{TypeError: cannot serialize '_io.TextIOWrapper' object}}
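The innermost frames show {{copy.deepcopy}} walking the DAG's and operators' attributes until it reaches an {{_io.TextIOWrapper}} (an open text file), whose {{__reduce_ex__}} refuses to pickle it. The sketch below reproduces that failure in plain Python, assuming the offending attribute is a file handle stored directly on an operator; the traceback only shows that some nested attribute is one. {{FileHoldingOperator}}, {{LazyFileOperator}} and {{deepcopy_error}} are hypothetical names, not Airflow APIs.

```python
import copy
import os


class FileHoldingOperator:
    """Hypothetical operator that keeps an open file handle as an attribute."""

    def __init__(self, task_id, path):
        self.task_id = task_id
        self.path = path
        self.handle = open(path, "w")  # an _io.TextIOWrapper on CPython


class LazyFileOperator:
    """Hypothetical alternative: keep only the path, open the file in execute()."""

    def __init__(self, task_id, path):
        self.task_id = task_id
        self.path = path

    def execute(self):
        # The handle exists only for the duration of this call, so it is never
        # part of the object graph that a deepcopy of the DAG walks.
        with open(self.path, "w") as handle:
            handle.write("waiting for market close\n")


def deepcopy_error(obj):
    """Return None if obj deep-copies cleanly, else the TypeError message."""
    try:
        copy.deepcopy(obj)
    except TypeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    bad = FileHoldingOperator("after_close", os.devnull)
    # Wording varies by Python version ("cannot serialize ..." on 3.5, as in
    # the traceback above); the message names _io.TextIOWrapper either way.
    print(deepcopy_error(bad))
    bad.handle.close()
    print(deepcopy_error(LazyFileOperator("after_close", os.devnull)))  # None
```

Opening the file inside {{execute()}} rather than {{__init__}} keeps the handle out of the object graph that the "clear" view deep-copies.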

After browsing through Airflow's code, I suspected that the "clear" code in the
UI simply could not handle some property on one of our operators: the traceback
ends with {{copy.deepcopy}} failing on an {{_io.TextIOWrapper}} object while
copying the DAG. Instead, I used the Browse feature to edit the state in the
metadata DB directly. That did change the status (the task ended up in
"up_for_retry"), and the same log file gained additional contents:


{{[2018-11-29 14:18:11,390] \{{logging_mixin.py:95}} INFO - [2018-11-29 14:18:11,390] \{{jobs.py:2695}} WARNING - State of this instance has been externally set to failed. Taking the poison pill.}}
{{[2018-11-29 14:18:11,399] \{{helpers.py:240}} INFO - Sending Signals.SIGTERM to GPID 5287}}
{{[2018-11-29 14:18:11,399] \{{models.py:1636}} ERROR - Received SIGTERM. Terminating subprocesses.}}
{{[2018-11-29 14:18:11,418] \{{models.py:1760}} ERROR - Task received SIGTERM signal}}
{{Traceback (most recent call last):}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 1654, in _run_raw_task}}
{{ result = task_copy.execute(context=context)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/sensors/base_sensor_operator.py", line 78, in execute}}
{{ sleep(self.poke_interval)}}
{{ File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 1638, in signal_handler}}
{{ raise AirflowException("Task received SIGTERM signal")}}
{{airflow.exceptions.AirflowException: Task received SIGTERM signal}}
{{[2018-11-29 14:18:11,420] \{{models.py:1783}} INFO - Marking task as UP_FOR_RETRY}}
{{[2018-11-29 14:18:11,445] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close Traceback (most recent call last):}}
{{[2018-11-29 14:18:11,445] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close File "/home/airflow/virtualenvs/airflow/bin/airflow", line 32, in <module>}}
{{[2018-11-29 14:18:11,445] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close args.func(args)}}
{{[2018-11-29 14:18:11,445] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/utils/cli.py", line 74, in wrapper}}
{{[2018-11-29 14:18:11,445] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close return f(*args, **kwargs)}}
{{[2018-11-29 14:18:11,445] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/bin/cli.py", line 490, in run}}
{{[2018-11-29 14:18:11,445] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close _run(args, dag, ti)}}
{{[2018-11-29 14:18:11,445] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/bin/cli.py", line 406, in _run}}
{{[2018-11-29 14:18:11,445] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close pool=args.pool,}}
{{[2018-11-29 14:18:11,446] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/utils/db.py", line 74, in wrapper}}
{{[2018-11-29 14:18:11,446] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close return func(*args, **kwargs)}}
{{[2018-11-29 14:18:11,446] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 1654, in _run_raw_task}}
{{[2018-11-29 14:18:11,446] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close result = task_copy.execute(context=context)}}
{{[2018-11-29 14:18:11,446] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/sensors/base_sensor_operator.py", line 78, in execute}}
{{[2018-11-29 14:18:11,446] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close sleep(self.poke_interval)}}
{{[2018-11-29 14:18:11,446] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 1638, in signal_handler}}
{{[2018-11-29 14:18:11,446] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close raise AirflowException("Task received SIGTERM signal")}}
{{[2018-11-29 14:18:11,446] \{{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close airflow.exceptions.AirflowException: Task received SIGTERM signal}}
{{[2018-11-29 14:18:11,693] \{{helpers.py:230}} INFO - Process psutil.Process(pid=5287 (terminated)) (5287) terminated with exit code 1}}
{{[2018-11-29 14:18:11,694] \{{logging_mixin.py:95}} INFO - [2018-11-29 14:18:11,693] \{{jobs.py:2627}} INFO - Task exited with return code 0}}
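The log shows the sequence: the job notices the state was changed externally, sends SIGTERM to the task's process group, and the task's signal handler raises {{AirflowException}} while the sensor is blocked in {{sleep(self.poke_interval)}}. The sketch below reproduces only that last step in plain Python on a POSIX system: a SIGTERM handler that raises cuts a blocking sleep short. {{TaskSIGTERM}} and {{sleep_until_sigterm}} are hypothetical names, and the timer thread merely stands in for the supervising job; this is not Airflow's code.

```python
import os
import signal
import threading
import time


class TaskSIGTERM(Exception):
    """Hypothetical stand-in for the AirflowException raised on SIGTERM."""


def sleep_until_sigterm(poke_interval, kill_after):
    """Sleep like a sensor's poke loop until a SIGTERM handler aborts it.

    POSIX only. signal.signal() must be called from the main thread; a
    timer thread plays the role of the supervisor sending SIGTERM.
    """
    def handler(signum, frame):
        # Raising here propagates out of whatever the main thread is doing,
        # including a blocking time.sleep() call.
        raise TaskSIGTERM("Task received SIGTERM signal")

    previous = signal.signal(signal.SIGTERM, handler)
    timer = threading.Timer(kill_after, os.kill, args=(os.getpid(), signal.SIGTERM))
    timer.start()
    try:
        while True:
            time.sleep(poke_interval)  # the sensor's idle wait between pokes
    except TaskSIGTERM as exc:
        return str(exc)
    finally:
        timer.cancel()
        if previous is not None:
            signal.signal(signal.SIGTERM, previous)  # restore the old handler
```

For example, {{sleep_until_sigterm(poke_interval=60, kill_after=0.1)}} returns "Task received SIGTERM signal" after roughly 0.1 s rather than 60 s, which matches the sensor stopping as soon as the state was edited instead of waiting out its poke interval.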

The "Task is not able to be run" log line comes from jobs.py, and it's unclear
to me why that code path was reached in this case (two workers picking up the
same message?) or why the task would simply hang in the "running" state:
https://github.com/apache/incubator-airflow/blob/1.10.1/airflow/jobs.py#L2614

We had not observed any of this behavior before; we upgraded to 1.10.1 earlier
this week.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
