Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-10-test 90f05cf33 -> acfc72e01


[AIRFLOW-2650] Mark SchedulerJob as succeeded when hitting Ctrl-C

Without this fix, the job would remain in the
running state after being interrupted with Ctrl-C.

This also marks the job as failed if any other
exception is raised.

Closes #3525 from ashb/scheduler-job-status

(cherry picked from commit b0061f1369636c86df87829b93d5ece582591e2a)
Signed-off-by: Bolke de Bruin <bo...@xs4all.nl>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/acfc72e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/acfc72e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/acfc72e0

Branch: refs/heads/v1-10-test
Commit: acfc72e01a5dd24a86513ab93e33b8856ad0e284
Parents: 90f05cf
Author: Ash Berlin-Taylor <ash_git...@firemirror.com>
Authored: Wed Jun 27 22:34:16 2018 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jun 27 22:34:36 2018 +0200

----------------------------------------------------------------------
 airflow/jobs.py | 22 ++++++++++++++--------
 tests/jobs.py   | 42 +++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 55 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/acfc72e0/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index ad114ab..5143e8e 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -197,14 +197,20 @@ class BaseJob(Base, LoggingMixin):
             make_transient(self)
             self.id = id_
 
-            # Run
-            self._execute()
-
-            # Marking the success in the DB
-            self.end_date = timezone.utcnow()
-            self.state = State.SUCCESS
-            session.merge(self)
-            session.commit()
+            try:
+                self._execute()
+                # In case of max runs or max duration
+                self.state = State.SUCCESS
+            except SystemExit as e:
+                # In case of ^C or SIGTERM
+                self.state = State.SUCCESS
+            except Exception as e:
+                self.state = State.FAILED
+                raise
+            finally:
+                self.end_date = timezone.utcnow()
+                session.merge(self)
+                session.commit()
 
         Stats.incr(self.__class__.__name__.lower() + '_end', 1, 1)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/acfc72e0/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index f534b65..5dd6ff3 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -39,7 +39,7 @@ import sqlalchemy
 from airflow import AirflowException, settings, models
 from airflow.bin import cli
 from airflow.executors import BaseExecutor, SequentialExecutor
-from airflow.jobs import BackfillJob, SchedulerJob, LocalTaskJob
+from airflow.jobs import BaseJob, BackfillJob, SchedulerJob, LocalTaskJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.bash_operator import BashOperator
@@ -86,6 +86,46 @@ TEST_DAGS_FOLDER = os.path.join(
     os.path.dirname(os.path.realpath(__file__)), 'dags')
 
 
+class BaseJobTest(unittest.TestCase):
+    class TestJob(BaseJob):
+        __mapper_args__ = {
+            'polymorphic_identity': 'TestJob'
+        }
+
+        def __init__(self, cb):
+            self.cb = cb
+            super(BaseJobTest.TestJob, self).__init__()
+
+        def _execute(self):
+            return self.cb()
+
+    def test_state_success(self):
+        job = self.TestJob(lambda: True)
+        job.run()
+
+        self.assertEquals(job.state, State.SUCCESS)
+        self.assertIsNotNone(job.end_date)
+
+    def test_state_sysexit(self):
+        import sys
+        job = self.TestJob(lambda: sys.exit(0))
+        job.run()
+
+        self.assertEquals(job.state, State.SUCCESS)
+        self.assertIsNotNone(job.end_date)
+
+    def test_state_failed(self):
+        def abort():
+            raise RuntimeError("fail")
+
+        job = self.TestJob(abort)
+        with self.assertRaises(RuntimeError):
+            job.run()
+
+        self.assertEquals(job.state, State.FAILED)
+        self.assertIsNotNone(job.end_date)
+
+
 class BackfillJobTest(unittest.TestCase):
 
     def setUp(self):

Reply via email to