Repository: incubator-airflow Updated Branches: refs/heads/master 38cbf132a -> 7af20fe45
[AIRFLOW-1024] Ignore celery executor errors (#49) Code defensively around the interactions with celery so that we just log errors instead of crashing the scheduler. It might make sense to move the try/except blocks one level higher (to catch errors from all executors), but this needs some investigation. Closes #2355 from aoen/ddavydov--handle_celery_executor_errors_gracefully Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7af20fe4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7af20fe4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7af20fe4 Branch: refs/heads/master Commit: 7af20fe452bad36767c43720168b5a34ce69eb0a Parents: 38cbf13 Author: Dan Davydov <dan.davy...@airbnb.com> Authored: Thu Jun 8 17:10:35 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Thu Jun 8 17:10:38 2017 -0700 ---------------------------------------------------------------------- airflow/executors/celery_executor.py | 39 +++++++++++++++++-------------- 1 file changed, 22 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7af20fe4/airflow/executors/celery_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 0b6cd59..d7f74c6 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -17,6 +17,7 @@ import logging import subprocess import ssl import time +import traceback from celery import Celery from celery import states as celery_states @@ -101,23 +102,27 @@ class CeleryExecutor(BaseExecutor): self.logger.debug( "Inquiring about {} celery task(s)".format(len(self.tasks))) for key, async in list(self.tasks.items()): - state = async.state - if self.last_state[key] != state:
- if state == celery_states.SUCCESS: - self.success(key) - del self.tasks[key] - del self.last_state[key] - elif state == celery_states.FAILURE: - self.fail(key) - del self.tasks[key] - del self.last_state[key] - elif state == celery_states.REVOKED: - self.fail(key) - del self.tasks[key] - del self.last_state[key] - else: - self.logger.info("Unexpected state: " + async.state) - self.last_state[key] = async.state + try: + state = async.state + if self.last_state[key] != state: + if state == celery_states.SUCCESS: + self.success(key) + del self.tasks[key] + del self.last_state[key] + elif state == celery_states.FAILURE: + self.fail(key) + del self.tasks[key] + del self.last_state[key] + elif state == celery_states.REVOKED: + self.fail(key) + del self.tasks[key] + del self.last_state[key] + else: + self.logger.info("Unexpected state: " + async.state) + self.last_state[key] = async.state + except Exception as e: + logging.error("Error syncing the celery executor, ignoring " + "it:\n{}\n".format(e, traceback.format_exc())) def end(self, synchronous=False): if synchronous: