Repository: incubator-airflow
Updated Branches:
  refs/heads/master b0061f136 -> 284dbdb60
[AIRFLOW-2359] Add set failed for DagRun and task in tree view Closes #3255 from yrqls21/kevin_yang_add_set_failed Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/284dbdb6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/284dbdb6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/284dbdb6 Branch: refs/heads/master Commit: 284dbdb60ab1fec027dea4871e3013a4727f6041 Parents: b0061f1 Author: Kevin Yang <kevin.y...@airbnb.com> Authored: Thu Jun 28 13:30:36 2018 -0700 Committer: Alex Guziel <alex.guz...@airbnb.com> Committed: Thu Jun 28 13:30:36 2018 -0700 ---------------------------------------------------------------------- airflow/api/common/experimental/mark_tasks.py | 113 +++++++- airflow/jobs.py | 3 +- airflow/models.py | 4 +- airflow/www/templates/airflow/dag.html | 43 +++ airflow/www/views.py | 295 +++++++++++++++------ airflow/www_rbac/templates/airflow/dag.html | 43 +++ airflow/www_rbac/views.py | 237 ++++++++++++----- docs/scheduler.rst | 1 + tests/api/common/experimental/mark_tasks.py | 275 ++++++++++++------- tests/dags/test_example_bash_operator.py | 5 +- tests/www_rbac/test_views.py | 8 +- 11 files changed, 765 insertions(+), 262 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/airflow/api/common/experimental/mark_tasks.py ---------------------------------------------------------------------- diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py index e9e4fec..681864d 100644 --- a/airflow/api/common/experimental/mark_tasks.py +++ b/airflow/api/common/experimental/mark_tasks.py @@ -17,15 +17,16 @@ # specific language governing permissions and limitations # under the License. 
+from sqlalchemy import or_ + from airflow.jobs import BackfillJob from airflow.models import DagRun, TaskInstance from airflow.operators.subdag_operator import SubDagOperator from airflow.settings import Session from airflow.utils import timezone +from airflow.utils.db import provide_session from airflow.utils.state import State -from sqlalchemy import or_ - def _create_dagruns(dag, execution_dates, state, run_id_template): """ @@ -191,15 +192,36 @@ def set_state(task, execution_date, upstream=False, downstream=False, return tis_altered -def set_dag_run_state(dag, execution_date, state=State.SUCCESS, commit=False): +def _set_dag_run_state(dag_id, execution_date, state, session=None): + """ + Helper method that set dag run state in the DB. + :param dag_id: dag_id of target dag run + :param execution_date: the execution date from which to start looking + :param state: target state + :param session: database session """ - Set the state of a dag run and all task instances associated with the dag - run for a specific execution date. + DR = DagRun + dr = session.query(DR).filter( + DR.dag_id == dag_id, + DR.execution_date == execution_date + ).one() + dr.state = state + dr.end_date = timezone.utcnow() + session.commit() + + +@provide_session +def set_dag_run_state_to_success(dag, execution_date, commit=False, + session=None): + """ + Set the dag run for a specific execution date and its task instances + to success. 
:param dag: the DAG of which to alter state :param execution_date: the execution date from which to start looking - :param state: the state to which the DAG need to be set :param commit: commit DAG and tasks to be altered to the database - :return: list of tasks that have been created and updated + :param session: database session + :return: If commit is true, list of tasks that have been updated, + otherwise list of tasks that will be updated :raises: AssertionError if dag or execution_date is invalid """ res = [] @@ -207,18 +229,81 @@ def set_dag_run_state(dag, execution_date, state=State.SUCCESS, commit=False): if not dag or not execution_date: return res - # Mark all task instances in the dag run + # Mark the dag run to success. + if commit: + _set_dag_run_state(dag.dag_id, execution_date, State.SUCCESS, session) + + # Mark all task instances of the dag run to success. for task in dag.tasks: task.dag = dag new_state = set_state(task=task, execution_date=execution_date, - state=state, commit=commit) + state=State.SUCCESS, commit=commit) res.extend(new_state) - # Mark the dag run + return res + + +@provide_session +def set_dag_run_state_to_failed(dag, execution_date, commit=False, + session=None): + """ + Set the dag run for a specific execution date and its running task instances + to failed. + :param dag: the DAG of which to alter state + :param execution_date: the execution date from which to start looking + :param commit: commit DAG and tasks to be altered to the database + :param session: database session + :return: If commit is true, list of tasks that have been updated, + otherwise list of tasks that will be updated + :raises: AssertionError if dag or execution_date is invalid + """ + res = [] + + if not dag or not execution_date: + return res + + # Mark the dag run to failed. + if commit: + _set_dag_run_state(dag.dag_id, execution_date, State.FAILED, session) + + # Mark only RUNNING task instances. 
+ TI = TaskInstance + task_ids = [task.task_id for task in dag.tasks] + tis = session.query(TI).filter( + TI.dag_id == dag.dag_id, + TI.execution_date == execution_date, + TI.task_id.in_(task_ids)).filter(TI.state == State.RUNNING) + task_ids_of_running_tis = [ti.task_id for ti in tis] + for task in dag.tasks: + if task.task_id not in task_ids_of_running_tis: + continue + task.dag = dag + new_state = set_state(task=task, execution_date=execution_date, + state=State.FAILED, commit=commit) + res.extend(new_state) + + return res + + +@provide_session +def set_dag_run_state_to_running(dag, execution_date, commit=False, + session=None): + """ + Set the dag run for a specific execution date to running. + :param dag: the DAG of which to alter state + :param execution_date: the execution date from which to start looking + :param commit: commit DAG and tasks to be altered to the database + :param session: database session + :return: If commit is true, list of tasks that have been updated, + otherwise list of tasks that will be updated + """ + res = [] + if not dag or not execution_date: + return res + + # Mark the dag run to running. if commit: - drs = DagRun.find(dag.dag_id, execution_date=execution_date) - for dr in drs: - dr.dag = dag - dr.update_state() + _set_dag_run_state(dag.dag_id, execution_date, State.RUNNING, session) + # To keep the return type consistent with the other similar functions. 
return res http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 70891ab..00ede54 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1023,7 +1023,8 @@ class SchedulerJob(BaseJob): models.TaskInstance.dag_id == subq.c.dag_id, models.TaskInstance.task_id == subq.c.task_id, models.TaskInstance.execution_date == - subq.c.execution_date)) \ + subq.c.execution_date, + models.TaskInstance.task_id == subq.c.task_id)) \ .update({models.TaskInstance.state: new_state}, synchronize_session=False) session.commit() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 260c0ba..089befe 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1678,9 +1678,9 @@ class TaskInstance(Base, LoggingMixin): self.state = State.SKIPPED except AirflowException as e: self.refresh_from_db() - # for case when task is marked as success externally + # for case when task is marked as success/failed externally # current behavior doesn't hit the success callback - if self.state == State.SUCCESS: + if self.state in {State.SUCCESS, State.FAILED}: return else: self.handle_failure(e, test_mode, context) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/airflow/www/templates/airflow/dag.html ---------------------------------------------------------------------- diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index 9ef4cce..b4c09de 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -197,6 +197,24 @@ </button> </span> <hr/> + <button id="btn_failed" type="button" class="btn btn-primary"> + Mark Failed + </button> + <span class="btn-group"> + <button id="btn_failed_past" + 
type="button" class="btn" data-toggle="button">Past</button> + <button id="btn_failed_future" + type="button" class="btn" data-toggle="button"> + Future + </button> + <button id="btn_failed_upstream" + type="button" class="btn" data-toggle="button">Upstream</button> + <button id="btn_failed_downstream" + type="button" class="btn" data-toggle="button"> + Downstream + </button> + </span> + <hr/> <button id="btn_success" type="button" class="btn btn-primary"> Mark Success </button> @@ -241,6 +259,9 @@ <button id="btn_dagrun_clear" type="button" class="btn btn-primary"> Clear </button> + <button id="btn_dagrun_failed" type="button" class="btn btn-primary"> + Mark Failed + </button> <button id="btn_dagrun_success" type="button" class="btn btn-primary"> Mark Success </button> @@ -389,6 +410,20 @@ function updateQueryStringParameter(uri, key, value) { window.location = url; }); + $("#btn_failed").click(function(){ + url = "{{ url_for('airflow.failed') }}" + + "?task_id=" + encodeURIComponent(task_id) + + "&dag_id=" + encodeURIComponent(dag_id) + + "&upstream=" + $('#btn_failed_upstream').hasClass('active') + + "&downstream=" + $('#btn_failed_downstream').hasClass('active') + + "&future=" + $('#btn_failed_future').hasClass('active') + + "&past=" + $('#btn_failed_past').hasClass('active') + + "&execution_date=" + encodeURIComponent(execution_date) + + "&origin=" + encodeURIComponent(window.location); + + window.location = url; + }); + $("#btn_success").click(function(){ url = "{{ url_for('airflow.success') }}" + "?task_id=" + encodeURIComponent(task_id) + @@ -403,6 +438,14 @@ function updateQueryStringParameter(uri, key, value) { window.location = url; }); + $('#btn_dagrun_failed').click(function(){ + url = "{{ url_for('airflow.dagrun_failed') }}" + + "?dag_id=" + encodeURIComponent(dag_id) + + "&execution_date=" + encodeURIComponent(execution_date) + + "&origin=" + encodeURIComponent(window.location); + window.location = url; + }); + 
$('#btn_dagrun_success').click(function(){ url = "{{ url_for('airflow.dagrun_success') }}" + "?dag_id=" + encodeURIComponent(dag_id) + http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index 5c0c973..d37c0db 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -18,77 +18,69 @@ # under the License. # -from past.builtins import basestring, unicode - import ast +import codecs +import copy import datetime as dt +import inspect +import itertools +import json import logging -import os -import pkg_resources -import socket -from functools import wraps -from datetime import timedelta -import copy import math -import json -import bleach -import pendulum -import codecs +import os +import traceback from collections import defaultdict -import itertools - -import inspect +from datetime import timedelta +from functools import wraps from textwrap import dedent -import traceback +import bleach +import markdown +import nvd3 +import pendulum +import pkg_resources import sqlalchemy as sqla -from sqlalchemy import or_, desc, and_, union_all - from flask import ( abort, jsonify, redirect, url_for, request, Markup, Response, current_app, render_template, make_response) +from flask import flash +from flask._compat import PY2 from flask_admin import BaseView, expose, AdminIndexView -from flask_admin.contrib.sqla import ModelView from flask_admin.actions import action from flask_admin.babel import lazy_gettext +from flask_admin.contrib.sqla import ModelView +from flask_admin.form.fields import DateTimeField from flask_admin.tools import iterdecode -from flask import flash -from flask._compat import PY2 - -from jinja2.sandbox import ImmutableSandboxedEnvironment from jinja2 import escape - -import markdown -import nvd3 - +from jinja2.sandbox import ImmutableSandboxedEnvironment +from past.builtins import 
basestring, unicode +from pygments import highlight, lexers +from pygments.formatters import HtmlFormatter +from sqlalchemy import or_, desc, and_, union_all from wtforms import ( Form, SelectField, TextAreaField, PasswordField, StringField, validators) -from flask_admin.form.fields import DateTimeField - -from pygments import highlight, lexers -from pygments.formatters import HtmlFormatter import airflow from airflow import configuration as conf from airflow import models from airflow import settings -from airflow.api.common.experimental.mark_tasks import set_dag_run_state +from airflow.api.common.experimental.mark_tasks import (set_dag_run_state_to_running, + set_dag_run_state_to_success, + set_dag_run_state_to_failed) from airflow.exceptions import AirflowException -from airflow.models import XCom, DagRun -from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS - from airflow.models import BaseOperator +from airflow.models import XCom, DagRun from airflow.operators.subdag_operator import SubDagOperator - +from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS from airflow.utils import timezone -from airflow.utils.json import json_ser -from airflow.utils.state import State +from airflow.utils.dates import infer_time_unit, scale_time_units, parse_execution_date from airflow.utils.db import create_session, provide_session from airflow.utils.helpers import alchemy_to_dict -from airflow.utils.dates import infer_time_unit, scale_time_units, parse_execution_date -from airflow.utils.timezone import datetime +from airflow.utils.json import json_ser from airflow.utils.net import get_hostname +from airflow.utils.state import State +from airflow.utils.timezone import datetime from airflow.www import utils as wwwutils from airflow.www.forms import (DateTimeForm, DateTimeWithNumRunsForm, DateTimeWithNumRunsWithDagRunsForm) @@ -1208,16 +1200,35 @@ class Airflow(BaseView): }) return wwwutils.json_response(payload) - 
@expose('/dagrun_success') - @login_required - @wwwutils.action_logging - @wwwutils.notify_owner - def dagrun_success(self): - dag_id = request.args.get('dag_id') - execution_date = request.args.get('execution_date') - confirmed = request.args.get('confirmed') == 'true' - origin = request.args.get('origin') + def _mark_dagrun_state_as_failed(self, dag_id, execution_date, confirmed, origin): + if not execution_date: + flash('Invalid execution date', 'error') + return redirect(origin) + + execution_date = pendulum.parse(execution_date) + dag = dagbag.get_dag(dag_id) + + if not dag: + flash('Cannot find DAG: {}'.format(dag_id), 'error') + return redirect(origin) + + new_dag_state = set_dag_run_state_to_failed(dag, execution_date, commit=confirmed) + + if confirmed: + flash('Marked failed on {} task instances'.format(len(new_dag_state))) + return redirect(origin) + + else: + details = '\n'.join([str(t) for t in new_dag_state]) + + response = self.render('airflow/confirm.html', + message=("Here's the list of task instances you are " + "about to mark as failed"), + details=details) + + return response + def _mark_dagrun_state_as_success(self, dag_id, execution_date, confirmed, origin): if not execution_date: flash('Invalid execution date', 'error') return redirect(origin) @@ -1229,8 +1240,8 @@ class Airflow(BaseView): flash('Cannot find DAG: {}'.format(dag_id), 'error') return redirect(origin) - new_dag_state = set_dag_run_state(dag, execution_date, state=State.SUCCESS, - commit=confirmed) + new_dag_state = set_dag_run_state_to_success(dag, execution_date, + commit=confirmed) if confirmed: flash('Marked success on {} task instances'.format(len(new_dag_state))) @@ -1241,30 +1252,43 @@ class Airflow(BaseView): response = self.render('airflow/confirm.html', message=("Here's the list of task instances you are " - "about to mark as successful:"), + "about to mark as success"), details=details) return response - @expose('/success') + @expose('/dagrun_failed') @login_required 
@wwwutils.action_logging @wwwutils.notify_owner - def success(self): + def dagrun_failed(self): dag_id = request.args.get('dag_id') - task_id = request.args.get('task_id') + execution_date = request.args.get('execution_date') + confirmed = request.args.get('confirmed') == 'true' + origin = request.args.get('origin') + return self._mark_dagrun_state_as_failed(dag_id, execution_date, + confirmed, origin) + + @expose('/dagrun_success') + @login_required + @wwwutils.action_logging + @wwwutils.notify_owner + def dagrun_success(self): + dag_id = request.args.get('dag_id') + execution_date = request.args.get('execution_date') + confirmed = request.args.get('confirmed') == 'true' origin = request.args.get('origin') + return self._mark_dagrun_state_as_success(dag_id, execution_date, + confirmed, origin) + + def _mark_task_instance_state(self, dag_id, task_id, origin, execution_date, + confirmed, upstream, downstream, + future, past, state): dag = dagbag.get_dag(dag_id) task = dag.get_task(task_id) task.dag = dag - execution_date = request.args.get('execution_date') execution_date = pendulum.parse(execution_date) - confirmed = request.args.get('confirmed') == "true" - upstream = request.args.get('upstream') == "true" - downstream = request.args.get('downstream') == "true" - future = request.args.get('future') == "true" - past = request.args.get('past') == "true" if not dag: flash("Cannot find DAG: {}".format(dag_id)) @@ -1279,26 +1303,66 @@ class Airflow(BaseView): if confirmed: altered = set_state(task=task, execution_date=execution_date, upstream=upstream, downstream=downstream, - future=future, past=past, state=State.SUCCESS, + future=future, past=past, state=state, commit=True) - flash("Marked success on {} task instances".format(len(altered))) + flash("Marked {} on {} task instances".format(state, len(altered))) return redirect(origin) to_be_altered = set_state(task=task, execution_date=execution_date, upstream=upstream, downstream=downstream, - future=future, 
past=past, state=State.SUCCESS, + future=future, past=past, state=state, commit=False) details = "\n".join([str(t) for t in to_be_altered]) response = self.render("airflow/confirm.html", message=("Here's the list of task instances you are " - "about to mark as successful:"), + "about to mark as {}:".format(state)), details=details) return response + @expose('/failed') + @login_required + @wwwutils.action_logging + @wwwutils.notify_owner + def failed(self): + dag_id = request.args.get('dag_id') + task_id = request.args.get('task_id') + origin = request.args.get('origin') + execution_date = request.args.get('execution_date') + + confirmed = request.args.get('confirmed') == "true" + upstream = request.args.get('upstream') == "true" + downstream = request.args.get('downstream') == "true" + future = request.args.get('future') == "true" + past = request.args.get('past') == "true" + + return self._mark_task_instance_state(dag_id, task_id, origin, execution_date, + confirmed, upstream, downstream, + future, past, State.FAILED) + + @expose('/success') + @login_required + @wwwutils.action_logging + @wwwutils.notify_owner + def success(self): + dag_id = request.args.get('dag_id') + task_id = request.args.get('task_id') + origin = request.args.get('origin') + execution_date = request.args.get('execution_date') + + confirmed = request.args.get('confirmed') == "true" + upstream = request.args.get('upstream') == "true" + downstream = request.args.get('downstream') == "true" + future = request.args.get('future') == "true" + past = request.args.get('past') == "true" + + return self._mark_task_instance_state(dag_id, task_id, origin, execution_date, + confirmed, upstream, downstream, + future, past, State.SUCCESS) + @expose('/tree') @login_required @wwwutils.gzipped @@ -2596,40 +2660,107 @@ class DagRunModelView(ModelViewOnly): models.DagStat.update(dirty_ids, dirty_only=False, session=session) @action('set_running', "Set state to 'running'", None) - def action_set_running(self, 
ids): - self.set_dagrun_state(ids, State.RUNNING) - - @action('set_failed', "Set state to 'failed'", None) - def action_set_failed(self, ids): - self.set_dagrun_state(ids, State.FAILED) + @provide_session + def action_set_running(self, ids, session=None): + try: + DR = models.DagRun + count = 0 + dirty_ids = [] + for dr in session.query(DR).filter(DR.id.in_(ids)).all(): + dirty_ids.append(dr.dag_id) + count += 1 + dr.state = State.RUNNING + dr.start_date = timezone.utcnow() + models.DagStat.update(dirty_ids, session=session) + flash( + "{count} dag runs were set to running".format(**locals())) + except Exception as ex: + if not self.handle_view_exception(ex): + raise Exception("Ooops") + flash('Failed to set state', 'error') - @action('set_success', "Set state to 'success'", None) - def action_set_success(self, ids): - self.set_dagrun_state(ids, State.SUCCESS) + @action('set_failed', "Set state to 'failed'", + "All running task instances would also be marked as failed, are you sure?") + @provide_session + def action_set_failed(self, ids, session=None): + try: + DR = models.DagRun + count = 0 + dirty_ids = [] + altered_tis = [] + for dr in session.query(DR).filter(DR.id.in_(ids)).all(): + dirty_ids.append(dr.dag_id) + count += 1 + altered_tis += \ + set_dag_run_state_to_failed(dagbag.get_dag(dr.dag_id), + dr.execution_date, + commit=True, + session=session) + models.DagStat.update(dirty_ids, session=session) + altered_ti_count = len(altered_tis) + flash( + "{count} dag runs and {altered_ti_count} task instances " + "were set to failed".format(**locals())) + except Exception as ex: + if not self.handle_view_exception(ex): + raise Exception("Ooops") + flash('Failed to set state', 'error') + @action('set_success', "Set state to 'success'", + "All task instances would also be marked as success, are you sure?") @provide_session - def set_dagrun_state(self, ids, target_state, session=None): + def action_set_success(self, ids, session=None): try: DR = models.DagRun count = 
0 dirty_ids = [] + altered_tis = [] for dr in session.query(DR).filter(DR.id.in_(ids)).all(): dirty_ids.append(dr.dag_id) count += 1 - dr.state = target_state - if target_state == State.RUNNING: - dr.start_date = timezone.utcnow() - else: - dr.end_date = timezone.utcnow() - session.commit() + altered_tis += \ + set_dag_run_state_to_success(dagbag.get_dag(dr.dag_id), + dr.execution_date, + commit=True, + session=session) models.DagStat.update(dirty_ids, session=session) + altered_ti_count = len(altered_tis) flash( - "{count} dag runs were set to '{target_state}'".format(**locals())) + "{count} dag runs and {altered_ti_count} task instances " + "were set to success".format(**locals())) except Exception as ex: if not self.handle_view_exception(ex): raise Exception("Ooops") flash('Failed to set state', 'error') + # Called after editing DagRun model in the UI. + @provide_session + def after_model_change(self, form, dagrun, is_created, session=None): + altered_tis = [] + if dagrun.state == State.SUCCESS: + altered_tis = set_dag_run_state_to_success( + dagbag.get_dag(dagrun.dag_id), + dagrun.execution_date, + commit=True) + elif dagrun.state == State.FAILED: + altered_tis = set_dag_run_state_to_failed( + dagbag.get_dag(dagrun.dag_id), + dagrun.execution_date, + commit=True, + session=session) + elif dagrun.state == State.RUNNING: + altered_tis = set_dag_run_state_to_running( + dagbag.get_dag(dagrun.dag_id), + dagrun.execution_date, + commit=True, + session=session) + + altered_ti_count = len(altered_tis) + models.DagStat.update([dagrun.dag_id], session=session) + flash( + "1 dag run and {altered_ti_count} task instances " + "were set to '{dagrun.state}'".format(**locals())) + class LogModelView(ModelViewOnly): verbose_name_plural = "logs" http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/airflow/www_rbac/templates/airflow/dag.html ---------------------------------------------------------------------- diff --git 
a/airflow/www_rbac/templates/airflow/dag.html b/airflow/www_rbac/templates/airflow/dag.html index e6495fe..eb297ad 100644 --- a/airflow/www_rbac/templates/airflow/dag.html +++ b/airflow/www_rbac/templates/airflow/dag.html @@ -196,6 +196,24 @@ </button> </span> <hr/> + <button id="btn_failed" type="button" class="btn btn-primary"> + Mark Failed + </button> + <span class="btn-group"> + <button id="btn_failed_past" + type="button" class="btn" data-toggle="button">Past</button> + <button id="btn_failed_future" + type="button" class="btn" data-toggle="button"> + Future + </button> + <button id="btn_failed_upstream" + type="button" class="btn" data-toggle="button">Upstream</button> + <button id="btn_failed_downstream" + type="button" class="btn" data-toggle="button"> + Downstream + </button> + </span> + <hr/> <button id="btn_success" type="button" class="btn btn-primary"> Mark Success </button> @@ -240,6 +258,9 @@ <button id="btn_dagrun_clear" type="button" class="btn btn-primary"> Clear </button> + <button id="btn_dagrun_failed" type="button" class="btn btn-primary"> + Mark Failed + </button> <button id="btn_dagrun_success" type="button" class="btn btn-primary"> Mark Success </button> @@ -387,6 +408,20 @@ function updateQueryStringParameter(uri, key, value) { window.location = url; }); + $("#btn_failed").click(function(){ + url = "{{ url_for('Airflow.failed') }}" + + "?task_id=" + encodeURIComponent(task_id) + + "&dag_id=" + encodeURIComponent(dag_id) + + "&upstream=" + $('#btn_failed_upstream').hasClass('active') + + "&downstream=" + $('#btn_failed_downstream').hasClass('active') + + "&future=" + $('#btn_failed_future').hasClass('active') + + "&past=" + $('#btn_failed_past').hasClass('active') + + "&execution_date=" + encodeURIComponent(execution_date) + + "&origin=" + encodeURIComponent(window.location); + + window.location = url; + }); + $("#btn_success").click(function(){ url = "{{ url_for('Airflow.success') }}" + "?task_id=" + encodeURIComponent(task_id) + @@ 
-401,6 +436,14 @@ function updateQueryStringParameter(uri, key, value) { window.location = url; }); + $('#btn_dagrun_failed').click(function(){ + url = "{{ url_for('Airflow.dagrun_failed') }}" + + "?dag_id=" + encodeURIComponent(dag_id) + + "&execution_date=" + encodeURIComponent(execution_date) + + "&origin=" + encodeURIComponent(window.location); + window.location = url; + }); + $('#btn_dagrun_success').click(function(){ url = "{{ url_for('Airflow.dagrun_success') }}" + "?dag_id=" + encodeURIComponent(dag_id) + http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/airflow/www_rbac/views.py ---------------------------------------------------------------------- diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py index 78f9799..9cbc642 100644 --- a/airflow/www_rbac/views.py +++ b/airflow/www_rbac/views.py @@ -18,57 +18,49 @@ # under the License. # -from past.builtins import unicode - +import copy +import itertools +import json import logging +import math import os import socket -from datetime import datetime, timedelta -import copy -import math -import json -from collections import defaultdict import traceback +from collections import defaultdict +from datetime import timedelta + import markdown import nvd3 import pendulum -import itertools - import sqlalchemy as sqla -from sqlalchemy import or_, desc, and_, union_all - from flask import ( g, redirect, request, Markup, Response, render_template, make_response, flash, jsonify) from flask._compat import PY2 - from flask_appbuilder import BaseView, ModelView, expose, has_access -from flask_appbuilder.models.sqla.interface import SQLAInterface from flask_appbuilder.actions import action - +from flask_appbuilder.models.sqla.interface import SQLAInterface from flask_babel import lazy_gettext - -from wtforms import Form, SelectField, validators - +from past.builtins import unicode from pygments import highlight, lexers from pygments.formatters import HtmlFormatter +from sqlalchemy import 
or_, desc, and_, union_all +from wtforms import SelectField, validators import airflow from airflow import configuration as conf from airflow import models, jobs from airflow import settings -from airflow.api.common.experimental.mark_tasks import set_dag_run_state +from airflow.api.common.experimental.mark_tasks import (set_dag_run_state_to_success, + set_dag_run_state_to_failed) from airflow.models import XCom, DagRun from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS - from airflow.utils import timezone -from airflow.utils.json import json_ser -from airflow.utils.state import State +from airflow.utils.dates import infer_time_unit, scale_time_units from airflow.utils.db import provide_session from airflow.utils.helpers import alchemy_to_dict -from airflow.utils.dates import infer_time_unit, scale_time_units - - +from airflow.utils.json import json_ser +from airflow.utils.state import State from airflow.www_rbac import utils as wwwutils from airflow.www_rbac.app import app from airflow.www_rbac.decorators import action_logging, gzipped @@ -888,15 +880,35 @@ class Airflow(AirflowBaseView): }) return wwwutils.json_response(payload) - @expose('/dagrun_success') - @has_access - @action_logging - def dagrun_success(self): - dag_id = request.args.get('dag_id') - execution_date = request.args.get('execution_date') - confirmed = request.args.get('confirmed') == 'true' - origin = request.args.get('origin') + def _mark_dagrun_state_as_failed(self, dag_id, execution_date, confirmed, origin): + if not execution_date: + flash('Invalid execution date', 'error') + return redirect(origin) + + execution_date = pendulum.parse(execution_date) + dag = dagbag.get_dag(dag_id) + + if not dag: + flash('Cannot find DAG: {}'.format(dag_id), 'error') + return redirect(origin) + + new_dag_state = set_dag_run_state_to_failed(dag, execution_date, commit=confirmed) + + if confirmed: + flash('Marked failed on {} task instances'.format(len(new_dag_state))) + return 
redirect(origin) + + else: + details = '\n'.join([str(t) for t in new_dag_state]) + + response = self.render('airflow/confirm.html', + message=("Here's the list of task instances you are " + "about to mark as failed"), + details=details) + return response + + def _mark_dagrun_state_as_success(self, dag_id, execution_date, confirmed, origin): if not execution_date: flash('Invalid execution date', 'error') return redirect(origin) @@ -908,8 +920,8 @@ class Airflow(AirflowBaseView): flash('Cannot find DAG: {}'.format(dag_id), 'error') return redirect(origin) - new_dag_state = set_dag_run_state(dag, execution_date, state=State.SUCCESS, - commit=confirmed) + new_dag_state = set_dag_run_state_to_success(dag, execution_date, + commit=confirmed) if confirmed: flash('Marked success on {} task instances'.format(len(new_dag_state))) @@ -920,29 +932,41 @@ class Airflow(AirflowBaseView): response = self.render('airflow/confirm.html', message=("Here's the list of task instances you are " - "about to mark as successful:"), + "about to mark as success"), details=details) return response - @expose('/success') + @expose('/dagrun_failed') @has_access @action_logging - def success(self): + def dagrun_failed(self): dag_id = request.args.get('dag_id') - task_id = request.args.get('task_id') + execution_date = request.args.get('execution_date') + confirmed = request.args.get('confirmed') == 'true' + origin = request.args.get('origin') + return self._mark_dagrun_state_as_failed(dag_id, execution_date, + confirmed, origin) + + @expose('/dagrun_success') + @has_access + @action_logging + def dagrun_success(self): + dag_id = request.args.get('dag_id') + execution_date = request.args.get('execution_date') + confirmed = request.args.get('confirmed') == 'true' origin = request.args.get('origin') + return self._mark_dagrun_state_as_success(dag_id, execution_date, + confirmed, origin) + + def _mark_task_instance_state(self, dag_id, task_id, origin, execution_date, + confirmed, upstream, 
downstream, + future, past, state): dag = dagbag.get_dag(dag_id) task = dag.get_task(task_id) task.dag = dag - execution_date = request.args.get('execution_date') execution_date = pendulum.parse(execution_date) - confirmed = request.args.get('confirmed') == "true" - upstream = request.args.get('upstream') == "true" - downstream = request.args.get('downstream') == "true" - future = request.args.get('future') == "true" - past = request.args.get('past') == "true" if not dag: flash("Cannot find DAG: {}".format(dag_id)) @@ -957,26 +981,64 @@ class Airflow(AirflowBaseView): if confirmed: altered = set_state(task=task, execution_date=execution_date, upstream=upstream, downstream=downstream, - future=future, past=past, state=State.SUCCESS, + future=future, past=past, state=state, commit=True) - flash("Marked success on {} task instances".format(len(altered))) + flash("Marked {} on {} task instances".format(state, len(altered))) return redirect(origin) to_be_altered = set_state(task=task, execution_date=execution_date, upstream=upstream, downstream=downstream, - future=future, past=past, state=State.SUCCESS, + future=future, past=past, state=state, commit=False) details = "\n".join([str(t) for t in to_be_altered]) response = self.render("airflow/confirm.html", message=("Here's the list of task instances you are " - "about to mark as successful:"), + "about to mark as {}:".format(state)), details=details) return response + @expose('/failed') + @has_access + @action_logging + def failed(self): + dag_id = request.args.get('dag_id') + task_id = request.args.get('task_id') + origin = request.args.get('origin') + execution_date = request.args.get('execution_date') + + confirmed = request.args.get('confirmed') == "true" + upstream = request.args.get('upstream') == "true" + downstream = request.args.get('downstream') == "true" + future = request.args.get('future') == "true" + past = request.args.get('past') == "true" + + return self._mark_task_instance_state(dag_id, task_id, 
origin, execution_date, + confirmed, upstream, downstream, + future, past, State.FAILED) + + @expose('/success') + @has_access + @action_logging + def success(self): + dag_id = request.args.get('dag_id') + task_id = request.args.get('task_id') + origin = request.args.get('origin') + execution_date = request.args.get('execution_date') + + confirmed = request.args.get('confirmed') == "true" + upstream = request.args.get('upstream') == "true" + downstream = request.args.get('downstream') == "true" + future = request.args.get('future') == "true" + past = request.args.get('past') == "true" + + return self._mark_task_instance_state(dag_id, task_id, origin, execution_date, + confirmed, upstream, downstream, + future, past, State.SUCCESS) + @expose('/tree') @has_access @gzipped @@ -1972,43 +2034,86 @@ class DagRunModelView(AirflowModelView): def action_muldelete(self, items, session=None): self.datamodel.delete_all(items) self.update_redirect() - return redirect(self.get_redirect()) dirty_ids = [] for item in items: dirty_ids.append(item.dag_id) models.DagStat.update(dirty_ids, dirty_only=False, session=session) + return redirect(self.get_redirect()) @action('set_running', "Set state to 'running'", '', single=False) - def action_set_running(self, drs): - return self.set_dagrun_state(drs, State.RUNNING) - - @action('set_failed', "Set state to 'failed'", '', single=False) - def action_set_failed(self, drs): - return self.set_dagrun_state(drs, State.FAILED) + @provide_session + def action_set_running(self, drs, session=None): + try: + DR = models.DagRun + count = 0 + dirty_ids = [] + for dr in session.query(DR).filter( + DR.id.in_([dagrun.id for dagrun in drs])).all(): + dirty_ids.append(dr.dag_id) + count += 1 + dr.start_date = timezone.utcnow() + dr.state = State.RUNNING + models.DagStat.update(dirty_ids, session=session) + session.commit() + flash( + "{count} dag runs were set to running".format(**locals())) + except Exception as ex: + flash(str(ex)) + flash('Failed to set 
state', 'error') + return redirect(self.route_base + '/list') - @action('set_success', "Set state to 'success'", '', single=False) - def action_set_success(self, drs): - return self.set_dagrun_state(drs, State.SUCCESS) + @action('set_failed', "Set state to 'failed'", + "All running task instances would also be marked as failed, are you sure?", + single=False) + @provide_session + def action_set_failed(self, drs, session=None): + try: + DR = models.DagRun + count = 0 + dirty_ids = [] + altered_tis = [] + for dr in session.query(DR).filter( + DR.id.in_([dagrun.id for dagrun in drs])).all(): + dirty_ids.append(dr.dag_id) + count += 1 + altered_tis += \ + set_dag_run_state_to_failed(dagbag.get_dag(dr.dag_id), + dr.execution_date, + commit=True, + session=session) + models.DagStat.update(dirty_ids, session=session) + altered_ti_count = len(altered_tis) + flash( + "{count} dag runs and {altered_ti_count} task instances " + "were set to failed".format(**locals())) + except Exception as ex: + flash('Failed to set state', 'error') + return redirect(self.route_base + '/list') + @action('set_success', "Set state to 'success'", + "All task instances would also be marked as success, are you sure?", + single=False) @provide_session - def set_dagrun_state(self, drs, target_state, session=None): + def action_set_success(self, drs, session=None): try: DR = models.DagRun count = 0 dirty_ids = [] + altered_tis = [] for dr in session.query(DR).filter( DR.id.in_([dagrun.id for dagrun in drs])).all(): dirty_ids.append(dr.dag_id) count += 1 - dr.state = target_state - if target_state == State.RUNNING: - dr.start_date = timezone.utcnow() - else: - dr.end_date = timezone.utcnow() - session.commit() + altered_tis += \ + set_dag_run_state_to_success(dagbag.get_dag(dr.dag_id), + dr.execution_date, + commit=True, + session=session) models.DagStat.update(dirty_ids, session=session) + altered_ti_count = len(altered_tis) flash( - "{count} dag runs were set to '{target_state}'".format(**locals())) 
+ "{count} dag runs and {altered_ti_count} task instances " + "were set to success".format(**locals())) except Exception as ex: flash('Failed to set state', 'error') return redirect(self.route_base + '/list') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/docs/scheduler.rst ---------------------------------------------------------------------- diff --git a/docs/scheduler.rst b/docs/scheduler.rst index dfa0a42..da5fc70 100644 --- a/docs/scheduler.rst +++ b/docs/scheduler.rst @@ -158,6 +158,7 @@ Here are some of the ways you can **unblock tasks**: states (``failed``, or ``success``) * Clearing a task instance will no longer delete the task instance record. Instead it updates max_tries and set the current task instance state to be None. +* Marking task instances as failed can be done through the UI. This can be used to stop running task instances. * Marking task instances as successful can be done through the UI. This is mostly to fix false negatives, or for instance when the fix has been applied outside of Airflow. * The ``airflow backfill`` CLI subcommand has a flag to ``--mark_success`` and allows selecting http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/tests/api/common/experimental/mark_tasks.py ---------------------------------------------------------------------- diff --git a/tests/api/common/experimental/mark_tasks.py b/tests/api/common/experimental/mark_tasks.py index 3ad5053..181d10d 100644 --- a/tests/api/common/experimental/mark_tasks.py +++ b/tests/api/common/experimental/mark_tasks.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. 
You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,14 +18,16 @@ # under the License. import unittest +from datetime import datetime from airflow import models from airflow.api.common.experimental.mark_tasks import ( - set_state, _create_dagruns, set_dag_run_state) + set_state, _create_dagruns, set_dag_run_state_to_success, set_dag_run_state_to_failed, + set_dag_run_state_to_running) from airflow.settings import Session +from airflow.utils import timezone from airflow.utils.dates import days_ago from airflow.utils.state import State -from datetime import datetime, timedelta DEV_NULL = "/dev/null" @@ -223,115 +225,183 @@ class TestMarkDAGRun(unittest.TestCase): self.session = Session() - def verify_dag_run_states(self, dag, date, state=State.SUCCESS): - drs = models.DagRun.find(dag_id=dag.dag_id, execution_date=date) - dr = drs[0] - self.assertEqual(dr.get_state(), state) - tis = dr.get_task_instances(session=self.session) + def _set_default_task_instance_states(self, dr): + if dr.dag_id != 'test_example_bash_operator': + return + # success task + dr.get_task_instance('runme_0').set_state(State.SUCCESS, self.session) + # skipped task + dr.get_task_instance('runme_1').set_state(State.SKIPPED, self.session) + # retry task + dr.get_task_instance('runme_2').set_state(State.UP_FOR_RETRY, self.session) + # queued task + dr.get_task_instance('also_run_this').set_state(State.QUEUED, self.session) + # running task + dr.get_task_instance('run_after_loop').set_state(State.RUNNING, self.session) + # failed task + dr.get_task_instance('run_this_last').set_state(State.FAILED, self.session) + + def _verify_task_instance_states_remain_default(self, dr): + self.assertEqual(dr.get_task_instance('runme_0').state, State.SUCCESS) + 
self.assertEqual(dr.get_task_instance('runme_1').state, State.SKIPPED) + self.assertEqual(dr.get_task_instance('runme_2').state, State.UP_FOR_RETRY) + self.assertEqual(dr.get_task_instance('also_run_this').state, State.QUEUED, ) + self.assertEqual(dr.get_task_instance('run_after_loop').state, State.RUNNING) + self.assertEqual(dr.get_task_instance('run_this_last').state, State.FAILED) + + def _verify_task_instance_states(self, dag, date, state): + TI = models.TaskInstance + tis = self.session.query(TI).filter(TI.dag_id == dag.dag_id, + TI.execution_date == date) for ti in tis: self.assertEqual(ti.state, state) - def test_set_running_dag_run_state(self): - date = self.execution_dates[0] - dr = self.dag1.create_dagrun( + def _create_test_dag_run(self, state, date): + return self.dag1.create_dagrun( run_id='manual__' + datetime.now().isoformat(), - state=State.RUNNING, + state=state, execution_date=date, session=self.session ) - for ti in dr.get_task_instances(session=self.session): - ti.set_state(State.RUNNING, self.session) - altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, commit=True) + def _verify_dag_run_state(self, dag, date, state): + drs = models.DagRun.find(dag_id=dag.dag_id, execution_date=date) + dr = drs[0] + self.assertEqual(dr.get_state(), state) + + def test_set_running_dag_run_to_success(self): + date = self.execution_dates[0] + dr = self._create_test_dag_run(State.RUNNING, date) + self._set_default_task_instance_states(dr) + + altered = set_dag_run_state_to_success(self.dag1, date, commit=True) - # All of the task should be altered - self.assertEqual(len(altered), len(self.dag1.tasks)) - self.verify_dag_run_states(self.dag1, date) + # All except the SUCCESS task should be altered. 
+ self.assertEqual(len(altered), 5) + self._verify_dag_run_state(self.dag1, date, State.SUCCESS) + self._verify_task_instance_states(self.dag1, date, State.SUCCESS) - def test_set_success_dag_run_state(self): + def test_set_running_dag_run_to_failed(self): date = self.execution_dates[0] + dr = self._create_test_dag_run(State.RUNNING, date) + self._set_default_task_instance_states(dr) - dr = self.dag1.create_dagrun( - run_id='manual__' + datetime.now().isoformat(), - state=State.SUCCESS, - execution_date=date, - session=self.session - ) - for ti in dr.get_task_instances(session=self.session): - ti.set_state(State.SUCCESS, self.session) + altered = set_dag_run_state_to_failed(self.dag1, date, commit=True) - altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, commit=True) + # Only running task should be altered. + self.assertEqual(len(altered), 1) + self._verify_dag_run_state(self.dag1, date, State.FAILED) + self.assertEqual(dr.get_task_instance('run_after_loop').state, State.FAILED) + + def test_set_running_dag_run_to_running(self): + date = self.execution_dates[0] + dr = self._create_test_dag_run(State.RUNNING, date) + self._set_default_task_instance_states(dr) - # None of the task should be altered + altered = set_dag_run_state_to_running(self.dag1, date, commit=True) + + # None of the tasks should be altered. 
self.assertEqual(len(altered), 0) - self.verify_dag_run_states(self.dag1, date) + self._verify_dag_run_state(self.dag1, date, State.RUNNING) + self._verify_task_instance_states_remain_default(dr) - def test_set_failed_dag_run_state(self): + def test_set_success_dag_run_to_success(self): date = self.execution_dates[0] - dr = self.dag1.create_dagrun( - run_id='manual__' + datetime.now().isoformat(), - state=State.FAILED, - execution_date=date, - session=self.session - ) - dr.get_task_instance('runme_0').set_state(State.FAILED, self.session) + dr = self._create_test_dag_run(State.SUCCESS, date) + self._set_default_task_instance_states(dr) - altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, commit=True) + altered = set_dag_run_state_to_success(self.dag1, date, commit=True) - # All of the task should be altered - self.assertEqual(len(altered), len(self.dag1.tasks)) - self.verify_dag_run_states(self.dag1, date) + # All except the SUCCESS task should be altered. + self.assertEqual(len(altered), 5) + self._verify_dag_run_state(self.dag1, date, State.SUCCESS) + self._verify_task_instance_states(self.dag1, date, State.SUCCESS) - def test_set_mixed_dag_run_state(self): - """ - This test checks function set_dag_run_state with mixed task instance - state. 
- """ + def test_set_success_dag_run_to_failed(self): date = self.execution_dates[0] - dr = self.dag1.create_dagrun( - run_id='manual__' + datetime.now().isoformat(), - state=State.FAILED, - execution_date=date, - session=self.session - ) - # success task - dr.get_task_instance('runme_0').set_state(State.SUCCESS, self.session) - # skipped task - dr.get_task_instance('runme_1').set_state(State.SKIPPED, self.session) - # retry task - dr.get_task_instance('runme_2').set_state(State.UP_FOR_RETRY, self.session) - # queued task - dr.get_task_instance('also_run_this').set_state(State.QUEUED, self.session) - # running task - dr.get_task_instance('run_after_loop').set_state(State.RUNNING, self.session) - # failed task - dr.get_task_instance('run_this_last').set_state(State.FAILED, self.session) + dr = self._create_test_dag_run(State.SUCCESS, date) + self._set_default_task_instance_states(dr) - altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, commit=True) + altered = set_dag_run_state_to_failed(self.dag1, date, commit=True) - self.assertEqual(len(altered), len(self.dag1.tasks) - 1) # only 1 task succeeded - self.verify_dag_run_states(self.dag1, date) + # Only running task should be altered. + self.assertEqual(len(altered), 1) + self._verify_dag_run_state(self.dag1, date, State.FAILED) + self.assertEqual(dr.get_task_instance('run_after_loop').state, State.FAILED) + + def test_set_success_dag_run_to_running(self): + date = self.execution_dates[0] + dr = self._create_test_dag_run(State.SUCCESS, date) + self._set_default_task_instance_states(dr) + + altered = set_dag_run_state_to_running(self.dag1, date, commit=True) + + # None of the tasks should be altered. 
+ self.assertEqual(len(altered), 0) + self._verify_dag_run_state(self.dag1, date, State.RUNNING) + self._verify_task_instance_states_remain_default(dr) + + def test_set_failed_dag_run_to_success(self): + date = self.execution_dates[0] + dr = self._create_test_dag_run(State.SUCCESS, date) + self._set_default_task_instance_states(dr) + + altered = set_dag_run_state_to_success(self.dag1, date, commit=True) + + # All except the SUCCESS task should be altered. + self.assertEqual(len(altered), 5) + self._verify_dag_run_state(self.dag1, date, State.SUCCESS) + self._verify_task_instance_states(self.dag1, date, State.SUCCESS) + + def test_set_failed_dag_run_to_failed(self): + date = self.execution_dates[0] + dr = self._create_test_dag_run(State.SUCCESS, date) + self._set_default_task_instance_states(dr) + + altered = set_dag_run_state_to_failed(self.dag1, date, commit=True) + + # Only running task should be altered. + self.assertEqual(len(altered), 1) + self._verify_dag_run_state(self.dag1, date, State.FAILED) + self.assertEqual(dr.get_task_instance('run_after_loop').state, State.FAILED) + + def test_set_failed_dag_run_to_running(self): + date = self.execution_dates[0] + dr = self._create_test_dag_run(State.SUCCESS, date) + self._set_default_task_instance_states(dr) + + altered = set_dag_run_state_to_running(self.dag1, date, commit=True) + + # None of the tasks should be altered. 
+ self.assertEqual(len(altered), 0) + self._verify_dag_run_state(self.dag1, date, State.RUNNING) + self._verify_task_instance_states_remain_default(dr) def test_set_state_without_commit(self): date = self.execution_dates[0] + dr = self._create_test_dag_run(State.RUNNING, date) + self._set_default_task_instance_states(dr) - # Running dag run and task instances - dr = self.dag1.create_dagrun( - run_id='manual__' + datetime.now().isoformat(), - state=State.RUNNING, - execution_date=date, - session=self.session - ) - for ti in dr.get_task_instances(session=self.session): - ti.set_state(State.RUNNING, self.session) + will_be_altered = set_dag_run_state_to_running(self.dag1, date, commit=False) - altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, commit=False) + # None of the tasks will be altered. + self.assertEqual(len(will_be_altered), 0) + self._verify_dag_run_state(self.dag1, date, State.RUNNING) + self._verify_task_instance_states_remain_default(dr) - # All of the task should be altered - self.assertEqual(len(altered), len(self.dag1.tasks)) + will_be_altered = set_dag_run_state_to_failed(self.dag1, date, commit=False) - # Both dag run and task instances' states should remain the same - self.verify_dag_run_states(self.dag1, date, State.RUNNING) + # Only the running task will be altered. + self.assertEqual(len(will_be_altered), 1) + self._verify_dag_run_state(self.dag1, date, State.RUNNING) + self._verify_task_instance_states_remain_default(dr) + + will_be_altered = set_dag_run_state_to_success(self.dag1, date, commit=False) + + # All except the SUCCESS task should be altered. 
+ self.assertEqual(len(will_be_altered), 5) + self._verify_dag_run_state(self.dag1, date, State.RUNNING) + self._verify_task_instance_states_remain_default(dr) def test_set_state_with_multiple_dagruns(self): dr1 = self.dag2.create_dagrun( @@ -353,8 +423,8 @@ class TestMarkDAGRun(unittest.TestCase): session=self.session ) - altered = set_dag_run_state(self.dag2, self.execution_dates[1], - state=State.SUCCESS, commit=True) + altered = set_dag_run_state_to_success(self.dag2, self.execution_dates[1], + commit=True) # Recursively count number of tasks in the dag def count_dag_tasks(dag): @@ -364,29 +434,45 @@ class TestMarkDAGRun(unittest.TestCase): return count self.assertEqual(len(altered), count_dag_tasks(self.dag2)) - self.verify_dag_run_states(self.dag2, self.execution_dates[1]) + self._verify_dag_run_state(self.dag2, self.execution_dates[1], State.SUCCESS) # Make sure other dag status are not changed - dr1 = models.DagRun.find(dag_id=self.dag2.dag_id, execution_date=self.execution_dates[0]) + dr1 = models.DagRun.find(dag_id=self.dag2.dag_id, + execution_date=self.execution_dates[0]) dr1 = dr1[0] - self.assertEqual(dr1.get_state(), State.FAILED) - dr3 = models.DagRun.find(dag_id=self.dag2.dag_id, execution_date=self.execution_dates[2]) + self._verify_dag_run_state(self.dag2, self.execution_dates[0], State.FAILED) + dr3 = models.DagRun.find(dag_id=self.dag2.dag_id, + execution_date=self.execution_dates[2]) dr3 = dr3[0] - self.assertEqual(dr3.get_state(), State.RUNNING) + self._verify_dag_run_state(self.dag2, self.execution_dates[2], State.RUNNING) def test_set_dag_run_state_edge_cases(self): # Dag does not exist - altered = set_dag_run_state(None, self.execution_dates[0]) + altered = set_dag_run_state_to_success(None, self.execution_dates[0]) + self.assertEqual(len(altered), 0) + altered = set_dag_run_state_to_failed(None, self.execution_dates[0]) + self.assertEqual(len(altered), 0) + altered = set_dag_run_state_to_running(None, self.execution_dates[0]) 
self.assertEqual(len(altered), 0) # Invalid execution date - altered = set_dag_run_state(self.dag1, None) + altered = set_dag_run_state_to_success(self.dag1, None) + self.assertEqual(len(altered), 0) + altered = set_dag_run_state_to_failed(self.dag1, None) + self.assertEqual(len(altered), 0) + altered = set_dag_run_state_to_running(self.dag1, None) self.assertEqual(len(altered), 0) - self.assertRaises(AssertionError, set_dag_run_state, self.dag1, timedelta(microseconds=-1)) + # This will throw AssertionError since dag.latest_execution_date + # need to be 0 does not exist. + self.assertRaises(AssertionError, set_dag_run_state_to_success, self.dag1, + timezone.make_naive(self.execution_dates[0])) + + # altered = set_dag_run_state_to_success(self.dag1, self.execution_dates[0]) # DagRun does not exist # This will throw AssertionError since dag.latest_execution_date does not exist - self.assertRaises(AssertionError, set_dag_run_state, self.dag1, self.execution_dates[0]) + self.assertRaises(AssertionError, set_dag_run_state_to_success, + self.dag1, self.execution_dates[0]) def tearDown(self): self.dag1.clear() @@ -397,5 +483,6 @@ class TestMarkDAGRun(unittest.TestCase): self.session.query(models.DagStat).delete() self.session.commit() + if __name__ == '__main__': unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/tests/dags/test_example_bash_operator.py ---------------------------------------------------------------------- diff --git a/tests/dags/test_example_bash_operator.py b/tests/dags/test_example_bash_operator.py index 94b89ff..f9bd6c7 100644 --- a/tests/dags/test_example_bash_operator.py +++ b/tests/dags/test_example_bash_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. 
You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,6 +27,7 @@ from datetime import timedelta args = { 'owner': 'airflow', + 'retries': 3, 'start_date': airflow.utils.dates.days_ago(2) } http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/tests/www_rbac/test_views.py ---------------------------------------------------------------------- diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py index 71dcab0..0c25972 100644 --- a/tests/www_rbac/test_views.py +++ b/tests/www_rbac/test_views.py @@ -380,8 +380,14 @@ class TestAirflowBaseViews(TestBase): resp = self.client.post(url, follow_redirects=True) self.check_content_in_response('OK', resp) - def test_success(self): + def test_failed(self): + url = ('failed?task_id=run_this_last&dag_id=example_bash_operator&' + 'execution_date={}&upstream=false&downstream=false&future=false&past=false' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url) + self.check_content_in_response('Wait a minute', resp) + def test_success(self): url = ('success?task_id=run_this_last&dag_id=example_bash_operator&' 'execution_date={}&upstream=false&downstream=false&future=false&past=false' .format(self.percent_encode(self.default_date)))