Repository: incubator-airflow Updated Branches: refs/heads/v1-10-test 90f05cf33 -> acfc72e01
[AIRFLOW-2650] Mark SchedulerJob as succeed when hitting Ctrl-c Without this fix it turns out that the job would remain in the running state. This also sets things to failed in case of any other exception. Closes #3525 from ashb/scheduler-job-status (cherry picked from commit b0061f1369636c86df87829b93d5ece582591e2a) Signed-off-by: Bolke de Bruin <bo...@xs4all.nl> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/acfc72e0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/acfc72e0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/acfc72e0 Branch: refs/heads/v1-10-test Commit: acfc72e01a5dd24a86513ab93e33b8856ad0e284 Parents: 90f05cf Author: Ash Berlin-Taylor <ash_git...@firemirror.com> Authored: Wed Jun 27 22:34:16 2018 +0200 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Wed Jun 27 22:34:36 2018 +0200 ---------------------------------------------------------------------- airflow/jobs.py | 22 ++++++++++++++-------- tests/jobs.py | 42 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 55 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/acfc72e0/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index ad114ab..5143e8e 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -197,14 +197,20 @@ class BaseJob(Base, LoggingMixin): make_transient(self) self.id = id_ - # Run - self._execute() - - # Marking the success in the DB - self.end_date = timezone.utcnow() - self.state = State.SUCCESS - session.merge(self) - session.commit() + try: + self._execute() + # In case of max runs or max duration + self.state = State.SUCCESS + except SystemExit as e: + # In case of ^C or SIGTERM + self.state = State.SUCCESS + except Exception as e: + self.state = State.FAILED + raise + finally: + self.end_date = timezone.utcnow() + session.merge(self) + session.commit() Stats.incr(self.__class__.__name__.lower() + '_end', 1, 1) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/acfc72e0/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index f534b65..5dd6ff3 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -39,7 +39,7 @@ import sqlalchemy from airflow import AirflowException, settings, models from airflow.bin import cli from airflow.executors import BaseExecutor, SequentialExecutor -from airflow.jobs import BackfillJob, SchedulerJob, LocalTaskJob +from airflow.jobs import BaseJob, BackfillJob, SchedulerJob, LocalTaskJob from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI from airflow.operators.dummy_operator import DummyOperator from airflow.operators.bash_operator import BashOperator @@ -86,6 +86,46 @@ TEST_DAGS_FOLDER = os.path.join( os.path.dirname(os.path.realpath(__file__)), 'dags') +class BaseJobTest(unittest.TestCase): + class TestJob(BaseJob): + __mapper_args__ = { + 'polymorphic_identity': 'TestJob' + } + + def __init__(self, cb): + self.cb = cb + super(BaseJobTest.TestJob, self).__init__() + + def _execute(self): + return self.cb() + + def test_state_success(self): + job = self.TestJob(lambda: True) + job.run() + + self.assertEquals(job.state, State.SUCCESS) + self.assertIsNotNone(job.end_date) + + def test_state_sysexit(self): + import sys + job = self.TestJob(lambda: sys.exit(0)) + job.run() + + self.assertEquals(job.state, State.SUCCESS) + self.assertIsNotNone(job.end_date) + + def test_state_failed(self): + def abort(): + raise RuntimeError("fail") + + job = self.TestJob(abort) + with self.assertRaises(RuntimeError): + job.run() + + self.assertEquals(job.state, State.FAILED) + self.assertIsNotNone(job.end_date) + + class BackfillJobTest(unittest.TestCase): def setUp(self):