[AIRFLOW-1033] Fix ti_deps for no-schedule DAGs

DAGs without a schedule (None or @once) made the
dependency checker raise an exception, because no
previous schedule exists for them.

Also activates all ti_deps tests.
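
For illustration only (not part of the patch), a minimal sketch of the
failure mode, assuming the 1.8-era DAG.previous_schedule() API:

    from datetime import datetime
    from airflow.models import DAG

    # A DAG with no real schedule ('@once', or schedule_interval=None).
    dag = DAG('no_schedule_example',
              start_date=datetime(2017, 1, 1),
              schedule_interval='@once')

    # No previous schedule exists for such a DAG ...
    assert dag.previous_schedule(datetime(2017, 1, 2)) is None

    # ... so the old PrevDagrunDep check compared that None against
    # ti.task.start_date and raised; the patch now yields a passing
    # status before the comparison is reached.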

Closes #2220 from bolkedebruin/AIRFLOW-1033

(cherry picked from commit fbcbd053a2c5fffb0c95eb55a91cb92fa860e1ab)
Signed-off-by: Bolke de Bruin <bo...@xs4all.nl>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ebfc3ea7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ebfc3ea7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ebfc3ea7

Branch: refs/heads/v1-8-stable
Commit: ebfc3ea73ae1ffe273e4ff532f1ad47441bef518
Parents: 9167411
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Thu Apr 6 14:03:11 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Apr 6 14:03:24 2017 +0200

----------------------------------------------------------------------
 airflow/ti_deps/deps/base_ti_dep.py             |  14 +-
 airflow/ti_deps/deps/prev_dagrun_dep.py         |   5 +
 .../ti_deps/deps/dag_ti_slots_available_dep.py  |  41 ---
 tests/ti_deps/deps/dag_unpaused_dep.py          |  41 ---
 tests/ti_deps/deps/dagrun_exists_dep.py         |  41 ---
 tests/ti_deps/deps/not_in_retry_period_dep.py   |  61 ----
 tests/ti_deps/deps/not_running_dep.py           |  39 ---
 tests/ti_deps/deps/not_skipped_dep.py           |  38 ---
 tests/ti_deps/deps/pool_has_space_dep.py        |  37 ---
 tests/ti_deps/deps/prev_dagrun_dep.py           | 143 ---------
 tests/ti_deps/deps/runnable_exec_date_dep.py    |  92 ------
 .../deps/test_dag_ti_slots_available_dep.py     |  42 +++
 tests/ti_deps/deps/test_dag_unpaused_dep.py     |  42 +++
 tests/ti_deps/deps/test_dagrun_exists_dep.py    |  40 +++
 .../deps/test_not_in_retry_period_dep.py        |  59 ++++
 tests/ti_deps/deps/test_not_running_dep.py      |  37 +++
 tests/ti_deps/deps/test_not_skipped_dep.py      |  36 +++
 tests/ti_deps/deps/test_prev_dagrun_dep.py      | 123 ++++++++
 .../ti_deps/deps/test_runnable_exec_date_dep.py |  76 +++++
 tests/ti_deps/deps/test_trigger_rule_dep.py     | 252 ++++++++++++++++
 tests/ti_deps/deps/test_valid_state_dep.py      |  46 +++
 tests/ti_deps/deps/trigger_rule_dep.py          | 295 -------------------
 tests/ti_deps/deps/valid_state_dep.py           |  49 ---
 23 files changed, 768 insertions(+), 881 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/airflow/ti_deps/deps/base_ti_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/base_ti_dep.py b/airflow/ti_deps/deps/base_ti_dep.py
index 0188043..bad1fa0 100644
--- a/airflow/ti_deps/deps/base_ti_dep.py
+++ b/airflow/ti_deps/deps/base_ti_dep.py
@@ -51,7 +51,7 @@ class BaseTIDep(object):
         """
         return getattr(self, 'NAME', self.__class__.__name__)
 
-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, session, dep_context=None):
         """
         Abstract method that returns an iterable of TIDepStatus objects that describe
         whether the given task instance has this dependency met.
@@ -69,7 +69,7 @@ class BaseTIDep(object):
         raise NotImplementedError
 
     @provide_session
-    def get_dep_statuses(self, ti, session, dep_context):
+    def get_dep_statuses(self, ti, session, dep_context=None):
         """
         Wrapper around the private _get_dep_statuses method that contains some global
         checks for all dependencies.
@@ -81,6 +81,12 @@ class BaseTIDep(object):
         :param dep_context: the context for which this dependency should be evaluated for
         :type dep_context: DepContext
         """
+        # this avoids a circular dependency
+        from airflow.ti_deps.dep_context import DepContext
+
+        if dep_context is None:
+            dep_context = DepContext()
+
         if self.IGNOREABLE and dep_context.ignore_all_deps:
             yield self._passing_status(
                 reason="Context specified all dependencies should be ignored.")
@@ -95,7 +101,7 @@ class BaseTIDep(object):
             yield dep_status
 
     @provide_session
-    def is_met(self, ti, session, dep_context):
+    def is_met(self, ti, session, dep_context=None):
         """
         Returns whether or not this dependency is met for a given task instance. A
         dependency is considered met if all of the dependency statuses it reports are
@@ -113,7 +119,7 @@ class BaseTIDep(object):
                    self.get_dep_statuses(ti, session, dep_context))
 
     @provide_session
-    def get_failure_reasons(self, ti, session, dep_context):
+    def get_failure_reasons(self, ti, session, dep_context=None):
         """
         Returns an iterable of strings that explain why this dependency wasn't met.
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/airflow/ti_deps/deps/prev_dagrun_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/prev_dagrun_dep.py b/airflow/ti_deps/deps/prev_dagrun_dep.py
index 7d4baa8..5455fb5 100644
--- a/airflow/ti_deps/deps/prev_dagrun_dep.py
+++ b/airflow/ti_deps/deps/prev_dagrun_dep.py
@@ -41,6 +41,11 @@ class PrevDagrunDep(BaseTIDep):
         # Don't depend on the previous task instance if we are the first task
         dag = ti.task.dag
         if dag.catchup:
+            if dag.previous_schedule(ti.execution_date) is None:
+                yield self._passing_status(
+                    reason="This task does not have a schedule or is @once"
+                )
+                return
             if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
                 yield self._passing_status(
                     reason="This task instance was the first task instance for 
its task.")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/dag_ti_slots_available_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/dag_ti_slots_available_dep.py b/tests/ti_deps/deps/dag_ti_slots_available_dep.py
deleted file mode 100644
index 6077d96..0000000
--- a/tests/ti_deps/deps/dag_ti_slots_available_dep.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-from airflow.ti_deps.deps.dag_ti_slots_available_dep import DagTISlotsAvailableDep
-from fake_models import FakeDag, FakeTask, FakeTI
-
-
-class DagTISlotsAvailableDepTest(unittest.TestCase):
-
-    def test_concurrency_reached(self):
-        """
-        Test concurrency reached should fail dep
-        """
-        dag = FakeDag(concurrency=1, concurrency_reached=True)
-        task = FakeTask(dag=dag)
-        ti = FakeTI(task=task, dag_id="fake_dag")
-
-        self.assertFalse(DagTISlotsAvailableDep().is_met(ti=ti, dep_context=None))
-
-    def test_all_conditions_met(self):
-        """
-        Test all conditions met should pass dep
-        """
-        dag = FakeDag(concurrency=1, concurrency_reached=False)
-        task = FakeTask(dag=dag)
-        ti = FakeTI(task=task, dag_id="fake_dag")
-
-        self.assertTrue(DagTISlotsAvailableDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/dag_unpaused_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/dag_unpaused_dep.py b/tests/ti_deps/deps/dag_unpaused_dep.py
deleted file mode 100644
index 8721a51..0000000
--- a/tests/ti_deps/deps/dag_unpaused_dep.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-from airflow.ti_deps.deps.dag_unpaused_dep import DagUnpausedDep
-from fake_models import FakeDag, FakeTask, FakeTI
-
-
-class DagUnpausedDepTest(unittest.TestCase):
-
-    def test_concurrency_reached(self):
-        """
-        Test paused DAG should fail dependency
-        """
-        dag = FakeDag(is_paused=True)
-        task = FakeTask(dag=dag)
-        ti = FakeTI(task=task, dag_id="fake_dag")
-
-        self.assertFalse(DagUnpausedDep().is_met(ti=ti, dep_context=None))
-
-    def test_all_conditions_met(self):
-        """
-        Test all conditions met should pass dep
-        """
-        dag = FakeDag(is_paused=False)
-        task = FakeTask(dag=dag)
-        ti = FakeTI(task=task, dag_id="fake_dag")
-
-        self.assertTrue(DagUnpausedDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/dagrun_exists_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/dagrun_exists_dep.py b/tests/ti_deps/deps/dagrun_exists_dep.py
deleted file mode 100644
index 1141647..0000000
--- a/tests/ti_deps/deps/dagrun_exists_dep.py
+++ /dev/null
@@ -1,41 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-from airflow.ti_deps.deps.dagrun_exists_dep import DagrunRunningDep
-from fake_models import FakeDag, FakeTask, FakeTI
-
-
-class DagrunRunningDepTest(unittest.TestCase):
-
-    def test_dagrun_doesnt_exist(self):
-        """
-        Task instances without dagruns should fail this dep
-        """
-        dag = FakeDag(running_dagruns=[], max_active_runs=1)
-        task = FakeTask(dag=dag)
-        ti = FakeTI(dagrun=None, task=task, dag_id="fake_dag")
-
-        self.assertFalse(DagrunRunningDep().is_met(ti=ti, dep_context=None))
-
-    def test_dagrun_exists(self):
-        """
-        Task instances with a dagrun should pass this dep
-        """
-        dag = FakeDag(running_dagruns=[], max_active_runs=1)
-        task = FakeTask(dag=dag)
-        ti = FakeTI(dagrun="Fake Dagrun", task=task, dag_id="fake_dag")
-
-        self.assertTrue(DagrunRunningDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/not_in_retry_period_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/not_in_retry_period_dep.py b/tests/ti_deps/deps/not_in_retry_period_dep.py
deleted file mode 100644
index a6657ba..0000000
--- a/tests/ti_deps/deps/not_in_retry_period_dep.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from datetime import datetime, timedelta
-
-from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
-from airflow.utils.state import State
-from fake_models import FakeDag, FakeTask, FakeTI
-
-
-class NotInRetryPeriodDepTest(unittest.TestCase):
-
-    def test_still_in_retry_period(self):
-        """
-        Task instances that are in their retry period should fail this dep
-        """
-        dag = FakeDag()
-        task = FakeTask(dag=dag, retry_delay=timedelta(minutes=1))
-        ti = FakeTI(
-            task=task,
-            state=State.UP_FOR_RETRY,
-            end_date=datetime(2016, 1, 1),
-            is_premature=True)
-
-        self.assertFalse(NotInRetryPeriodDep().is_met(ti=ti, dep_context=None))
-
-    def test_retry_period_finished(self):
-        """
-        Task instance's that have had their retry period elapse should pass this dep
-        """
-        dag = FakeDag()
-        task = FakeTask(dag=dag, retry_delay=timedelta(minutes=1))
-        ti = FakeTI(
-            task=task,
-            state=State.UP_FOR_RETRY,
-            end_date=datetime(2016, 1, 1),
-            is_premature=False)
-
-        self.assertTrue(NotInRetryPeriodDep().is_met(ti=ti, dep_context=None))
-
-    def test_not_in_retry_period(self):
-        """
-        Task instance's that are not up for retry can not be in their retry period
-        """
-        dag = FakeDag()
-        task = FakeTask(dag=dag)
-        ti = FakeTI(task=task, state=State.SUCCESS)
-
-        self.assertTrue(NotInRetryPeriodDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/not_running_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/not_running_dep.py b/tests/ti_deps/deps/not_running_dep.py
deleted file mode 100644
index 159d923..0000000
--- a/tests/ti_deps/deps/not_running_dep.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from datetime import datetime
-
-from airflow.ti_deps.deps.not_running_dep import NotRunningDep
-from airflow.utils.state import State
-from fake_models import FakeTI
-
-
-class NotRunningDepTest(unittest.TestCase):
-
-    def test_ti_running(self):
-        """
-        Running task instances should fail this dep
-        """
-        ti = FakeTI(state=State.RUNNING, start_date=datetime(2016, 1, 1))
-
-        self.assertFalse(NotRunningDep().is_met(ti=ti, dep_context=None))
-
-    def test_ti_not_running(self):
-        """
-        Non-running task instances should pass this dep
-        """
-        ti = FakeTI(state=State.NONE, start_date=datetime(2016, 1, 1))
-
-        self.assertTrue(NotRunningDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/not_skipped_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/not_skipped_dep.py b/tests/ti_deps/deps/not_skipped_dep.py
deleted file mode 100644
index 6d7ef55..0000000
--- a/tests/ti_deps/deps/not_skipped_dep.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-from airflow.ti_deps.deps.not_skipped_dep import NotSkippedDep
-from airflow.utils.state import State
-from fake_models import FakeTI
-
-
-class NotSkippedDepTest(unittest.TestCase):
-
-    def test_skipped(self):
-        """
-        Skipped task instances should fail this dep
-        """
-        ti = FakeTI(state=State.SKIPPED)
-
-        self.assertFalse(NotSkippedDep().is_met(ti=ti, dep_context=None))
-
-    def test_not_skipped(self):
-        """
-        Non-skipped task instances should pass this dep
-        """
-        ti = FakeTI(state=State.RUNNING)
-
-        self.assertTrue(NotSkippedDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/pool_has_space_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/pool_has_space_dep.py b/tests/ti_deps/deps/pool_has_space_dep.py
deleted file mode 100644
index 411547a..0000000
--- a/tests/ti_deps/deps/pool_has_space_dep.py
+++ /dev/null
@@ -1,37 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-from airflow.ti_deps.deps.pool_has_space_dep import PoolHasSpaceDep
-from fake_models import FakeTI
-
-
-class PoolHasSpaceDepTest(unittest.TestCase):
-
-    def test_pool_full(self):
-        """
-        Full pools should fail this dep
-        """
-        ti = FakeTI(pool="fake_pool", pool_filled=True)
-
-        self.assertFalse(PoolHasSpaceDep().is_met(ti=ti, dep_context=None))
-
-    def test_not_skipped(self):
-        """
-        Pools with room should pass this dep
-        """
-        ti = FakeTI(pool="fake_pool", pool_filled=False)
-
-        self.assertTrue(PoolHasSpaceDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/prev_dagrun_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/prev_dagrun_dep.py b/tests/ti_deps/deps/prev_dagrun_dep.py
deleted file mode 100644
index 4873467..0000000
--- a/tests/ti_deps/deps/prev_dagrun_dep.py
+++ /dev/null
@@ -1,143 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from datetime import datetime
-
-from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
-from airflow.utils.state import State
-from fake_models import FakeContext, FakeTask, FakeTI
-
-
-class PrevDagrunDepTest(unittest.TestCase):
-
-    def test_not_depends_on_past(self):
-        """
-        If depends on past isn't set in the task then the previous dagrun should be
-        ignored, even though there is no previous_ti which would normally fail the dep
-        """
-        task = FakeTask(
-            depends_on_past=False,
-            start_date=datetime(2016, 1, 1),
-            wait_for_downstream=False)
-        prev_ti = FakeTI(
-            task=task,
-            execution_date=datetime(2016, 1, 2),
-            state=State.SUCCESS,
-            dependents_done=True)
-        ti = FakeTI(
-            task=task,
-            previous_ti=prev_ti,
-            execution_date=datetime(2016, 1, 3))
-        dep_context = FakeContext(ignore_depends_on_past=False)
-
-        self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
-
-    def test_context_ignore_depends_on_past(self):
-        """
-        If the context overrides depends_on_past then the dep should be met, even though
-        there is no previous_ti which would normally fail the dep
-        """
-        task = FakeTask(
-            depends_on_past=True,
-            start_date=datetime(2016, 1, 1),
-            wait_for_downstream=False)
-        prev_ti = FakeTI(
-            task=task,
-            execution_date=datetime(2016, 1, 2),
-            state=State.SUCCESS,
-            dependents_done=True)
-        ti = FakeTI(
-            task=task,
-            previous_ti=prev_ti,
-            execution_date=datetime(2016, 1, 3))
-        dep_context = FakeContext(ignore_depends_on_past=True)
-
-        self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
-
-    def test_first_task_run(self):
-        """
-        The first task run for a TI should pass since it has no previous dagrun.
-        """
-        task = FakeTask(
-            depends_on_past=True,
-            start_date=datetime(2016, 1, 1),
-            wait_for_downstream=False)
-        prev_ti = None
-        ti = FakeTI(
-            task=task,
-            previous_ti=prev_ti,
-            execution_date=datetime(2016, 1, 1))
-        dep_context = FakeContext(ignore_depends_on_past=False)
-
-        self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
-
-    def test_prev_ti_bad_state(self):
-        """
-        If the previous TI did not complete execution this dep should fail.
-        """
-        task = FakeTask(
-            depends_on_past=True,
-            start_date=datetime(2016, 1, 1),
-            wait_for_downstream=False)
-        prev_ti = FakeTI(
-            state=State.NONE,
-            dependents_done=True)
-        ti = FakeTI(
-            task=task,
-            previous_ti=prev_ti,
-            execution_date=datetime(2016, 1, 2))
-        dep_context = FakeContext(ignore_depends_on_past=False)
-
-        self.assertFalse(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
-
-    def test_failed_wait_for_downstream(self):
-        """
-        If the previous TI specified to wait for the downstream tasks of the previous
-        dagrun then it should fail this dep if the downstream TIs of the previous TI are
-        not done.
-        """
-        task = FakeTask(
-            depends_on_past=True,
-            start_date=datetime(2016, 1, 1),
-            wait_for_downstream=True)
-        prev_ti = FakeTI(
-            state=State.SUCCESS,
-            dependents_done=False)
-        ti = FakeTI(
-            task=task,
-            previous_ti=prev_ti,
-            execution_date=datetime(2016, 1, 2))
-        dep_context = FakeContext(ignore_depends_on_past=False)
-
-        self.assertFalse(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
-
-    def test_all_met(self):
-        """
-        Test to make sure all of the conditions for the dep are met
-        """
-        task = FakeTask(
-            depends_on_past=True,
-            start_date=datetime(2016, 1, 1),
-            wait_for_downstream=True)
-        prev_ti = FakeTI(
-            state=State.SUCCESS,
-            dependents_done=True)
-        ti = FakeTI(
-            task=task,
-            previous_ti=prev_ti,
-            execution_date=datetime(2016, 1, 2))
-        dep_context = FakeContext(ignore_depends_on_past=False)
-
-        self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/runnable_exec_date_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/runnable_exec_date_dep.py b/tests/ti_deps/deps/runnable_exec_date_dep.py
deleted file mode 100644
index ae09ddb..0000000
--- a/tests/ti_deps/deps/runnable_exec_date_dep.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from datetime import datetime
-from mock import patch
-
-from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep
-from fake_models import FakeDag, FakeTask, FakeTI
-from tests.test_utils.fake_datetime import FakeDatetime
-
-
-class RunnableExecDateDepTest(unittest.TestCase):
-
-    @patch('airflow.ti_deps.deps.runnable_exec_date_dep.datetime', FakeDatetime)
-    def test_exec_date_after_end_date(self):
-        """
-        If the dag's execution date is in the future this dep should fail
-        """
-        FakeDatetime.now = classmethod(lambda cls: datetime(2016, 1, 1))
-        dag = FakeDag(end_date=datetime(2016, 1, 3))
-        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 3))
-        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 2))
-
-        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))
-
-    def test_exec_date_before_task_end_date(self):
-        """
-        If the task instance execution date is before the DAG's end date this dep should
-        fail
-        """
-        FakeDatetime.now = classmethod(lambda cls: datetime(2016, 1, 3))
-        dag = FakeDag(end_date=datetime(2016, 1, 1))
-        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 2))
-        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 1))
-
-        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))
-
-    def test_exec_date_after_task_end_date(self):
-        """
-        If the task instance execution date is after the DAG's end date this dep should
-        fail
-        """
-        FakeDatetime.now = classmethod(lambda cls: datetime(2016, 1, 3))
-        dag = FakeDag(end_date=datetime(2016, 1, 3))
-        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 1))
-        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 2))
-
-        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))
-
-    def test_exec_date_before_dag_end_date(self):
-        """
-        If the task instance execution date is before the dag's end date this dep should
-        fail
-        """
-        dag = FakeDag(start_date=datetime(2016, 1, 2))
-        task = FakeTask(dag=dag, start_date=datetime(2016, 1, 1))
-        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 1))
-
-        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))
-
-    def test_exec_date_after_dag_end_date(self):
-        """
-        If the task instance execution date is after the dag's end date this dep should
-        fail
-        """
-        dag = FakeDag(end_date=datetime(2016, 1, 1))
-        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 3))
-        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 2))
-
-        self.assertFalse(RunnableExecDateDep().is_met(ti=ti, dep_context=None))
-
-    def test_all_deps_met(self):
-        """
-        Test to make sure all of the conditions for the dep are met
-        """
-        dag = FakeDag(end_date=datetime(2016, 1, 2))
-        task = FakeTask(dag=dag, end_date=datetime(2016, 1, 2))
-        ti = FakeTI(task=task, execution_date=datetime(2016, 1, 1))
-
-        self.assertTrue(RunnableExecDateDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py b/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
new file mode 100644
index 0000000..6910b66
--- /dev/null
+++ b/tests/ti_deps/deps/test_dag_ti_slots_available_dep.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import Mock
+
+from airflow.models import TaskInstance
+from airflow.ti_deps.deps.dag_ti_slots_available_dep import DagTISlotsAvailableDep
+
+
+class DagTISlotsAvailableDepTest(unittest.TestCase):
+
+    def test_concurrency_reached(self):
+        """
+        Test concurrency reached should fail dep
+        """
+        dag = Mock(concurrency=1, concurrency_reached=True)
+        task = Mock(dag=dag)
+        ti = TaskInstance(task, execution_date=None)
+
+        self.assertFalse(DagTISlotsAvailableDep().is_met(ti=ti))
+
+    def test_all_conditions_met(self):
+        """
+        Test all conditions met should pass dep
+        """
+        dag = Mock(concurrency=1, concurrency_reached=False)
+        task = Mock(dag=dag)
+        ti = TaskInstance(task, execution_date=None)
+
+        self.assertTrue(DagTISlotsAvailableDep().is_met(ti=ti))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_dag_unpaused_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_dag_unpaused_dep.py b/tests/ti_deps/deps/test_dag_unpaused_dep.py
new file mode 100644
index 0000000..969889a
--- /dev/null
+++ b/tests/ti_deps/deps/test_dag_unpaused_dep.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import Mock
+
+from airflow.models import TaskInstance
+from airflow.ti_deps.deps.dag_unpaused_dep import DagUnpausedDep
+
+
+class DagUnpausedDepTest(unittest.TestCase):
+
+    def test_concurrency_reached(self):
+        """
+        Test paused DAG should fail dependency
+        """
+        dag = Mock(is_paused=True)
+        task = Mock(dag=dag)
+        ti = TaskInstance(task=task, execution_date=None)
+
+        self.assertFalse(DagUnpausedDep().is_met(ti=ti))
+
+    def test_all_conditions_met(self):
+        """
+        Test all conditions met should pass dep
+        """
+        dag = Mock(is_paused=False)
+        task = Mock(dag=dag)
+        ti = TaskInstance(task=task, execution_date=None)
+
+        self.assertTrue(DagUnpausedDep().is_met(ti=ti))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_dagrun_exists_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_dagrun_exists_dep.py b/tests/ti_deps/deps/test_dagrun_exists_dep.py
new file mode 100644
index 0000000..daad269
--- /dev/null
+++ b/tests/ti_deps/deps/test_dagrun_exists_dep.py
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from airflow.utils.state import State
+from mock import Mock, patch
+
+from airflow.models import DAG, DagRun
+from airflow.ti_deps.deps.dagrun_exists_dep import DagrunRunningDep
+
+
+class DagrunRunningDepTest(unittest.TestCase):
+
+    @patch('airflow.models.DagRun.find', return_value=())
+    def test_dagrun_doesnt_exist(self, dagrun_find):
+        """
+        Task instances without dagruns should fail this dep
+        """
+        dag = DAG('test_dag', max_active_runs=2)
+        ti = Mock(task=Mock(dag=dag), get_dagrun=Mock(return_value=None))
+        self.assertFalse(DagrunRunningDep().is_met(ti=ti))
+
+    def test_dagrun_exists(self):
+        """
+        Task instances with a dagrun should pass this dep
+        """
+        dagrun = DagRun(state=State.RUNNING)
+        ti = Mock(get_dagrun=Mock(return_value=dagrun))
+        self.assertTrue(DagrunRunningDep().is_met(ti=ti))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_not_in_retry_period_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_not_in_retry_period_dep.py b/tests/ti_deps/deps/test_not_in_retry_period_dep.py
new file mode 100644
index 0000000..0f23aab
--- /dev/null
+++ b/tests/ti_deps/deps/test_not_in_retry_period_dep.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from datetime import datetime, timedelta
+from freezegun import freeze_time
+from mock import Mock
+
+from airflow.models import TaskInstance
+from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
+from airflow.utils.state import State
+
+
+class NotInRetryPeriodDepTest(unittest.TestCase):
+
+    def _get_task_instance(self, state, end_date=None,
+                           retry_delay=timedelta(minutes=15)):
+        task = Mock(retry_delay=retry_delay, retry_exponential_backoff=False)
+        ti = TaskInstance(task=task, state=state, execution_date=None)
+        ti.end_date = end_date
+        return ti
+
+    @freeze_time('2016-01-01 15:44')
+    def test_still_in_retry_period(self):
+        """
+        Task instances that are in their retry period should fail this dep
+        """
+        ti = self._get_task_instance(State.UP_FOR_RETRY,
+                                     end_date=datetime(2016, 1, 1, 15, 30))
+        self.assertTrue(ti.is_premature)
+        self.assertFalse(NotInRetryPeriodDep().is_met(ti=ti))
+
+    @freeze_time('2016-01-01 15:46')
+    def test_retry_period_finished(self):
+        """
+        Task instance's that have had their retry period elapse should pass this dep
+        """
+        ti = self._get_task_instance(State.UP_FOR_RETRY,
+                                     end_date=datetime(2016, 1, 1))
+        self.assertFalse(ti.is_premature)
+        self.assertTrue(NotInRetryPeriodDep().is_met(ti=ti))
+
+    def test_not_in_retry_period(self):
+        """
+        Task instance's that are not up for retry can not be in their retry period
+        """
+        ti = self._get_task_instance(State.SUCCESS)
+        self.assertTrue(NotInRetryPeriodDep().is_met(ti=ti))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_not_running_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_not_running_dep.py b/tests/ti_deps/deps/test_not_running_dep.py
new file mode 100644
index 0000000..7f8f0cd
--- /dev/null
+++ b/tests/ti_deps/deps/test_not_running_dep.py
@@ -0,0 +1,37 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from datetime import datetime
+from mock import Mock
+
+from airflow.ti_deps.deps.not_running_dep import NotRunningDep
+from airflow.utils.state import State
+
+
+class NotRunningDepTest(unittest.TestCase):
+
+    def test_ti_running(self):
+        """
+        Running task instances should fail this dep
+        """
+        ti = Mock(state=State.RUNNING, start_date=datetime(2016, 1, 1))
+        self.assertFalse(NotRunningDep().is_met(ti=ti))
+
+    def test_ti_not_running(self):
+        """
+        Non-running task instances should pass this dep
+        """
+        ti = Mock(state=State.NONE, start_date=datetime(2016, 1, 1))
+        self.assertTrue(NotRunningDep().is_met(ti=ti))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_not_skipped_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_not_skipped_dep.py b/tests/ti_deps/deps/test_not_skipped_dep.py
new file mode 100644
index 0000000..8a31bf9
--- /dev/null
+++ b/tests/ti_deps/deps/test_not_skipped_dep.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import Mock
+
+from airflow.ti_deps.deps.not_skipped_dep import NotSkippedDep
+from airflow.utils.state import State
+
+
+class NotSkippedDepTest(unittest.TestCase):
+
+    def test_skipped(self):
+        """
+        Skipped task instances should fail this dep
+        """
+        ti = Mock(state=State.SKIPPED)
+        self.assertFalse(NotSkippedDep().is_met(ti=ti))
+
+    def test_not_skipped(self):
+        """
+        Non-skipped task instances should pass this dep
+        """
+        ti = Mock(state=State.RUNNING)
+        self.assertTrue(NotSkippedDep().is_met(ti=ti))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_prev_dagrun_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_prev_dagrun_dep.py b/tests/ti_deps/deps/test_prev_dagrun_dep.py
new file mode 100644
index 0000000..0f6f5da
--- /dev/null
+++ b/tests/ti_deps/deps/test_prev_dagrun_dep.py
@@ -0,0 +1,123 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from datetime import datetime
+from mock import Mock
+
+from airflow.models import DAG, BaseOperator
+from airflow.ti_deps.dep_context import DepContext
+from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
+from airflow.utils.state import State
+
+
+class PrevDagrunDepTest(unittest.TestCase):
+
+    def _get_task(self, **kwargs):
+        return BaseOperator(task_id='test_task', dag=DAG('test_dag'), **kwargs)
+
+    def test_not_depends_on_past(self):
+        """
+        If depends on past isn't set in the task then the previous dagrun should be
+        ignored, even though there is no previous_ti which would normally fail the dep
+        """
+        task = self._get_task(depends_on_past=False,
+                              start_date=datetime(2016, 1, 1),
+                              wait_for_downstream=False)
+        prev_ti = Mock(task=task, state=State.SUCCESS,
+                       are_dependents_done=Mock(return_value=True),
+                       execution_date=datetime(2016, 1, 2))
+        ti = Mock(task=task, previous_ti=prev_ti,
+                  execution_date=datetime(2016, 1, 3))
+        dep_context = DepContext(ignore_depends_on_past=False)
+
+        self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
+
+    def test_context_ignore_depends_on_past(self):
+        """
+        If the context overrides depends_on_past then the dep should be met,
+        even though there is no previous_ti which would normally fail the dep
+        """
+        task = self._get_task(depends_on_past=True,
+                              start_date=datetime(2016, 1, 1),
+                              wait_for_downstream=False)
+        prev_ti = Mock(task=task, state=State.SUCCESS,
+                       are_dependents_done=Mock(return_value=True),
+                       execution_date=datetime(2016, 1, 2))
+        ti = Mock(task=task, previous_ti=prev_ti,
+                  execution_date=datetime(2016, 1, 3))
+        dep_context = DepContext(ignore_depends_on_past=True)
+
+        self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
+
+    def test_first_task_run(self):
+        """
+        The first task run for a TI should pass since it has no previous dagrun.
+        """
+        task = self._get_task(depends_on_past=True,
+                              start_date=datetime(2016, 1, 1),
+                              wait_for_downstream=False)
+        prev_ti = None
+        ti = Mock(task=task, previous_ti=prev_ti,
+                  execution_date=datetime(2016, 1, 1))
+        dep_context = DepContext(ignore_depends_on_past=False)
+
+        self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
+
+    def test_prev_ti_bad_state(self):
+        """
+        If the previous TI did not complete execution this dep should fail.
+        """
+        task = self._get_task(depends_on_past=True,
+                              start_date=datetime(2016, 1, 1),
+                              wait_for_downstream=False)
+        prev_ti = Mock(state=State.NONE,
+                       are_dependents_done=Mock(return_value=True))
+        ti = Mock(task=task, previous_ti=prev_ti,
+                  execution_date=datetime(2016, 1, 2))
+        dep_context = DepContext(ignore_depends_on_past=False)
+
+        self.assertFalse(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
+
+    def test_failed_wait_for_downstream(self):
+        """
+        If the previous TI specified to wait for the downstream tasks of the
+        previous dagrun then it should fail this dep if the downstream TIs of
+        the previous TI are not done.
+        """
+        task = self._get_task(depends_on_past=True,
+                              start_date=datetime(2016, 1, 1),
+                              wait_for_downstream=True)
+        prev_ti = Mock(state=State.SUCCESS,
+                       are_dependents_done=Mock(return_value=False))
+        ti = Mock(task=task, previous_ti=prev_ti,
+                  execution_date=datetime(2016, 1, 2))
+        dep_context = DepContext(ignore_depends_on_past=False)
+
+        self.assertFalse(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
+
+    def test_all_met(self):
+        """
+        Test to make sure all of the conditions for the dep are met
+        """
+        task = self._get_task(depends_on_past=True,
+                              start_date=datetime(2016, 1, 1),
+                              wait_for_downstream=True)
+        prev_ti = Mock(state=State.SUCCESS,
+                       are_dependents_done=Mock(return_value=True))
+        ti = Mock(task=task, previous_ti=prev_ti,
+                  execution_date=datetime(2016, 1, 2))
+        dep_context = DepContext(ignore_depends_on_past=False)
+
+        self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_runnable_exec_date_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_runnable_exec_date_dep.py b/tests/ti_deps/deps/test_runnable_exec_date_dep.py
new file mode 100644
index 0000000..e1a396c
--- /dev/null
+++ b/tests/ti_deps/deps/test_runnable_exec_date_dep.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from datetime import datetime
+from freezegun import freeze_time
+from mock import Mock
+
+from airflow.models import TaskInstance
+from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep
+
+
+class RunnableExecDateDepTest(unittest.TestCase):
+
+    def _get_task_instance(self, execution_date, dag_end_date=None, task_end_date=None):
+        dag = Mock(end_date=dag_end_date)
+        task = Mock(dag=dag, end_date=task_end_date)
+        return TaskInstance(task=task, execution_date=execution_date)
+
+    @freeze_time('2016-01-01')
+    def test_exec_date_after_end_date(self):
+        """
+        If the dag's execution date is in the future this dep should fail
+        """
+        ti = self._get_task_instance(
+            dag_end_date=datetime(2016, 1, 3),
+            task_end_date=datetime(2016, 1, 3),
+            execution_date=datetime(2016, 1, 2),
+        )
+        self.assertFalse(RunnableExecDateDep().is_met(ti=ti))
+
+    def test_exec_date_after_task_end_date(self):
+        """
+        If the task instance execution date is after the tasks's end date
+        this dep should fail
+        """
+        ti = self._get_task_instance(
+            dag_end_date=datetime(2016, 1, 3),
+            task_end_date=datetime(2016, 1, 1),
+            execution_date=datetime(2016, 1, 2),
+        )
+        self.assertFalse(RunnableExecDateDep().is_met(ti=ti))
+
+    def test_exec_date_after_dag_end_date(self):
+        """
+        If the task instance execution date is after the dag's end date
+        this dep should fail
+        """
+        ti = self._get_task_instance(
+            dag_end_date=datetime(2016, 1, 1),
+            task_end_date=datetime(2016, 1, 3),
+            execution_date=datetime(2016, 1, 2),
+        )
+        self.assertFalse(RunnableExecDateDep().is_met(ti=ti))
+
+    def test_all_deps_met(self):
+        """
+        Test to make sure all of the conditions for the dep are met
+        """
+        ti = self._get_task_instance(
+            dag_end_date=datetime(2016, 1, 2),
+            task_end_date=datetime(2016, 1, 2),
+            execution_date=datetime(2016, 1, 1),
+        )
+        self.assertTrue(RunnableExecDateDep().is_met(ti=ti))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_trigger_rule_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py b/tests/ti_deps/deps/test_trigger_rule_dep.py
new file mode 100644
index 0000000..a61ff0d
--- /dev/null
+++ b/tests/ti_deps/deps/test_trigger_rule_dep.py
@@ -0,0 +1,252 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from datetime import datetime
+
+from airflow.models import BaseOperator, TaskInstance
+from airflow.utils.trigger_rule import TriggerRule
+from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
+from airflow.utils.state import State
+
+
+class TriggerRuleDepTest(unittest.TestCase):
+
+    def _get_task_instance(self, trigger_rule=TriggerRule.ALL_SUCCESS,
+                           state=None, upstream_task_ids=None):
+        task = BaseOperator(task_id='test_task', trigger_rule=trigger_rule,
+                            start_date=datetime(2015, 1, 1))
+        if upstream_task_ids:
+            task._upstream_task_ids.extend(upstream_task_ids)
+        return TaskInstance(task=task, state=state, execution_date=None)
+
+    def test_no_upstream_tasks(self):
+        """
+        If the TI has no upstream TIs then there is nothing to check and the dep is passed
+        """
+        ti = self._get_task_instance(TriggerRule.ALL_DONE, State.UP_FOR_RETRY)
+        self.assertTrue(TriggerRuleDep().is_met(ti=ti))
+
+    def test_dummy_tr(self):
+        """
+        The dummy trigger rule should always pass this dep
+        """
+        ti = self._get_task_instance(TriggerRule.DUMMY, State.UP_FOR_RETRY)
+        self.assertTrue(TriggerRuleDep().is_met(ti=ti))
+
+    def test_one_success_tr_success(self):
+        """
+        One-success trigger rule success
+        """
+        ti = self._get_task_instance(TriggerRule.ONE_SUCCESS, State.UP_FOR_RETRY)
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=1,
+            skipped=2,
+            failed=2,
+            upstream_failed=2,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+        self.assertEqual(len(dep_statuses), 0)
+
+    def test_one_success_tr_failure(self):
+        """
+        One-success trigger rule failure
+        """
+        ti = self._get_task_instance(TriggerRule.ONE_SUCCESS)
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=0,
+            skipped=2,
+            failed=2,
+            upstream_failed=2,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+        self.assertEqual(len(dep_statuses), 1)
+        self.assertFalse(dep_statuses[0].passed)
+
+    def test_one_failure_tr_failure(self):
+        """
+        One-failure trigger rule failure
+        """
+        ti = self._get_task_instance(TriggerRule.ONE_FAILED)
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=2,
+            skipped=0,
+            failed=0,
+            upstream_failed=0,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+        self.assertEqual(len(dep_statuses), 1)
+        self.assertFalse(dep_statuses[0].passed)
+
+    def test_one_failure_tr_success(self):
+        """
+        One-failure trigger rule success
+        """
+        ti = self._get_task_instance(TriggerRule.ONE_FAILED)
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=0,
+            skipped=2,
+            failed=2,
+            upstream_failed=0,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+        self.assertEqual(len(dep_statuses), 0)
+
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=0,
+            skipped=2,
+            failed=0,
+            upstream_failed=2,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+        self.assertEqual(len(dep_statuses), 0)
+
+    def test_all_success_tr_success(self):
+        """
+        All-success trigger rule success
+        """
+        ti = self._get_task_instance(TriggerRule.ALL_SUCCESS,
+                                     upstream_task_ids=["FakeTaskID"])
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=1,
+            skipped=0,
+            failed=0,
+            upstream_failed=0,
+            done=1,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+        self.assertEqual(len(dep_statuses), 0)
+
+    def test_all_success_tr_failure(self):
+        """
+        All-success trigger rule failure
+        """
+        ti = self._get_task_instance(TriggerRule.ALL_SUCCESS,
+                                     upstream_task_ids=["FakeTaskID",
+                                                        "OtherFakeTaskID"])
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=1,
+            skipped=0,
+            failed=1,
+            upstream_failed=0,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+        self.assertEqual(len(dep_statuses), 1)
+        self.assertFalse(dep_statuses[0].passed)
+
+    def test_all_failed_tr_success(self):
+        """
+        All-failed trigger rule success
+        """
+        ti = self._get_task_instance(TriggerRule.ALL_FAILED,
+                                     upstream_task_ids=["FakeTaskID",
+                                                        "OtherFakeTaskID"])
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=0,
+            skipped=0,
+            failed=2,
+            upstream_failed=0,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+        self.assertEqual(len(dep_statuses), 0)
+
+    def test_all_failed_tr_failure(self):
+        """
+        All-failed trigger rule failure
+        """
+        ti = self._get_task_instance(TriggerRule.ALL_FAILED,
+                                     upstream_task_ids=["FakeTaskID",
+                                                        "OtherFakeTaskID"])
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=2,
+            skipped=0,
+            failed=0,
+            upstream_failed=0,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+        self.assertEqual(len(dep_statuses), 1)
+        self.assertFalse(dep_statuses[0].passed)
+
+    def test_all_done_tr_success(self):
+        """
+        All-done trigger rule success
+        """
+        ti = self._get_task_instance(TriggerRule.ALL_DONE,
+                                     upstream_task_ids=["FakeTaskID",
+                                                        "OtherFakeTaskID"])
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=2,
+            skipped=0,
+            failed=0,
+            upstream_failed=0,
+            done=2,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+        self.assertEqual(len(dep_statuses), 0)
+
+    def test_all_done_tr_failure(self):
+        """
+        All-done trigger rule failure
+        """
+        ti = self._get_task_instance(TriggerRule.ALL_DONE,
+                                     upstream_task_ids=["FakeTaskID",
+                                                        "OtherFakeTaskID"])
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=1,
+            skipped=0,
+            failed=0,
+            upstream_failed=0,
+            done=1,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+        self.assertEqual(len(dep_statuses), 1)
+        self.assertFalse(dep_statuses[0].passed)
+
+    def test_unknown_tr(self):
+        """
+        Unknown trigger rules should cause this dep to fail
+        """
+        ti = self._get_task_instance()
+        ti.task.trigger_rule = "Unknown Trigger Rule"
+        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=1,
+            skipped=0,
+            failed=0,
+            upstream_failed=0,
+            done=1,
+            flag_upstream_failed=False,
+            session="Fake Session"))
+
+        self.assertEqual(len(dep_statuses), 1)
+        self.assertFalse(dep_statuses[0].passed)
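
For context on the counts the tests above feed into _evaluate_trigger_rule: the real dep derives
them by aggregating the states of the upstream task instances in the metadata database. The helper
below is only an illustrative stand-in for that aggregation, not Airflow code and not part of this
commit (the tests deliberately pass hand-picked, sometimes inconsistent counts instead):

    # Illustrative stand-in: map upstream task-instance states to the
    # successes/skipped/failed/upstream_failed/done counts that
    # TriggerRuleDep._evaluate_trigger_rule receives. Not part of this commit.
    from airflow.utils.state import State

    def count_upstream_states(states):
        finished = (State.SUCCESS, State.SKIPPED, State.FAILED, State.UPSTREAM_FAILED)
        return dict(
            successes=states.count(State.SUCCESS),
            skipped=states.count(State.SKIPPED),
            failed=states.count(State.FAILED),
            upstream_failed=states.count(State.UPSTREAM_FAILED),
            done=sum(1 for s in states if s in finished),
        )

    # Two upstream tasks, one succeeded and one failed -> the counts used by
    # test_all_success_tr_failure above.
    counts = count_upstream_states([State.SUCCESS, State.FAILED])
    assert counts["successes"] == 1 and counts["failed"] == 1 and counts["done"] == 2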

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/test_valid_state_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_valid_state_dep.py b/tests/ti_deps/deps/test_valid_state_dep.py
new file mode 100644
index 0000000..2ece718
--- /dev/null
+++ b/tests/ti_deps/deps/test_valid_state_dep.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from datetime import datetime
+from mock import Mock
+
+from airflow import AirflowException
+from airflow.ti_deps.deps.valid_state_dep import ValidStateDep
+from airflow.utils.state import State
+
+
+class ValidStateDepTest(unittest.TestCase):
+
+    def test_valid_state(self):
+        """
+        Valid state should pass this dep
+        """
+        ti = Mock(state=State.QUEUED, end_date=datetime(2016, 1, 1))
+        self.assertTrue(ValidStateDep({State.QUEUED}).is_met(ti=ti))
+
+    def test_invalid_state(self):
+        """
+        Invalid state should fail this dep
+        """
+        ti = Mock(state=State.SUCCESS, end_date=datetime(2016, 1, 1))
+        self.assertFalse(ValidStateDep({State.FAILED}).is_met(ti=ti))
+
+    def test_no_valid_states(self):
+        """
+        If there are no valid states the dependency should throw
+        """
+        ti = Mock(state=State.SUCCESS, end_date=datetime(2016, 1, 1))
+        with self.assertRaises(AirflowException):
+            ValidStateDep({}).is_met(ti=ti)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/trigger_rule_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/trigger_rule_dep.py b/tests/ti_deps/deps/trigger_rule_dep.py
deleted file mode 100644
index 04a7737..0000000
--- a/tests/ti_deps/deps/trigger_rule_dep.py
+++ /dev/null
@@ -1,295 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-from airflow.utils.trigger_rule import TriggerRule
-from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
-from airflow.utils.state import State
-from fake_models import FakeTask, FakeTI
-
-
-class TriggerRuleDepTest(unittest.TestCase):
-
-    def test_no_upstream_tasks(self):
-        """
-        If the TI has no upstream TIs then there is nothing to check and the dep is passed
-        """
-        task = FakeTask(
-            trigger_rule=TriggerRule.ALL_DONE,
-            upstream_list=[])
-        ti = FakeTI(
-            task=task,
-            state=State.UP_FOR_RETRY)
-
-        self.assertTrue(TriggerRuleDep().is_met(ti=ti, dep_context=None))
-
-    def test_dummy_tr(self):
-        """
-        The dummy trigger rule should always pass this dep
-        """
-        task = FakeTask(
-            trigger_rule=TriggerRule.DUMMY,
-            upstream_list=[])
-        ti = FakeTI(
-            task=task,
-            state=State.UP_FOR_RETRY)
-
-        self.assertTrue(TriggerRuleDep().is_met(ti=ti, dep_context=None))
-
-    def test_one_success_tr_success(self):
-        """
-        One-success trigger rule success
-        """
-        task = FakeTask(
-            trigger_rule=TriggerRule.ONE_SUCCESS,
-            upstream_task_ids=[])
-        ti = FakeTI(task=task)
-
-        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
-            ti=ti,
-            successes=1,
-            skipped=2,
-            failed=2,
-            upstream_failed=2,
-            done=2,
-            flag_upstream_failed=False,
-            session="Fake Session"))
-
-        self.assertEqual(len(dep_statuses), 0)
-
-    def test_one_success_tr_failure(self):
-        """
-        One-success trigger rule failure
-        """
-        task = FakeTask(
-            trigger_rule=TriggerRule.ONE_SUCCESS,
-            upstream_task_ids=[])
-        ti = FakeTI(task=task)
-
-        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
-            ti=ti,
-            successes=0,
-            skipped=2,
-            failed=2,
-            upstream_failed=2,
-            done=2,
-            flag_upstream_failed=False,
-            session="Fake Session"))
-
-        self.assertEqual(len(dep_statuses), 1)
-        self.assertFalse(dep_statuses[0].passed)
-
-    def test_one_failure_tr_failure(self):
-        """
-        One-failure trigger rule failure
-        """
-        task = FakeTask(
-            trigger_rule=TriggerRule.ONE_FAILED,
-            upstream_task_ids=[])
-        ti = FakeTI(task=task)
-
-        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
-            ti=ti,
-            successes=2,
-            skipped=0,
-            failed=0,
-            upstream_failed=0,
-            done=2,
-            flag_upstream_failed=False,
-            session="Fake Session"))
-
-    def test_one_failure_tr_success(self):
-        """
-        One-failure trigger rule success
-        """
-        task = FakeTask(
-            trigger_rule=TriggerRule.ONE_FAILED,
-            upstream_task_ids=[])
-        ti = FakeTI(task=task)
-
-        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
-            ti=ti,
-            successes=0,
-            skipped=2,
-            failed=2,
-            upstream_failed=0,
-            done=2,
-            flag_upstream_failed=False,
-            session="Fake Session"))
-
-        self.assertEqual(len(dep_statuses), 0)
-
-        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
-            ti=ti,
-            successes=0,
-            skipped=2,
-            failed=0,
-            upstream_failed=2,
-            done=2,
-            flag_upstream_failed=False,
-            session="Fake Session"))
-
-        self.assertEqual(len(dep_statuses), 0)
-
-    def test_all_success_tr_success(self):
-        """
-        All-success trigger rule success
-        """
-        task = FakeTask(
-            trigger_rule=TriggerRule.ALL_SUCCESS,
-            upstream_task_ids=["FakeTaskID"])
-        ti = FakeTI(task=task)
-
-        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
-            ti=ti,
-            successes=1,
-            skipped=0,
-            failed=0,
-            upstream_failed=0,
-            done=1,
-            flag_upstream_failed=False,
-            session="Fake Session"))
-
-        self.assertEqual(len(dep_statuses), 0)
-
-    def test_all_success_tr_failure(self):
-        """
-        All-success trigger rule failure
-        """
-        task = FakeTask(
-            trigger_rule=TriggerRule.ALL_SUCCESS,
-            upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"])
-        ti = FakeTI(task=task)
-
-        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
-            ti=ti,
-            successes=1,
-            skipped=0,
-            failed=1,
-            upstream_failed=0,
-            done=2,
-            flag_upstream_failed=False,
-            session="Fake Session"))
-
-        self.assertEqual(len(dep_statuses), 1)
-        self.assertFalse(dep_statuses[0].passed)
-
-    def test_all_failed_tr_success(self):
-        """
-        All-failed trigger rule success
-        """
-        task = FakeTask(
-            trigger_rule=TriggerRule.ALL_FAILED,
-            upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"])
-        ti = FakeTI(task=task)
-
-        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
-            ti=ti,
-            successes=0,
-            skipped=0,
-            failed=2,
-            upstream_failed=0,
-            done=2,
-            flag_upstream_failed=False,
-            session="Fake Session"))
-
-        self.assertEqual(len(dep_statuses), 0)
-
-    def test_all_failed_tr_failure(self):
-        """
-        All-failed trigger rule failure
-        """
-        task = FakeTask(
-            trigger_rule=TriggerRule.ALL_FAILED,
-            upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"])
-        ti = FakeTI(task=task)
-
-        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
-            ti=ti,
-            successes=2,
-            skipped=0,
-            failed=0,
-            upstream_failed=0,
-            done=2,
-            flag_upstream_failed=False,
-            session="Fake Session"))
-
-        self.assertEqual(len(dep_statuses), 1)
-        self.assertFalse(dep_statuses[0].passed)
-
-    def test_all_done_tr_success(self):
-        """
-        All-done trigger rule success
-        """
-        task = FakeTask(
-            trigger_rule=TriggerRule.ALL_DONE,
-            upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"])
-        ti = FakeTI(task=task)
-
-        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
-            ti=ti,
-            successes=2,
-            skipped=0,
-            failed=0,
-            upstream_failed=0,
-            done=2,
-            flag_upstream_failed=False,
-            session="Fake Session"))
-
-        self.assertEqual(len(dep_statuses), 0)
-
-    def test_all_done_tr_failure(self):
-        """
-        All-done trigger rule failure
-        """
-        task = FakeTask(
-            trigger_rule=TriggerRule.ALL_DONE,
-            upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"])
-        ti = FakeTI(task=task)
-
-        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
-            ti=ti,
-            successes=1,
-            skipped=0,
-            failed=0,
-            upstream_failed=0,
-            done=1,
-            flag_upstream_failed=False,
-            session="Fake Session"))
-
-        self.assertEqual(len(dep_statuses), 1)
-        self.assertFalse(dep_statuses[0].passed)
-
-    def test_unknown_tr(self):
-        """
-        Unknown trigger rules should cause this dep to fail
-        """
-        task = FakeTask(
-            trigger_rule="Unknown Trigger Rule",
-            upstream_task_ids=[])
-        ti = FakeTI(task=task)
-
-        dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
-            ti=ti,
-            successes=1,
-            skipped=0,
-            failed=0,
-            upstream_failed=0,
-            done=1,
-            flag_upstream_failed=False,
-            session="Fake Session"))
-
-        self.assertEqual(len(dep_statuses), 1)
-        self.assertFalse(dep_statuses[0].passed)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/tests/ti_deps/deps/valid_state_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/valid_state_dep.py b/tests/ti_deps/deps/valid_state_dep.py
deleted file mode 100644
index 6bc0835..0000000
--- a/tests/ti_deps/deps/valid_state_dep.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from datetime import datetime
-
-from airflow import AirflowException
-from airflow.ti_deps.deps.valid_state_dep import ValidStateDep
-from airflow.utils.state import State
-from fake_models import FakeTI
-
-
-class ValidStateDepTest(unittest.TestCase):
-
-    def test_valid_state(self):
-        """
-        Valid state should pass this dep
-        """
-        ti = FakeTI(state=State.QUEUED, end_date=datetime(2016, 1, 1))
-
-        self.assertTrue(ValidStateDep({State.QUEUED}).is_met(ti=ti, dep_context=None))
-
-    def test_invalid_state(self):
-        """
-        Invalid state should fail this dep
-        """
-        ti = FakeTI(state=State.SUCCESS, end_date=datetime(2016, 1, 1))
-
-        self.assertFalse(ValidStateDep({State.FAILURE}).is_met(ti=ti, dep_context=None))
-
-    def test_no_valid_states(self):
-        """
-        If there are no valid states the dependency should throw
-        """
-        ti = FakeTI(state=State.SUCCESS, end_date=datetime(2016, 1, 1))
-
-        with self.assertRaises(AirflowException):
-            ValidStateDep({}).is_met(ti=ti, dep_context=None)
