This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit c6313e48a2ea53836b2d6619741534443f08f9aa Author: Jorrick Sleijster <[email protected]> AuthorDate: Tue Jun 22 10:08:00 2021 +0200 Fix Orphaned tasks stuck in CeleryExecutor as running (#16550) (cherry picked from commit 90f0088c5752b56177597725cc716f707f2f8456) --- airflow/executors/celery_executor.py | 4 +--- tests/executors/test_celery_executor.py | 2 ++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 553639b..567fe58 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -369,9 +369,7 @@ class CeleryExecutor(BaseExecutor): "\n\t".join([repr(x) for x in timedout_keys]), ) for key in timedout_keys: - self.event_buffer[key] = (State.FAILED, None) - del self.tasks[key] - del self.adopted_task_timeouts[key] + self.change_state(key, State.FAILED) def debug_dump(self) -> None: """Called in response to SIGUSR2 by the scheduler""" diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index 19c8a0d..d15ca9a 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -371,10 +371,12 @@ class TestCeleryExecutor(unittest.TestCase): key_1: queued_dttm + executor.task_adoption_timeout, key_2: queued_dttm + executor.task_adoption_timeout, } + executor.running = {key_1, key_2} executor.tasks = {key_1: AsyncResult("231"), key_2: AsyncResult("232")} executor.sync() assert executor.event_buffer == {key_1: (State.FAILED, None), key_2: (State.FAILED, None)} assert executor.tasks == {} + assert executor.running == set() assert executor.adopted_task_timeouts == {}
