Repository: incubator-airflow
Updated Branches:
  refs/heads/master 38cbf132a -> 7af20fe45


[AIRFLOW-1024] Ignore celery executor errors (#49)

Code defensively around the interactions with
celery so that
we just log errors instead of crashing the
scheduler.
It might make sense to place the try/except blocks one
level higher
(to catch errors from all executors), but this
needs some investigation.

Closes #2355 from aoen/ddavydov--
handle_celery_executor_errors_gracefully


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7af20fe4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7af20fe4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7af20fe4

Branch: refs/heads/master
Commit: 7af20fe452bad36767c43720168b5a34ce69eb0a
Parents: 38cbf13
Author: Dan Davydov <dan.davy...@airbnb.com>
Authored: Thu Jun 8 17:10:35 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Thu Jun 8 17:10:38 2017 -0700

----------------------------------------------------------------------
 airflow/executors/celery_executor.py | 39 +++++++++++++++++--------------
 1 file changed, 22 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7af20fe4/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 0b6cd59..d7f74c6 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -17,6 +17,7 @@ import logging
 import subprocess
 import ssl
 import time
+import traceback
 
 from celery import Celery
 from celery import states as celery_states
@@ -101,23 +102,27 @@ class CeleryExecutor(BaseExecutor):
         self.logger.debug(
             "Inquiring about {} celery task(s)".format(len(self.tasks)))
         for key, async in list(self.tasks.items()):
-            state = async.state
-            if self.last_state[key] != state:
-                if state == celery_states.SUCCESS:
-                    self.success(key)
-                    del self.tasks[key]
-                    del self.last_state[key]
-                elif state == celery_states.FAILURE:
-                    self.fail(key)
-                    del self.tasks[key]
-                    del self.last_state[key]
-                elif state == celery_states.REVOKED:
-                    self.fail(key)
-                    del self.tasks[key]
-                    del self.last_state[key]
-                else:
-                    self.logger.info("Unexpected state: " + async.state)
-                self.last_state[key] = async.state
+            try:
+                state = async.state
+                if self.last_state[key] != state:
+                    if state == celery_states.SUCCESS:
+                        self.success(key)
+                        del self.tasks[key]
+                        del self.last_state[key]
+                    elif state == celery_states.FAILURE:
+                        self.fail(key)
+                        del self.tasks[key]
+                        del self.last_state[key]
+                    elif state == celery_states.REVOKED:
+                        self.fail(key)
+                        del self.tasks[key]
+                        del self.last_state[key]
+                    else:
+                        self.logger.info("Unexpected state: " + async.state)
+                    self.last_state[key] = async.state
+            except Exception as e:
+                logging.error("Error syncing the celery executor, ignoring "
+                              "it:\n{}\n".format(e, traceback.format_exc()))
 
     def end(self, synchronous=False):
         if synchronous:

Reply via email to