Repository: incubator-airflow
Updated Branches:
  refs/heads/master b0061f136 -> 284dbdb60


[AIRFLOW-2359] Add set failed for DagRun and task in tree view

Closes #3255 from
yrqls21/kevin_yang_add_set_failed


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/284dbdb6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/284dbdb6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/284dbdb6

Branch: refs/heads/master
Commit: 284dbdb60ab1fec027dea4871e3013a4727f6041
Parents: b0061f1
Author: Kevin Yang <kevin.y...@airbnb.com>
Authored: Thu Jun 28 13:30:36 2018 -0700
Committer: Alex Guziel <alex.guz...@airbnb.com>
Committed: Thu Jun 28 13:30:36 2018 -0700

----------------------------------------------------------------------
 airflow/api/common/experimental/mark_tasks.py | 113 +++++++-
 airflow/jobs.py                               |   3 +-
 airflow/models.py                             |   4 +-
 airflow/www/templates/airflow/dag.html        |  43 +++
 airflow/www/views.py                          | 295 +++++++++++++++------
 airflow/www_rbac/templates/airflow/dag.html   |  43 +++
 airflow/www_rbac/views.py                     | 237 ++++++++++++-----
 docs/scheduler.rst                            |   1 +
 tests/api/common/experimental/mark_tasks.py   | 275 ++++++++++++-------
 tests/dags/test_example_bash_operator.py      |   5 +-
 tests/www_rbac/test_views.py                  |   8 +-
 11 files changed, 765 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/airflow/api/common/experimental/mark_tasks.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/mark_tasks.py 
b/airflow/api/common/experimental/mark_tasks.py
index e9e4fec..681864d 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -17,15 +17,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from sqlalchemy import or_
+
 from airflow.jobs import BackfillJob
 from airflow.models import DagRun, TaskInstance
 from airflow.operators.subdag_operator import SubDagOperator
 from airflow.settings import Session
 from airflow.utils import timezone
+from airflow.utils.db import provide_session
 from airflow.utils.state import State
 
-from sqlalchemy import or_
-
 
 def _create_dagruns(dag, execution_dates, state, run_id_template):
     """
@@ -191,15 +192,36 @@ def set_state(task, execution_date, upstream=False, 
downstream=False,
     return tis_altered
 
 
-def set_dag_run_state(dag, execution_date, state=State.SUCCESS, commit=False):
+def _set_dag_run_state(dag_id, execution_date, state, session=None):
+    """
+    Helper method that set dag run state in the DB.
+    :param dag_id: dag_id of target dag run
+    :param execution_date: the execution date from which to start looking
+    :param state: target state
+    :param session: database session
     """
-    Set the state of a dag run and all task instances associated with the dag
-    run for a specific execution date.
+    DR = DagRun
+    dr = session.query(DR).filter(
+        DR.dag_id == dag_id,
+        DR.execution_date == execution_date
+    ).one()
+    dr.state = state
+    dr.end_date = timezone.utcnow()
+    session.commit()
+
+
+@provide_session
+def set_dag_run_state_to_success(dag, execution_date, commit=False,
+                                 session=None):
+    """
+    Set the dag run for a specific execution date and its task instances
+    to success.
     :param dag: the DAG of which to alter state
     :param execution_date: the execution date from which to start looking
-    :param state: the state to which the DAG need to be set
     :param commit: commit DAG and tasks to be altered to the database
-    :return: list of tasks that have been created and updated
+    :param session: database session
+    :return: If commit is true, list of tasks that have been updated,
+             otherwise list of tasks that will be updated
     :raises: AssertionError if dag or execution_date is invalid
     """
     res = []
@@ -207,18 +229,81 @@ def set_dag_run_state(dag, execution_date, 
state=State.SUCCESS, commit=False):
     if not dag or not execution_date:
         return res
 
-    # Mark all task instances in the dag run
+    # Mark the dag run to success.
+    if commit:
+        _set_dag_run_state(dag.dag_id, execution_date, State.SUCCESS, session)
+
+    # Mark all task instances of the dag run to success.
     for task in dag.tasks:
         task.dag = dag
         new_state = set_state(task=task, execution_date=execution_date,
-                              state=state, commit=commit)
+                              state=State.SUCCESS, commit=commit)
         res.extend(new_state)
 
-    # Mark the dag run
+    return res
+
+
+@provide_session
+def set_dag_run_state_to_failed(dag, execution_date, commit=False,
+                                session=None):
+    """
+    Set the dag run for a specific execution date and its running task 
instances
+    to failed.
+    :param dag: the DAG of which to alter state
+    :param execution_date: the execution date from which to start looking
+    :param commit: commit DAG and tasks to be altered to the database
+    :param session: database session
+    :return: If commit is true, list of tasks that have been updated,
+             otherwise list of tasks that will be updated
+    :raises: AssertionError if dag or execution_date is invalid
+    """
+    res = []
+
+    if not dag or not execution_date:
+        return res
+
+    # Mark the dag run to failed.
+    if commit:
+        _set_dag_run_state(dag.dag_id, execution_date, State.FAILED, session)
+
+    # Mark only RUNNING task instances.
+    TI = TaskInstance
+    task_ids = [task.task_id for task in dag.tasks]
+    tis = session.query(TI).filter(
+        TI.dag_id == dag.dag_id,
+        TI.execution_date == execution_date,
+        TI.task_id.in_(task_ids)).filter(TI.state == State.RUNNING)
+    task_ids_of_running_tis = [ti.task_id for ti in tis]
+    for task in dag.tasks:
+        if task.task_id not in task_ids_of_running_tis:
+            continue
+        task.dag = dag
+        new_state = set_state(task=task, execution_date=execution_date,
+                              state=State.FAILED, commit=commit)
+        res.extend(new_state)
+
+    return res
+
+
+@provide_session
+def set_dag_run_state_to_running(dag, execution_date, commit=False,
+                                 session=None):
+    """
+    Set the dag run for a specific execution date to running.
+    :param dag: the DAG of which to alter state
+    :param execution_date: the execution date from which to start looking
+    :param commit: commit DAG and tasks to be altered to the database
+    :param session: database session
+    :return: If commit is true, list of tasks that have been updated,
+             otherwise list of tasks that will be updated
+    """
+    res = []
+    if not dag or not execution_date:
+        return res
+
+    # Mark the dag run to running.
     if commit:
-        drs = DagRun.find(dag.dag_id, execution_date=execution_date)
-        for dr in drs:
-            dr.dag = dag
-            dr.update_state()
+        _set_dag_run_state(dag.dag_id, execution_date, State.RUNNING, session)
 
+    # To keep the return type consistent with the other similar functions.
     return res

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 70891ab..00ede54 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1023,7 +1023,8 @@ class SchedulerJob(BaseJob):
                     models.TaskInstance.dag_id == subq.c.dag_id,
                     models.TaskInstance.task_id == subq.c.task_id,
                     models.TaskInstance.execution_date ==
-                    subq.c.execution_date)) \
+                    subq.c.execution_date,
+                    models.TaskInstance.task_id == subq.c.task_id)) \
                 .update({models.TaskInstance.state: new_state},
                         synchronize_session=False)
             session.commit()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 260c0ba..089befe 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1678,9 +1678,9 @@ class TaskInstance(Base, LoggingMixin):
             self.state = State.SKIPPED
         except AirflowException as e:
             self.refresh_from_db()
-            # for case when task is marked as success externally
+            # for case when task is marked as success/failed externally
             # current behavior doesn't hit the success callback
-            if self.state == State.SUCCESS:
+            if self.state in {State.SUCCESS, State.FAILED}:
                 return
             else:
                 self.handle_failure(e, test_mode, context)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/airflow/www/templates/airflow/dag.html
----------------------------------------------------------------------
diff --git a/airflow/www/templates/airflow/dag.html 
b/airflow/www/templates/airflow/dag.html
index 9ef4cce..b4c09de 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -197,6 +197,24 @@
             </button>
           </span>
           <hr/>
+          <button id="btn_failed" type="button" class="btn btn-primary">
+            Mark Failed
+          </button>
+          <span class="btn-group">
+            <button id="btn_failed_past"
+              type="button" class="btn" data-toggle="button">Past</button>
+            <button id="btn_failed_future"
+              type="button" class="btn" data-toggle="button">
+              Future
+            </button>
+            <button id="btn_failed_upstream"
+              type="button" class="btn" data-toggle="button">Upstream</button>
+            <button id="btn_failed_downstream"
+              type="button" class="btn" data-toggle="button">
+              Downstream
+            </button>
+          </span>
+          <hr/>
           <button id="btn_success" type="button" class="btn btn-primary">
             Mark Success
           </button>
@@ -241,6 +259,9 @@
           <button id="btn_dagrun_clear" type="button" class="btn btn-primary">
             Clear
           </button>
+          <button id="btn_dagrun_failed" type="button" class="btn btn-primary">
+            Mark Failed
+          </button>
           <button id="btn_dagrun_success" type="button" class="btn 
btn-primary">
             Mark Success
           </button>
@@ -389,6 +410,20 @@ function updateQueryStringParameter(uri, key, value) {
       window.location = url;
     });
 
+    $("#btn_failed").click(function(){
+      url = "{{ url_for('airflow.failed') }}" +
+        "?task_id=" + encodeURIComponent(task_id) +
+        "&dag_id=" + encodeURIComponent(dag_id) +
+        "&upstream=" + $('#btn_failed_upstream').hasClass('active') +
+        "&downstream=" + $('#btn_failed_downstream').hasClass('active') +
+        "&future=" + $('#btn_failed_future').hasClass('active') +
+        "&past=" + $('#btn_failed_past').hasClass('active') +
+        "&execution_date=" + encodeURIComponent(execution_date) +
+        "&origin=" + encodeURIComponent(window.location);
+
+      window.location = url;
+    });
+
     $("#btn_success").click(function(){
       url = "{{ url_for('airflow.success') }}" +
         "?task_id=" + encodeURIComponent(task_id) +
@@ -403,6 +438,14 @@ function updateQueryStringParameter(uri, key, value) {
       window.location = url;
     });
 
+    $('#btn_dagrun_failed').click(function(){
+      url = "{{ url_for('airflow.dagrun_failed') }}" +
+        "?dag_id=" + encodeURIComponent(dag_id) +
+        "&execution_date=" + encodeURIComponent(execution_date) +
+        "&origin=" + encodeURIComponent(window.location);
+      window.location = url;
+    });
+
     $('#btn_dagrun_success').click(function(){
       url = "{{ url_for('airflow.dagrun_success') }}" +
         "?dag_id=" + encodeURIComponent(dag_id) +

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 5c0c973..d37c0db 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -18,77 +18,69 @@
 # under the License.
 #
 
-from past.builtins import basestring, unicode
-
 import ast
+import codecs
+import copy
 import datetime as dt
+import inspect
+import itertools
+import json
 import logging
-import os
-import pkg_resources
-import socket
-from functools import wraps
-from datetime import timedelta
-import copy
 import math
-import json
-import bleach
-import pendulum
-import codecs
+import os
+import traceback
 from collections import defaultdict
-import itertools
-
-import inspect
+from datetime import timedelta
+from functools import wraps
 from textwrap import dedent
-import traceback
 
+import bleach
+import markdown
+import nvd3
+import pendulum
+import pkg_resources
 import sqlalchemy as sqla
-from sqlalchemy import or_, desc, and_, union_all
-
 from flask import (
     abort, jsonify, redirect, url_for, request, Markup, Response,
     current_app, render_template, make_response)
+from flask import flash
+from flask._compat import PY2
 from flask_admin import BaseView, expose, AdminIndexView
-from flask_admin.contrib.sqla import ModelView
 from flask_admin.actions import action
 from flask_admin.babel import lazy_gettext
+from flask_admin.contrib.sqla import ModelView
+from flask_admin.form.fields import DateTimeField
 from flask_admin.tools import iterdecode
-from flask import flash
-from flask._compat import PY2
-
-from jinja2.sandbox import ImmutableSandboxedEnvironment
 from jinja2 import escape
-
-import markdown
-import nvd3
-
+from jinja2.sandbox import ImmutableSandboxedEnvironment
+from past.builtins import basestring, unicode
+from pygments import highlight, lexers
+from pygments.formatters import HtmlFormatter
+from sqlalchemy import or_, desc, and_, union_all
 from wtforms import (
     Form, SelectField, TextAreaField, PasswordField,
     StringField, validators)
-from flask_admin.form.fields import DateTimeField
-
-from pygments import highlight, lexers
-from pygments.formatters import HtmlFormatter
 
 import airflow
 from airflow import configuration as conf
 from airflow import models
 from airflow import settings
-from airflow.api.common.experimental.mark_tasks import set_dag_run_state
+from airflow.api.common.experimental.mark_tasks import 
(set_dag_run_state_to_running,
+                                                        
set_dag_run_state_to_success,
+                                                        
set_dag_run_state_to_failed)
 from airflow.exceptions import AirflowException
-from airflow.models import XCom, DagRun
-from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS
-
 from airflow.models import BaseOperator
+from airflow.models import XCom, DagRun
 from airflow.operators.subdag_operator import SubDagOperator
-
+from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS
 from airflow.utils import timezone
-from airflow.utils.json import json_ser
-from airflow.utils.state import State
+from airflow.utils.dates import infer_time_unit, scale_time_units, 
parse_execution_date
 from airflow.utils.db import create_session, provide_session
 from airflow.utils.helpers import alchemy_to_dict
-from airflow.utils.dates import infer_time_unit, scale_time_units, 
parse_execution_date
-from airflow.utils.timezone import datetime
+from airflow.utils.json import json_ser
 from airflow.utils.net import get_hostname
+from airflow.utils.state import State
+from airflow.utils.timezone import datetime
 from airflow.www import utils as wwwutils
 from airflow.www.forms import (DateTimeForm, DateTimeWithNumRunsForm,
                                DateTimeWithNumRunsWithDagRunsForm)
@@ -1208,16 +1200,35 @@ class Airflow(BaseView):
             })
         return wwwutils.json_response(payload)
 
-    @expose('/dagrun_success')
-    @login_required
-    @wwwutils.action_logging
-    @wwwutils.notify_owner
-    def dagrun_success(self):
-        dag_id = request.args.get('dag_id')
-        execution_date = request.args.get('execution_date')
-        confirmed = request.args.get('confirmed') == 'true'
-        origin = request.args.get('origin')
+    def _mark_dagrun_state_as_failed(self, dag_id, execution_date, confirmed, 
origin):
+        if not execution_date:
+            flash('Invalid execution date', 'error')
+            return redirect(origin)
+
+        execution_date = pendulum.parse(execution_date)
+        dag = dagbag.get_dag(dag_id)
+
+        if not dag:
+            flash('Cannot find DAG: {}'.format(dag_id), 'error')
+            return redirect(origin)
+
+        new_dag_state = set_dag_run_state_to_failed(dag, execution_date, 
commit=confirmed)
+
+        if confirmed:
+            flash('Marked failed on {} task 
instances'.format(len(new_dag_state)))
+            return redirect(origin)
+
+        else:
+            details = '\n'.join([str(t) for t in new_dag_state])
+
+            response = self.render('airflow/confirm.html',
+                                   message=("Here's the list of task instances 
you are "
+                                            "about to mark as failed"),
+                                   details=details)
+
+            return response
 
+    def _mark_dagrun_state_as_success(self, dag_id, execution_date, confirmed, 
origin):
         if not execution_date:
             flash('Invalid execution date', 'error')
             return redirect(origin)
@@ -1229,8 +1240,8 @@ class Airflow(BaseView):
             flash('Cannot find DAG: {}'.format(dag_id), 'error')
             return redirect(origin)
 
-        new_dag_state = set_dag_run_state(dag, execution_date, 
state=State.SUCCESS,
-                                          commit=confirmed)
+        new_dag_state = set_dag_run_state_to_success(dag, execution_date,
+                                                     commit=confirmed)
 
         if confirmed:
             flash('Marked success on {} task 
instances'.format(len(new_dag_state)))
@@ -1241,30 +1252,43 @@ class Airflow(BaseView):
 
             response = self.render('airflow/confirm.html',
                                    message=("Here's the list of task instances 
you are "
-                                            "about to mark as successful:"),
+                                            "about to mark as success"),
                                    details=details)
 
             return response
 
-    @expose('/success')
+    @expose('/dagrun_failed')
     @login_required
     @wwwutils.action_logging
     @wwwutils.notify_owner
-    def success(self):
+    def dagrun_failed(self):
         dag_id = request.args.get('dag_id')
-        task_id = request.args.get('task_id')
+        execution_date = request.args.get('execution_date')
+        confirmed = request.args.get('confirmed') == 'true'
+        origin = request.args.get('origin')
+        return self._mark_dagrun_state_as_failed(dag_id, execution_date,
+                                                 confirmed, origin)
+
+    @expose('/dagrun_success')
+    @login_required
+    @wwwutils.action_logging
+    @wwwutils.notify_owner
+    def dagrun_success(self):
+        dag_id = request.args.get('dag_id')
+        execution_date = request.args.get('execution_date')
+        confirmed = request.args.get('confirmed') == 'true'
         origin = request.args.get('origin')
+        return self._mark_dagrun_state_as_success(dag_id, execution_date,
+                                                  confirmed, origin)
+
+    def _mark_task_instance_state(self, dag_id, task_id, origin, 
execution_date,
+                                  confirmed, upstream, downstream,
+                                  future, past, state):
         dag = dagbag.get_dag(dag_id)
         task = dag.get_task(task_id)
         task.dag = dag
 
-        execution_date = request.args.get('execution_date')
         execution_date = pendulum.parse(execution_date)
-        confirmed = request.args.get('confirmed') == "true"
-        upstream = request.args.get('upstream') == "true"
-        downstream = request.args.get('downstream') == "true"
-        future = request.args.get('future') == "true"
-        past = request.args.get('past') == "true"
 
         if not dag:
             flash("Cannot find DAG: {}".format(dag_id))
@@ -1279,26 +1303,66 @@ class Airflow(BaseView):
         if confirmed:
             altered = set_state(task=task, execution_date=execution_date,
                                 upstream=upstream, downstream=downstream,
-                                future=future, past=past, state=State.SUCCESS,
+                                future=future, past=past, state=state,
                                 commit=True)
 
-            flash("Marked success on {} task instances".format(len(altered)))
+            flash("Marked {} on {} task instances".format(state, len(altered)))
             return redirect(origin)
 
         to_be_altered = set_state(task=task, execution_date=execution_date,
                                   upstream=upstream, downstream=downstream,
-                                  future=future, past=past, 
state=State.SUCCESS,
+                                  future=future, past=past, state=state,
                                   commit=False)
 
         details = "\n".join([str(t) for t in to_be_altered])
 
         response = self.render("airflow/confirm.html",
                                message=("Here's the list of task instances you 
are "
-                                        "about to mark as successful:"),
+                                        "about to mark as {}:".format(state)),
                                details=details)
 
         return response
 
+    @expose('/failed')
+    @login_required
+    @wwwutils.action_logging
+    @wwwutils.notify_owner
+    def failed(self):
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        origin = request.args.get('origin')
+        execution_date = request.args.get('execution_date')
+
+        confirmed = request.args.get('confirmed') == "true"
+        upstream = request.args.get('upstream') == "true"
+        downstream = request.args.get('downstream') == "true"
+        future = request.args.get('future') == "true"
+        past = request.args.get('past') == "true"
+
+        return self._mark_task_instance_state(dag_id, task_id, origin, 
execution_date,
+                                              confirmed, upstream, downstream,
+                                              future, past, State.FAILED)
+
+    @expose('/success')
+    @login_required
+    @wwwutils.action_logging
+    @wwwutils.notify_owner
+    def success(self):
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        origin = request.args.get('origin')
+        execution_date = request.args.get('execution_date')
+
+        confirmed = request.args.get('confirmed') == "true"
+        upstream = request.args.get('upstream') == "true"
+        downstream = request.args.get('downstream') == "true"
+        future = request.args.get('future') == "true"
+        past = request.args.get('past') == "true"
+
+        return self._mark_task_instance_state(dag_id, task_id, origin, 
execution_date,
+                                              confirmed, upstream, downstream,
+                                              future, past, State.SUCCESS)
+
     @expose('/tree')
     @login_required
     @wwwutils.gzipped
@@ -2596,40 +2660,107 @@ class DagRunModelView(ModelViewOnly):
         models.DagStat.update(dirty_ids, dirty_only=False, session=session)
 
     @action('set_running', "Set state to 'running'", None)
-    def action_set_running(self, ids):
-        self.set_dagrun_state(ids, State.RUNNING)
-
-    @action('set_failed', "Set state to 'failed'", None)
-    def action_set_failed(self, ids):
-        self.set_dagrun_state(ids, State.FAILED)
+    @provide_session
+    def action_set_running(self, ids, session=None):
+        try:
+            DR = models.DagRun
+            count = 0
+            dirty_ids = []
+            for dr in session.query(DR).filter(DR.id.in_(ids)).all():
+                dirty_ids.append(dr.dag_id)
+                count += 1
+                dr.state = State.RUNNING
+                dr.start_date = timezone.utcnow()
+            models.DagStat.update(dirty_ids, session=session)
+            flash(
+                "{count} dag runs were set to running".format(**locals()))
+        except Exception as ex:
+            if not self.handle_view_exception(ex):
+                raise Exception("Ooops")
+            flash('Failed to set state', 'error')
 
-    @action('set_success', "Set state to 'success'", None)
-    def action_set_success(self, ids):
-        self.set_dagrun_state(ids, State.SUCCESS)
+    @action('set_failed', "Set state to 'failed'",
+            "All running task instances would also be marked as failed, are 
you sure?")
+    @provide_session
+    def action_set_failed(self, ids, session=None):
+        try:
+            DR = models.DagRun
+            count = 0
+            dirty_ids = []
+            altered_tis = []
+            for dr in session.query(DR).filter(DR.id.in_(ids)).all():
+                dirty_ids.append(dr.dag_id)
+                count += 1
+                altered_tis += \
+                    set_dag_run_state_to_failed(dagbag.get_dag(dr.dag_id),
+                                                dr.execution_date,
+                                                commit=True,
+                                                session=session)
+            models.DagStat.update(dirty_ids, session=session)
+            altered_ti_count = len(altered_tis)
+            flash(
+                "{count} dag runs and {altered_ti_count} task instances "
+                "were set to failed".format(**locals()))
+        except Exception as ex:
+            if not self.handle_view_exception(ex):
+                raise Exception("Ooops")
+            flash('Failed to set state', 'error')
 
+    @action('set_success', "Set state to 'success'",
+            "All task instances would also be marked as success, are you 
sure?")
     @provide_session
-    def set_dagrun_state(self, ids, target_state, session=None):
+    def action_set_success(self, ids, session=None):
         try:
             DR = models.DagRun
             count = 0
             dirty_ids = []
+            altered_tis = []
             for dr in session.query(DR).filter(DR.id.in_(ids)).all():
                 dirty_ids.append(dr.dag_id)
                 count += 1
-                dr.state = target_state
-                if target_state == State.RUNNING:
-                    dr.start_date = timezone.utcnow()
-                else:
-                    dr.end_date = timezone.utcnow()
-            session.commit()
+                altered_tis += \
+                    set_dag_run_state_to_success(dagbag.get_dag(dr.dag_id),
+                                                 dr.execution_date,
+                                                 commit=True,
+                                                 session=session)
             models.DagStat.update(dirty_ids, session=session)
+            altered_ti_count = len(altered_tis)
             flash(
-                "{count} dag runs were set to 
'{target_state}'".format(**locals()))
+                "{count} dag runs and {altered_ti_count} task instances "
+                "were set to success".format(**locals()))
         except Exception as ex:
             if not self.handle_view_exception(ex):
                 raise Exception("Ooops")
             flash('Failed to set state', 'error')
 
+    # Called after editing DagRun model in the UI.
+    @provide_session
+    def after_model_change(self, form, dagrun, is_created, session=None):
+        altered_tis = []
+        if dagrun.state == State.SUCCESS:
+            altered_tis = set_dag_run_state_to_success(
+                dagbag.get_dag(dagrun.dag_id),
+                dagrun.execution_date,
+                commit=True)
+        elif dagrun.state == State.FAILED:
+            altered_tis = set_dag_run_state_to_failed(
+                dagbag.get_dag(dagrun.dag_id),
+                dagrun.execution_date,
+                commit=True,
+                session=session)
+        elif dagrun.state == State.RUNNING:
+            altered_tis = set_dag_run_state_to_running(
+                dagbag.get_dag(dagrun.dag_id),
+                dagrun.execution_date,
+                commit=True,
+                session=session)
+
+        altered_ti_count = len(altered_tis)
+        models.DagStat.update([dagrun.dag_id], session=session)
+        flash(
+            "1 dag run and {altered_ti_count} task instances "
+            "were set to '{dagrun.state}'".format(**locals()))
+
 
 class LogModelView(ModelViewOnly):
     verbose_name_plural = "logs"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/airflow/www_rbac/templates/airflow/dag.html
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/templates/airflow/dag.html 
b/airflow/www_rbac/templates/airflow/dag.html
index e6495fe..eb297ad 100644
--- a/airflow/www_rbac/templates/airflow/dag.html
+++ b/airflow/www_rbac/templates/airflow/dag.html
@@ -196,6 +196,24 @@
             </button>
           </span>
           <hr/>
+          <button id="btn_failed" type="button" class="btn btn-primary">
+            Mark Failed
+          </button>
+          <span class="btn-group">
+            <button id="btn_failed_past"
+              type="button" class="btn" data-toggle="button">Past</button>
+            <button id="btn_failed_future"
+              type="button" class="btn" data-toggle="button">
+              Future
+            </button>
+            <button id="btn_failed_upstream"
+              type="button" class="btn" data-toggle="button">Upstream</button>
+            <button id="btn_failed_downstream"
+              type="button" class="btn" data-toggle="button">
+              Downstream
+            </button>
+          </span>
+          <hr/>
           <button id="btn_success" type="button" class="btn btn-primary">
             Mark Success
           </button>
@@ -240,6 +258,9 @@
           <button id="btn_dagrun_clear" type="button" class="btn btn-primary">
             Clear
           </button>
+          <button id="btn_dagrun_failed" type="button" class="btn btn-primary">
+            Mark Failed
+          </button>
           <button id="btn_dagrun_success" type="button" class="btn 
btn-primary">
             Mark Success
           </button>
@@ -387,6 +408,20 @@ function updateQueryStringParameter(uri, key, value) {
       window.location = url;
     });
 
+    $("#btn_failed").click(function(){
+      url = "{{ url_for('Airflow.failed') }}" +
+        "?task_id=" + encodeURIComponent(task_id) +
+        "&dag_id=" + encodeURIComponent(dag_id) +
+        "&upstream=" + $('#btn_failed_upstream').hasClass('active') +
+        "&downstream=" + $('#btn_failed_downstream').hasClass('active') +
+        "&future=" + $('#btn_failed_future').hasClass('active') +
+        "&past=" + $('#btn_failed_past').hasClass('active') +
+        "&execution_date=" + encodeURIComponent(execution_date) +
+        "&origin=" + encodeURIComponent(window.location);
+
+      window.location = url;
+    });
+
     $("#btn_success").click(function(){
       url = "{{ url_for('Airflow.success') }}" +
         "?task_id=" + encodeURIComponent(task_id) +
@@ -401,6 +436,14 @@ function updateQueryStringParameter(uri, key, value) {
       window.location = url;
     });
 
+    $('#btn_dagrun_failed').click(function(){
+      url = "{{ url_for('Airflow.dagrun_failed') }}" +
+        "?dag_id=" + encodeURIComponent(dag_id) +
+        "&execution_date=" + encodeURIComponent(execution_date) +
+        "&origin=" + encodeURIComponent(window.location);
+      window.location = url;
+    });
+
     $('#btn_dagrun_success').click(function(){
       url = "{{ url_for('Airflow.dagrun_success') }}" +
         "?dag_id=" + encodeURIComponent(dag_id) +

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/airflow/www_rbac/views.py
----------------------------------------------------------------------
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 78f9799..9cbc642 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -18,57 +18,49 @@
 # under the License.
 #
 
-from past.builtins import unicode
-
+import copy
+import itertools
+import json
 import logging
+import math
 import os
 import socket
-from datetime import datetime, timedelta
-import copy
-import math
-import json
-from collections import defaultdict
 import traceback
+from collections import defaultdict
+from datetime import timedelta
+
 import markdown
 import nvd3
 import pendulum
-import itertools
-
 import sqlalchemy as sqla
-from sqlalchemy import or_, desc, and_, union_all
-
 from flask import (
     g, redirect, request, Markup, Response, render_template,
     make_response, flash, jsonify)
 from flask._compat import PY2
-
 from flask_appbuilder import BaseView, ModelView, expose, has_access
-from flask_appbuilder.models.sqla.interface import SQLAInterface
 from flask_appbuilder.actions import action
-
+from flask_appbuilder.models.sqla.interface import SQLAInterface
 from flask_babel import lazy_gettext
-
-from wtforms import Form, SelectField, validators
-
+from past.builtins import unicode
 from pygments import highlight, lexers
 from pygments.formatters import HtmlFormatter
+from sqlalchemy import or_, desc, and_, union_all
+from wtforms import SelectField, validators
 
 import airflow
 from airflow import configuration as conf
 from airflow import models, jobs
 from airflow import settings
-from airflow.api.common.experimental.mark_tasks import set_dag_run_state
+from airflow.api.common.experimental.mark_tasks import 
(set_dag_run_state_to_success,
+                                                        
set_dag_run_state_to_failed)
 from airflow.models import XCom, DagRun
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS
-
 from airflow.utils import timezone
-from airflow.utils.json import json_ser
-from airflow.utils.state import State
+from airflow.utils.dates import infer_time_unit, scale_time_units
 from airflow.utils.db import provide_session
 from airflow.utils.helpers import alchemy_to_dict
-from airflow.utils.dates import infer_time_unit, scale_time_units
-
-
+from airflow.utils.json import json_ser
+from airflow.utils.state import State
 from airflow.www_rbac import utils as wwwutils
 from airflow.www_rbac.app import app
 from airflow.www_rbac.decorators import action_logging, gzipped
@@ -888,15 +880,35 @@ class Airflow(AirflowBaseView):
             })
         return wwwutils.json_response(payload)
 
-    @expose('/dagrun_success')
-    @has_access
-    @action_logging
-    def dagrun_success(self):
-        dag_id = request.args.get('dag_id')
-        execution_date = request.args.get('execution_date')
-        confirmed = request.args.get('confirmed') == 'true'
-        origin = request.args.get('origin')
+    def _mark_dagrun_state_as_failed(self, dag_id, execution_date, confirmed, 
origin):
+        if not execution_date:
+            flash('Invalid execution date', 'error')
+            return redirect(origin)
+
+        execution_date = pendulum.parse(execution_date)
+        dag = dagbag.get_dag(dag_id)
+
+        if not dag:
+            flash('Cannot find DAG: {}'.format(dag_id), 'error')
+            return redirect(origin)
+
+        new_dag_state = set_dag_run_state_to_failed(dag, execution_date, 
commit=confirmed)
+
+        if confirmed:
+            flash('Marked failed on {} task 
instances'.format(len(new_dag_state)))
+            return redirect(origin)
+
+        else:
+            details = '\n'.join([str(t) for t in new_dag_state])
+
+            response = self.render('airflow/confirm.html',
+                                   message=("Here's the list of task instances 
you are "
+                                            "about to mark as failed"),
+                                   details=details)
 
+            return response
+
+    def _mark_dagrun_state_as_success(self, dag_id, execution_date, confirmed, 
origin):
         if not execution_date:
             flash('Invalid execution date', 'error')
             return redirect(origin)
@@ -908,8 +920,8 @@ class Airflow(AirflowBaseView):
             flash('Cannot find DAG: {}'.format(dag_id), 'error')
             return redirect(origin)
 
-        new_dag_state = set_dag_run_state(dag, execution_date, 
state=State.SUCCESS,
-                                          commit=confirmed)
+        new_dag_state = set_dag_run_state_to_success(dag, execution_date,
+                                                     commit=confirmed)
 
         if confirmed:
             flash('Marked success on {} task 
instances'.format(len(new_dag_state)))
@@ -920,29 +932,41 @@ class Airflow(AirflowBaseView):
 
             response = self.render('airflow/confirm.html',
                                    message=("Here's the list of task instances 
you are "
-                                            "about to mark as successful:"),
+                                            "about to mark as success"),
                                    details=details)
 
             return response
 
-    @expose('/success')
+    @expose('/dagrun_failed')
     @has_access
     @action_logging
-    def success(self):
+    def dagrun_failed(self):
         dag_id = request.args.get('dag_id')
-        task_id = request.args.get('task_id')
+        execution_date = request.args.get('execution_date')
+        confirmed = request.args.get('confirmed') == 'true'
+        origin = request.args.get('origin')
+        return self._mark_dagrun_state_as_failed(dag_id, execution_date,
+                                                 confirmed, origin)
+
+    @expose('/dagrun_success')
+    @has_access
+    @action_logging
+    def dagrun_success(self):
+        dag_id = request.args.get('dag_id')
+        execution_date = request.args.get('execution_date')
+        confirmed = request.args.get('confirmed') == 'true'
         origin = request.args.get('origin')
+        return self._mark_dagrun_state_as_success(dag_id, execution_date,
+                                                  confirmed, origin)
+
+    def _mark_task_instance_state(self, dag_id, task_id, origin, 
execution_date,
+                                  confirmed, upstream, downstream,
+                                  future, past, state):
         dag = dagbag.get_dag(dag_id)
         task = dag.get_task(task_id)
         task.dag = dag
 
-        execution_date = request.args.get('execution_date')
         execution_date = pendulum.parse(execution_date)
-        confirmed = request.args.get('confirmed') == "true"
-        upstream = request.args.get('upstream') == "true"
-        downstream = request.args.get('downstream') == "true"
-        future = request.args.get('future') == "true"
-        past = request.args.get('past') == "true"
 
         if not dag:
             flash("Cannot find DAG: {}".format(dag_id))
@@ -957,26 +981,64 @@ class Airflow(AirflowBaseView):
         if confirmed:
             altered = set_state(task=task, execution_date=execution_date,
                                 upstream=upstream, downstream=downstream,
-                                future=future, past=past, state=State.SUCCESS,
+                                future=future, past=past, state=state,
                                 commit=True)
 
-            flash("Marked success on {} task instances".format(len(altered)))
+            flash("Marked {} on {} task instances".format(state, len(altered)))
             return redirect(origin)
 
         to_be_altered = set_state(task=task, execution_date=execution_date,
                                   upstream=upstream, downstream=downstream,
-                                  future=future, past=past, 
state=State.SUCCESS,
+                                  future=future, past=past, state=state,
                                   commit=False)
 
         details = "\n".join([str(t) for t in to_be_altered])
 
         response = self.render("airflow/confirm.html",
                                message=("Here's the list of task instances you 
are "
-                                        "about to mark as successful:"),
+                                        "about to mark as {}:".format(state)),
                                details=details)
 
         return response
 
+    @expose('/failed')
+    @has_access
+    @action_logging
+    def failed(self):
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        origin = request.args.get('origin')
+        execution_date = request.args.get('execution_date')
+
+        confirmed = request.args.get('confirmed') == "true"
+        upstream = request.args.get('upstream') == "true"
+        downstream = request.args.get('downstream') == "true"
+        future = request.args.get('future') == "true"
+        past = request.args.get('past') == "true"
+
+        return self._mark_task_instance_state(dag_id, task_id, origin, 
execution_date,
+                                              confirmed, upstream, downstream,
+                                              future, past, State.FAILED)
+
+    @expose('/success')
+    @has_access
+    @action_logging
+    def success(self):
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        origin = request.args.get('origin')
+        execution_date = request.args.get('execution_date')
+
+        confirmed = request.args.get('confirmed') == "true"
+        upstream = request.args.get('upstream') == "true"
+        downstream = request.args.get('downstream') == "true"
+        future = request.args.get('future') == "true"
+        past = request.args.get('past') == "true"
+
+        return self._mark_task_instance_state(dag_id, task_id, origin, 
execution_date,
+                                              confirmed, upstream, downstream,
+                                              future, past, State.SUCCESS)
+
     @expose('/tree')
     @has_access
     @gzipped
@@ -1972,43 +2034,86 @@ class DagRunModelView(AirflowModelView):
     def action_muldelete(self, items, session=None):
         self.datamodel.delete_all(items)
         self.update_redirect()
-        return redirect(self.get_redirect())
         dirty_ids = []
         for item in items:
             dirty_ids.append(item.dag_id)
         models.DagStat.update(dirty_ids, dirty_only=False, session=session)
+        return redirect(self.get_redirect())
 
     @action('set_running', "Set state to 'running'", '', single=False)
-    def action_set_running(self, drs):
-        return self.set_dagrun_state(drs, State.RUNNING)
-
-    @action('set_failed', "Set state to 'failed'", '', single=False)
-    def action_set_failed(self, drs):
-        return self.set_dagrun_state(drs, State.FAILED)
+    @provide_session
+    def action_set_running(self, drs, session=None):
+        try:
+            DR = models.DagRun
+            count = 0
+            dirty_ids = []
+            for dr in session.query(DR).filter(
+                    DR.id.in_([dagrun.id for dagrun in drs])).all():
+                dirty_ids.append(dr.dag_id)
+                count += 1
+                dr.start_date = timezone.utcnow()
+                dr.state = State.RUNNING
+            models.DagStat.update(dirty_ids, session=session)
+            session.commit()
+            flash(
+                "{count} dag runs were set to running".format(**locals()))
+        except Exception as ex:
+            flash(str(ex))
+            flash('Failed to set state', 'error')
+        return redirect(self.route_base + '/list')
 
-    @action('set_success', "Set state to 'success'", '', single=False)
-    def action_set_success(self, drs):
-        return self.set_dagrun_state(drs, State.SUCCESS)
+    @action('set_failed', "Set state to 'failed'",
+            "All running task instances would also be marked as failed, are 
you sure?",
+            single=False)
+    @provide_session
+    def action_set_failed(self, drs, session=None):
+        try:
+            DR = models.DagRun
+            count = 0
+            dirty_ids = []
+            altered_tis = []
+            for dr in session.query(DR).filter(
+                    DR.id.in_([dagrun.id for dagrun in drs])).all():
+                dirty_ids.append(dr.dag_id)
+                count += 1
+                altered_tis += \
+                    set_dag_run_state_to_failed(dagbag.get_dag(dr.dag_id),
+                                                dr.execution_date,
+                                                commit=True,
+                                                session=session)
+            models.DagStat.update(dirty_ids, session=session)
+            altered_ti_count = len(altered_tis)
+            flash(
+                "{count} dag runs and {altered_ti_count} task instances "
+                "were set to failed".format(**locals()))
+        except Exception as ex:
+            flash('Failed to set state', 'error')
+        return redirect(self.route_base + '/list')
 
+    @action('set_success', "Set state to 'success'",
+            "All task instances would also be marked as success, are you 
sure?",
+            single=False)
     @provide_session
-    def set_dagrun_state(self, drs, target_state, session=None):
+    def action_set_success(self, drs, session=None):
         try:
             DR = models.DagRun
             count = 0
             dirty_ids = []
+            altered_tis = []
             for dr in session.query(DR).filter(
                     DR.id.in_([dagrun.id for dagrun in drs])).all():
                 dirty_ids.append(dr.dag_id)
                 count += 1
-                dr.state = target_state
-                if target_state == State.RUNNING:
-                    dr.start_date = timezone.utcnow()
-                else:
-                    dr.end_date = timezone.utcnow()
-            session.commit()
+                altered_tis += \
+                    set_dag_run_state_to_success(dagbag.get_dag(dr.dag_id),
+                                                 dr.execution_date,
+                                                 commit=True,
+                                                 session=session)
             models.DagStat.update(dirty_ids, session=session)
+            altered_ti_count = len(altered_tis)
             flash(
-                "{count} dag runs were set to 
'{target_state}'".format(**locals()))
+                "{count} dag runs and {altered_ti_count} task instances "
+                "were set to success".format(**locals()))
         except Exception as ex:
             flash('Failed to set state', 'error')
         return redirect(self.route_base + '/list')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/docs/scheduler.rst
----------------------------------------------------------------------
diff --git a/docs/scheduler.rst b/docs/scheduler.rst
index dfa0a42..da5fc70 100644
--- a/docs/scheduler.rst
+++ b/docs/scheduler.rst
@@ -158,6 +158,7 @@ Here are some of the ways you can **unblock tasks**:
   states (``failed``, or ``success``)
 * Clearing a task instance will no longer delete the task instance record. 
Instead it updates
   max_tries and set the current task instance state to be None.
+* Marking task instances as failed can be done through the UI. This can be 
used to stop running task instances.
 * Marking task instances as successful can be done through the UI. This is 
mostly to fix false negatives,
   or for instance when the fix has been applied outside of Airflow.
 * The ``airflow backfill`` CLI subcommand has a flag to ``--mark_success`` and 
allows selecting

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/tests/api/common/experimental/mark_tasks.py
----------------------------------------------------------------------
diff --git a/tests/api/common/experimental/mark_tasks.py 
b/tests/api/common/experimental/mark_tasks.py
index 3ad5053..181d10d 100644
--- a/tests/api/common/experimental/mark_tasks.py
+++ b/tests/api/common/experimental/mark_tasks.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,14 +18,16 @@
 # under the License.
 
 import unittest
+from datetime import datetime
 
 from airflow import models
 from airflow.api.common.experimental.mark_tasks import (
-    set_state, _create_dagruns, set_dag_run_state)
+    set_state, _create_dagruns, set_dag_run_state_to_success, 
set_dag_run_state_to_failed,
+    set_dag_run_state_to_running)
 from airflow.settings import Session
+from airflow.utils import timezone
 from airflow.utils.dates import days_ago
 from airflow.utils.state import State
-from datetime import datetime, timedelta
 
 DEV_NULL = "/dev/null"
 
@@ -223,115 +225,183 @@ class TestMarkDAGRun(unittest.TestCase):
 
         self.session = Session()
 
-    def verify_dag_run_states(self, dag, date, state=State.SUCCESS):
-        drs = models.DagRun.find(dag_id=dag.dag_id, execution_date=date)
-        dr = drs[0]
-        self.assertEqual(dr.get_state(), state)
-        tis = dr.get_task_instances(session=self.session)
+    def _set_default_task_instance_states(self, dr):
+        if dr.dag_id != 'test_example_bash_operator':
+            return
+        # success task
+        dr.get_task_instance('runme_0').set_state(State.SUCCESS, self.session)
+        # skipped task
+        dr.get_task_instance('runme_1').set_state(State.SKIPPED, self.session)
+        # retry task
+        dr.get_task_instance('runme_2').set_state(State.UP_FOR_RETRY, 
self.session)
+        # queued task
+        dr.get_task_instance('also_run_this').set_state(State.QUEUED, 
self.session)
+        # running task
+        dr.get_task_instance('run_after_loop').set_state(State.RUNNING, 
self.session)
+        # failed task
+        dr.get_task_instance('run_this_last').set_state(State.FAILED, 
self.session)
+
+    def _verify_task_instance_states_remain_default(self, dr):
+        self.assertEqual(dr.get_task_instance('runme_0').state, State.SUCCESS)
+        self.assertEqual(dr.get_task_instance('runme_1').state, State.SKIPPED)
+        self.assertEqual(dr.get_task_instance('runme_2').state, 
State.UP_FOR_RETRY)
+        self.assertEqual(dr.get_task_instance('also_run_this').state, 
State.QUEUED, )
+        self.assertEqual(dr.get_task_instance('run_after_loop').state, 
State.RUNNING)
+        self.assertEqual(dr.get_task_instance('run_this_last').state, 
State.FAILED)
+
+    def _verify_task_instance_states(self, dag, date, state):
+        TI = models.TaskInstance
+        tis = self.session.query(TI).filter(TI.dag_id == dag.dag_id,
+                                            TI.execution_date == date)
         for ti in tis:
             self.assertEqual(ti.state, state)
 
-    def test_set_running_dag_run_state(self):
-        date = self.execution_dates[0]
-        dr = self.dag1.create_dagrun(
+    def _create_test_dag_run(self, state, date):
+        return self.dag1.create_dagrun(
             run_id='manual__' + datetime.now().isoformat(),
-            state=State.RUNNING,
+            state=state,
             execution_date=date,
             session=self.session
         )
-        for ti in dr.get_task_instances(session=self.session):
-            ti.set_state(State.RUNNING, self.session)
 
-        altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, 
commit=True)
+    def _verify_dag_run_state(self, dag, date, state):
+        drs = models.DagRun.find(dag_id=dag.dag_id, execution_date=date)
+        dr = drs[0]
+        self.assertEqual(dr.get_state(), state)
+
+    def test_set_running_dag_run_to_success(self):
+        date = self.execution_dates[0]
+        dr = self._create_test_dag_run(State.RUNNING, date)
+        self._set_default_task_instance_states(dr)
+
+        altered = set_dag_run_state_to_success(self.dag1, date, commit=True)
 
-        # All of the task should be altered
-        self.assertEqual(len(altered), len(self.dag1.tasks))
-        self.verify_dag_run_states(self.dag1, date)
+        # All except the SUCCESS task should be altered.
+        self.assertEqual(len(altered), 5)
+        self._verify_dag_run_state(self.dag1, date, State.SUCCESS)
+        self._verify_task_instance_states(self.dag1, date, State.SUCCESS)
 
-    def test_set_success_dag_run_state(self):
+    def test_set_running_dag_run_to_failed(self):
         date = self.execution_dates[0]
+        dr = self._create_test_dag_run(State.RUNNING, date)
+        self._set_default_task_instance_states(dr)
 
-        dr = self.dag1.create_dagrun(
-            run_id='manual__' + datetime.now().isoformat(),
-            state=State.SUCCESS,
-            execution_date=date,
-            session=self.session
-        )
-        for ti in dr.get_task_instances(session=self.session):
-            ti.set_state(State.SUCCESS, self.session)
+        altered = set_dag_run_state_to_failed(self.dag1, date, commit=True)
 
-        altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, 
commit=True)
+        # Only running task should be altered.
+        self.assertEqual(len(altered), 1)
+        self._verify_dag_run_state(self.dag1, date, State.FAILED)
+        self.assertEqual(dr.get_task_instance('run_after_loop').state, 
State.FAILED)
+
+    def test_set_running_dag_run_to_running(self):
+        date = self.execution_dates[0]
+        dr = self._create_test_dag_run(State.RUNNING, date)
+        self._set_default_task_instance_states(dr)
 
-        # None of the task should be altered
+        altered = set_dag_run_state_to_running(self.dag1, date, commit=True)
+
+        # None of the tasks should be altered.
         self.assertEqual(len(altered), 0)
-        self.verify_dag_run_states(self.dag1, date)
+        self._verify_dag_run_state(self.dag1, date, State.RUNNING)
+        self._verify_task_instance_states_remain_default(dr)
 
-    def test_set_failed_dag_run_state(self):
+    def test_set_success_dag_run_to_success(self):
         date = self.execution_dates[0]
-        dr = self.dag1.create_dagrun(
-            run_id='manual__' + datetime.now().isoformat(),
-            state=State.FAILED,
-            execution_date=date,
-            session=self.session
-        )
-        dr.get_task_instance('runme_0').set_state(State.FAILED, self.session)
+        dr = self._create_test_dag_run(State.SUCCESS, date)
+        self._set_default_task_instance_states(dr)
 
-        altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, 
commit=True)
+        altered = set_dag_run_state_to_success(self.dag1, date, commit=True)
 
-        # All of the task should be altered
-        self.assertEqual(len(altered), len(self.dag1.tasks))
-        self.verify_dag_run_states(self.dag1, date)
+        # All except the SUCCESS task should be altered.
+        self.assertEqual(len(altered), 5)
+        self._verify_dag_run_state(self.dag1, date, State.SUCCESS)
+        self._verify_task_instance_states(self.dag1, date, State.SUCCESS)
 
-    def test_set_mixed_dag_run_state(self):
-        """
-        This test checks function set_dag_run_state with mixed task instance
-        state.
-        """
+    def test_set_success_dag_run_to_failed(self):
         date = self.execution_dates[0]
-        dr = self.dag1.create_dagrun(
-            run_id='manual__' + datetime.now().isoformat(),
-            state=State.FAILED,
-            execution_date=date,
-            session=self.session
-        )
-        # success task
-        dr.get_task_instance('runme_0').set_state(State.SUCCESS, self.session)
-        # skipped task
-        dr.get_task_instance('runme_1').set_state(State.SKIPPED, self.session)
-        # retry task
-        dr.get_task_instance('runme_2').set_state(State.UP_FOR_RETRY, 
self.session)
-        # queued task
-        dr.get_task_instance('also_run_this').set_state(State.QUEUED, 
self.session)
-        # running task
-        dr.get_task_instance('run_after_loop').set_state(State.RUNNING, 
self.session)
-        # failed task
-        dr.get_task_instance('run_this_last').set_state(State.FAILED, 
self.session)
+        dr = self._create_test_dag_run(State.SUCCESS, date)
+        self._set_default_task_instance_states(dr)
 
-        altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, 
commit=True)
+        altered = set_dag_run_state_to_failed(self.dag1, date, commit=True)
 
-        self.assertEqual(len(altered), len(self.dag1.tasks) - 1) # only 1 task 
succeeded
-        self.verify_dag_run_states(self.dag1, date)
+        # Only running task should be altered.
+        self.assertEqual(len(altered), 1)
+        self._verify_dag_run_state(self.dag1, date, State.FAILED)
+        self.assertEqual(dr.get_task_instance('run_after_loop').state, 
State.FAILED)
+
+    def test_set_success_dag_run_to_running(self):
+        date = self.execution_dates[0]
+        dr = self._create_test_dag_run(State.SUCCESS, date)
+        self._set_default_task_instance_states(dr)
+
+        altered = set_dag_run_state_to_running(self.dag1, date, commit=True)
+
+        # None of the tasks should be altered.
+        self.assertEqual(len(altered), 0)
+        self._verify_dag_run_state(self.dag1, date, State.RUNNING)
+        self._verify_task_instance_states_remain_default(dr)
+
+    def test_set_failed_dag_run_to_success(self):
+        date = self.execution_dates[0]
+        dr = self._create_test_dag_run(State.SUCCESS, date)
+        self._set_default_task_instance_states(dr)
+
+        altered = set_dag_run_state_to_success(self.dag1, date, commit=True)
+
+        # All except the SUCCESS task should be altered.
+        self.assertEqual(len(altered), 5)
+        self._verify_dag_run_state(self.dag1, date, State.SUCCESS)
+        self._verify_task_instance_states(self.dag1, date, State.SUCCESS)
+
+    def test_set_failed_dag_run_to_failed(self):
+        date = self.execution_dates[0]
+        dr = self._create_test_dag_run(State.SUCCESS, date)
+        self._set_default_task_instance_states(dr)
+
+        altered = set_dag_run_state_to_failed(self.dag1, date, commit=True)
+
+        # Only running task should be altered.
+        self.assertEqual(len(altered), 1)
+        self._verify_dag_run_state(self.dag1, date, State.FAILED)
+        self.assertEqual(dr.get_task_instance('run_after_loop').state, 
State.FAILED)
+
+    def test_set_failed_dag_run_to_running(self):
+        date = self.execution_dates[0]
+        dr = self._create_test_dag_run(State.SUCCESS, date)
+        self._set_default_task_instance_states(dr)
+
+        altered = set_dag_run_state_to_running(self.dag1, date, commit=True)
+
+        # None of the tasks should be altered.
+        self.assertEqual(len(altered), 0)
+        self._verify_dag_run_state(self.dag1, date, State.RUNNING)
+        self._verify_task_instance_states_remain_default(dr)
 
     def test_set_state_without_commit(self):
         date = self.execution_dates[0]
+        dr = self._create_test_dag_run(State.RUNNING, date)
+        self._set_default_task_instance_states(dr)
 
-        # Running dag run and task instances
-        dr = self.dag1.create_dagrun(
-            run_id='manual__' + datetime.now().isoformat(),
-            state=State.RUNNING,
-            execution_date=date,
-            session=self.session
-        )
-        for ti in dr.get_task_instances(session=self.session):
-            ti.set_state(State.RUNNING, self.session)
+        will_be_altered = set_dag_run_state_to_running(self.dag1, date, 
commit=False)
 
-        altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, 
commit=False)
+        # None of the tasks will be altered.
+        self.assertEqual(len(will_be_altered), 0)
+        self._verify_dag_run_state(self.dag1, date, State.RUNNING)
+        self._verify_task_instance_states_remain_default(dr)
 
-        # All of the task should be altered
-        self.assertEqual(len(altered), len(self.dag1.tasks))
+        will_be_altered = set_dag_run_state_to_failed(self.dag1, date, 
commit=False)
 
-        # Both dag run and task instances' states should remain the same
-        self.verify_dag_run_states(self.dag1, date, State.RUNNING)
+        # Only the running task will be altered.
+        self.assertEqual(len(will_be_altered), 1)
+        self._verify_dag_run_state(self.dag1, date, State.RUNNING)
+        self._verify_task_instance_states_remain_default(dr)
+
+        will_be_altered = set_dag_run_state_to_success(self.dag1, date, 
commit=False)
+
+        # All except the SUCCESS task should be altered.
+        self.assertEqual(len(will_be_altered), 5)
+        self._verify_dag_run_state(self.dag1, date, State.RUNNING)
+        self._verify_task_instance_states_remain_default(dr)
 
     def test_set_state_with_multiple_dagruns(self):
         dr1 = self.dag2.create_dagrun(
@@ -353,8 +423,8 @@ class TestMarkDAGRun(unittest.TestCase):
             session=self.session
         )
 
-        altered = set_dag_run_state(self.dag2, self.execution_dates[1],
-                                state=State.SUCCESS, commit=True)
+        altered = set_dag_run_state_to_success(self.dag2, 
self.execution_dates[1],
+                                               commit=True)
 
         # Recursively count number of tasks in the dag
         def count_dag_tasks(dag):
@@ -364,29 +434,45 @@ class TestMarkDAGRun(unittest.TestCase):
             return count
 
         self.assertEqual(len(altered), count_dag_tasks(self.dag2))
-        self.verify_dag_run_states(self.dag2, self.execution_dates[1])
+        self._verify_dag_run_state(self.dag2, self.execution_dates[1], 
State.SUCCESS)
 
         # Make sure other dag status are not changed
-        dr1 = models.DagRun.find(dag_id=self.dag2.dag_id, 
execution_date=self.execution_dates[0])
+        dr1 = models.DagRun.find(dag_id=self.dag2.dag_id,
+                                 execution_date=self.execution_dates[0])
         dr1 = dr1[0]
-        self.assertEqual(dr1.get_state(), State.FAILED)
-        dr3 = models.DagRun.find(dag_id=self.dag2.dag_id, 
execution_date=self.execution_dates[2])
+        self._verify_dag_run_state(self.dag2, self.execution_dates[0], 
State.FAILED)
+        dr3 = models.DagRun.find(dag_id=self.dag2.dag_id,
+                                 execution_date=self.execution_dates[2])
         dr3 = dr3[0]
-        self.assertEqual(dr3.get_state(), State.RUNNING)
+        self._verify_dag_run_state(self.dag2, self.execution_dates[2], 
State.RUNNING)
 
     def test_set_dag_run_state_edge_cases(self):
         # Dag does not exist
-        altered = set_dag_run_state(None, self.execution_dates[0])
+        altered = set_dag_run_state_to_success(None, self.execution_dates[0])
+        self.assertEqual(len(altered), 0)
+        altered = set_dag_run_state_to_failed(None, self.execution_dates[0])
+        self.assertEqual(len(altered), 0)
+        altered = set_dag_run_state_to_running(None, self.execution_dates[0])
         self.assertEqual(len(altered), 0)
 
         # Invalid execution date
-        altered = set_dag_run_state(self.dag1, None)
+        altered = set_dag_run_state_to_success(self.dag1, None)
+        self.assertEqual(len(altered), 0)
+        altered = set_dag_run_state_to_failed(self.dag1, None)
+        self.assertEqual(len(altered), 0)
+        altered = set_dag_run_state_to_running(self.dag1, None)
         self.assertEqual(len(altered), 0)
-        self.assertRaises(AssertionError, set_dag_run_state, self.dag1, 
timedelta(microseconds=-1))
 
+        # This will throw AssertionError since dag.latest_execution_date
+        # need to be 0 does not exist.
+        self.assertRaises(AssertionError, set_dag_run_state_to_success, 
self.dag1,
+                          timezone.make_naive(self.execution_dates[0]))
+
+        # altered = set_dag_run_state_to_success(self.dag1, 
self.execution_dates[0])
         # DagRun does not exist
         # This will throw AssertionError since dag.latest_execution_date does 
not exist
-        self.assertRaises(AssertionError, set_dag_run_state, self.dag1, 
self.execution_dates[0])
+        self.assertRaises(AssertionError, set_dag_run_state_to_success,
+                          self.dag1, self.execution_dates[0])
 
     def tearDown(self):
         self.dag1.clear()
@@ -397,5 +483,6 @@ class TestMarkDAGRun(unittest.TestCase):
         self.session.query(models.DagStat).delete()
         self.session.commit()
 
+
 if __name__ == '__main__':
     unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/tests/dags/test_example_bash_operator.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_example_bash_operator.py 
b/tests/dags/test_example_bash_operator.py
index 94b89ff..f9bd6c7 100644
--- a/tests/dags/test_example_bash_operator.py
+++ b/tests/dags/test_example_bash_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,6 +27,7 @@ from datetime import timedelta
 
 args = {
     'owner': 'airflow',
+    'retries': 3,
     'start_date': airflow.utils.dates.days_ago(2)
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/284dbdb6/tests/www_rbac/test_views.py
----------------------------------------------------------------------
diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py
index 71dcab0..0c25972 100644
--- a/tests/www_rbac/test_views.py
+++ b/tests/www_rbac/test_views.py
@@ -380,8 +380,14 @@ class TestAirflowBaseViews(TestBase):
         resp = self.client.post(url, follow_redirects=True)
         self.check_content_in_response('OK', resp)
 
-    def test_success(self):
+    def test_failed(self):
+        url = ('failed?task_id=run_this_last&dag_id=example_bash_operator&'
+               
'execution_date={}&upstream=false&downstream=false&future=false&past=false'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url)
+        self.check_content_in_response('Wait a minute', resp)
 
+    def test_success(self):
         url = ('success?task_id=run_this_last&dag_id=example_bash_operator&'
                
'execution_date={}&upstream=false&downstream=false&future=false&past=false'
                .format(self.percent_encode(self.default_date)))

Reply via email to