Repository: incubator-airflow
Updated Branches:
  refs/heads/master 17ddbcff0 -> f3f2eb323


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3f2eb32/tests/www_rbac/test_views.py
----------------------------------------------------------------------
diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py
index 0c25972..b4a2f02 100644
--- a/tests/www_rbac/test_views.py
+++ b/tests/www_rbac/test_views.py
@@ -29,12 +29,11 @@ import unittest
 import urllib
 
 from flask._compat import PY2
-from flask_appbuilder.security.sqla.models import User as ab_user
 from urllib.parse import quote_plus
 from werkzeug.test import Client
 
 from airflow import configuration as conf
-from airflow import models
+from airflow import models, settings
 from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
 from airflow.models import DAG, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
@@ -48,17 +47,17 @@ from airflow.www_rbac import app as application
 class TestBase(unittest.TestCase):
     def setUp(self):
         conf.load_test_config()
-        self.app, self.appbuilder = application.create_app(testing=True)
+        self.app, self.appbuilder = application.create_app(session=Session, 
testing=True)
         self.app.config['WTF_CSRF_ENABLED'] = False
         self.client = self.app.test_client()
-        self.session = Session()
+        settings.configure_orm()
+        self.session = Session
         self.login()
 
     def login(self):
-        sm_session = self.appbuilder.sm.get_session()
-        self.user = sm_session.query(ab_user).first()
-        if not self.user:
-            role_admin = self.appbuilder.sm.find_role('Admin')
+        role_admin = self.appbuilder.sm.find_role('Admin')
+        tester = self.appbuilder.sm.find_user(username='test')
+        if not tester:
             self.appbuilder.sm.add_user(
                 username='test',
                 first_name='test',
@@ -88,6 +87,15 @@ class TestBase(unittest.TestCase):
         else:
             self.assertIn(text, resp_html)
 
+    def check_content_not_in_response(self, text, resp, resp_code=200):
+        resp_html = resp.data.decode('utf-8')
+        self.assertEqual(resp_code, resp.status_code)
+        if isinstance(text, list):
+            for kw in text:
+                self.assertNotIn(kw, resp_html)
+        else:
+            self.assertNotIn(text, resp_html)
+
     def percent_encode(self, obj):
         if PY2:
             return urllib.quote_plus(str(obj))
@@ -137,10 +145,6 @@ class TestVariableModelView(TestBase):
         resp = self.client.post('/variable/add',
                                 data=self.variable,
                                 follow_redirects=True)
-        self.assertEqual(resp.status_code, 200)
-        v = self.session.query(models.Variable).first()
-        self.assertEqual(v.key, 'test_key')
-        self.assertEqual(v.val, 'text_val')
 
         # update the variable with a wrong value, given that is encrypted
         Var = models.Variable
@@ -248,6 +252,8 @@ class TestAirflowBaseViews(TestBase):
 
     def setUp(self):
         super(TestAirflowBaseViews, self).setUp()
+        self.logout()
+        self.login()
         self.cleanup_dagruns()
         self.prepare_dagruns()
 
@@ -292,7 +298,7 @@ class TestAirflowBaseViews(TestBase):
         self.check_content_in_response('DAGs', resp)
 
     def test_health(self):
-        resp = self.client.get('health')
+        resp = self.client.get('health', follow_redirects=True)
         self.check_content_in_response('The server is healthy!', resp)
 
     def test_home(self):
@@ -361,7 +367,7 @@ class TestAirflowBaseViews(TestBase):
         self.check_content_in_response('example_bash_operator', resp)
 
     def test_landing_times(self):
-        url = 'landing_times?days=30&dag_id=test_example_bash_operator'
+        url = 'landing_times?days=30&dag_id=example_bash_operator'
         resp = self.client.get(url, follow_redirects=True)
         self.check_content_in_response('example_bash_operator', resp)
 
@@ -415,6 +421,8 @@ class TestAirflowBaseViews(TestBase):
 
 class TestConfigurationView(TestBase):
     def test_configuration(self):
+        self.logout()
+        self.login()
         resp = self.client.get('configuration', follow_redirects=True)
         self.check_content_in_response(
             ['Airflow Configuration', 'Running Configuration'], resp)
@@ -437,6 +445,7 @@ class TestLogView(TestBase):
         current_dir = os.path.dirname(os.path.abspath(__file__))
         logging_config['handlers']['task']['base_log_folder'] = 
os.path.normpath(
             os.path.join(current_dir, 'test_logs'))
+
         logging_config['handlers']['task']['filename_template'] = \
             '{{ ti.dag_id }}/{{ ti.task_id }}/' \
             '{{ ts | replace(":", ".") }}/{{ try_number }}.log'
@@ -450,11 +459,12 @@ class TestLogView(TestBase):
         sys.path.append(self.settings_folder)
         conf.set('core', 'logging_config_class', 
'airflow_local_settings.LOGGING_CONFIG')
 
-        self.app, self.appbuilder = application.create_app(testing=True)
+        self.app, self.appbuilder = application.create_app(session=Session, 
testing=True)
         self.app.config['WTF_CSRF_ENABLED'] = False
         self.client = self.app.test_client()
+        settings.configure_orm()
+        self.session = Session
         self.login()
-        self.session = Session()
 
         from airflow.www_rbac.views import dagbag
         dag = DAG(self.DAG_ID, start_date=self.DEFAULT_DATE)
@@ -477,9 +487,9 @@ class TestLogView(TestBase):
 
     def test_get_file_task_log(self):
         response = self.client.get(
-            TestLogView.ENDPOINT,
-            follow_redirects=True,
-        )
+            TestLogView.ENDPOINT, data=dict(
+                username='test',
+                password='test'), follow_redirects=True)
         self.assertEqual(response.status_code, 200)
         self.assertIn('Log by attempts',
                       response.data.decode('utf-8'))
@@ -493,7 +503,10 @@ class TestLogView(TestBase):
                                                 self.TASK_ID,
                                                 
quote_plus(self.DEFAULT_DATE.isoformat()),
                                                 1,
-                                                json.dumps({})), 
follow_redirects=True)
+                                                json.dumps({})), data=dict(
+                                                    username='test',
+                                                    password='test'),
+                            follow_redirects=True)
 
         self.assertIn('"message":', response.data.decode('utf-8'))
         self.assertIn('"metadata":', response.data.decode('utf-8'))
@@ -508,7 +521,10 @@ class TestLogView(TestBase):
             self.client.get(url_template.format(self.DAG_ID,
                                                 self.TASK_ID,
                                                 
quote_plus(self.DEFAULT_DATE.isoformat()),
-                                                1), follow_redirects=True)
+                                                1), data=dict(
+                                                    username='test',
+                                                    password='test'),
+                            follow_redirects=True)
 
         self.assertIn('"message":', response.data.decode('utf-8'))
         self.assertIn('"metadata":', response.data.decode('utf-8'))
@@ -518,7 +534,10 @@ class TestLogView(TestBase):
 
 class TestVersionView(TestBase):
     def test_version(self):
-        resp = self.client.get('version', follow_redirects=True)
+        resp = self.client.get('version', data=dict(
+            username='test',
+            password='test'
+        ), follow_redirects=True)
         self.check_content_in_response('Version Info', resp)
 
 
@@ -582,8 +601,9 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester:
         Should set base date to current date (not asserted)
         """
         response = self.test.client.get(
-            self.endpoint
-        )
+            self.endpoint, data=dict(
+                username='test',
+                password='test'), follow_redirects=True)
         self.test.assertEqual(response.status_code, 200)
         data = response.data.decode('utf-8')
         self.test.assertIn('Base date:', data)
@@ -603,7 +623,11 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester:
         """
         response = self.test.client.get(
             self.endpoint + '&execution_date={}'.format(
-                self.runs[1].execution_date.isoformat())
+                self.runs[1].execution_date.isoformat()),
+            data=dict(
+                username='test',
+                password='test'
+            ), follow_redirects=True
         )
         self.test.assertEqual(response.status_code, 200)
         data = response.data.decode('utf-8')
@@ -626,7 +650,11 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester:
         """
         response = self.test.client.get(
             self.endpoint + '&base_date={}&num_runs=2'.format(
-                self.runs[1].execution_date.isoformat())
+                self.runs[1].execution_date.isoformat()),
+            data=dict(
+                username='test',
+                password='test'
+            ), follow_redirects=True
         )
         self.test.assertEqual(response.status_code, 200)
         data = response.data.decode('utf-8')
@@ -648,7 +676,11 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester:
         response = self.test.client.get(
             self.endpoint + 
'&base_date={}&num_runs=42&execution_date={}'.format(
                 self.runs[1].execution_date.isoformat(),
-                self.runs[0].execution_date.isoformat())
+                self.runs[0].execution_date.isoformat()),
+            data=dict(
+                username='test',
+                password='test'
+            ), follow_redirects=True
         )
         self.test.assertEqual(response.status_code, 200)
         data = response.data.decode('utf-8')
@@ -670,7 +702,11 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester:
         response = self.test.client.get(
             self.endpoint + 
'&base_date={}&num_runs=5&execution_date={}'.format(
                 self.runs[2].execution_date.isoformat(),
-                self.runs[3].execution_date.isoformat())
+                self.runs[3].execution_date.isoformat()),
+            data=dict(
+                username='test',
+                password='test'
+            ), follow_redirects=True
         )
         self.test.assertEqual(response.status_code, 200)
         data = response.data.decode('utf-8')
@@ -759,5 +795,505 @@ class TestGanttView(TestBase):
         
self.tester.test_with_base_date_and_num_runs_and_execution_date_within()
 
 
+class TestDagACLView(TestBase):
+    """
+    Test Airflow DAG acl
+    """
+    default_date = timezone.datetime(2018, 6, 1)
+    run_id = "test_{}".format(models.DagRun.id_for_date(default_date))
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestDagACLView, cls).setUpClass()
+
+    def cleanup_dagruns(self):
+        DR = models.DagRun
+        dag_ids = ['example_bash_operator',
+                   'example_subdag_operator']
+        (self.session
+             .query(DR)
+             .filter(DR.dag_id.in_(dag_ids))
+             .filter(DR.run_id == self.run_id)
+             .delete(synchronize_session='fetch'))
+        self.session.commit()
+
+    def prepare_dagruns(self):
+        dagbag = models.DagBag(include_examples=True)
+        self.bash_dag = dagbag.dags['example_bash_operator']
+        self.sub_dag = dagbag.dags['example_subdag_operator']
+
+        self.bash_dagrun = self.bash_dag.create_dagrun(
+            run_id=self.run_id,
+            execution_date=self.default_date,
+            start_date=timezone.utcnow(),
+            state=State.RUNNING)
+
+        self.sub_dagrun = self.sub_dag.create_dagrun(
+            run_id=self.run_id,
+            execution_date=self.default_date,
+            start_date=timezone.utcnow(),
+            state=State.RUNNING)
+
+    def setUp(self):
+        super(TestDagACLView, self).setUp()
+        self.cleanup_dagruns()
+        self.prepare_dagruns()
+        self.logout()
+        self.appbuilder.sm.sync_roles()
+        self.add_permission_for_role()
+
+    def login(self, username=None, password=None):
+        role_admin = self.appbuilder.sm.find_role('Admin')
+        tester = self.appbuilder.sm.find_user(username='test')
+        if not tester:
+            self.appbuilder.sm.add_user(
+                username='test',
+                first_name='test',
+                last_name='test',
+                email='t...@fab.org',
+                role=role_admin,
+                password='test')
+
+        dag_acl_role = self.appbuilder.sm.add_role('dag_acl_tester')
+        dag_tester = self.appbuilder.sm.find_user(username='dag_tester')
+        if not dag_tester:
+            self.appbuilder.sm.add_user(
+                username='dag_tester',
+                first_name='dag_test',
+                last_name='dag_test',
+                email='dag_t...@fab.org',
+                role=dag_acl_role,
+                password='dag_test')
+
+        # create an user without permission
+        dag_no_role = self.appbuilder.sm.add_role('dag_acl_faker')
+        dag_faker = self.appbuilder.sm.find_user(username='dag_faker')
+        if not dag_faker:
+            self.appbuilder.sm.add_user(
+                username='dag_faker',
+                first_name='dag_faker',
+                last_name='dag_faker',
+                email='dag_f...@fab.org',
+                role=dag_no_role,
+                password='dag_faker')
+
+        # create an user with only read permission
+        dag_read_only_role = self.appbuilder.sm.add_role('dag_acl_read_only')
+        dag_read_only = self.appbuilder.sm.find_user(username='dag_read_only')
+        if not dag_read_only:
+            self.appbuilder.sm.add_user(
+                username='dag_read_only',
+                first_name='dag_read_only',
+                last_name='dag_read_only',
+                email='dag_read_o...@fab.org',
+                role=dag_read_only_role,
+                password='dag_read_only')
+
+        # create an user that has all dag access
+        all_dag_role = self.appbuilder.sm.add_role('all_dag_role')
+        all_dag_tester = self.appbuilder.sm.find_user(username='all_dag_user')
+        if not all_dag_tester:
+            self.appbuilder.sm.add_user(
+                username='all_dag_user',
+                first_name='all_dag_user',
+                last_name='all_dag_user',
+                email='all_dag_u...@fab.org',
+                role=all_dag_role,
+                password='all_dag_user')
+
+        user = username if username else 'dag_tester'
+        passwd = password if password else 'dag_test'
+
+        return self.client.post('/login/', data=dict(
+            username=user,
+            password=passwd
+        ))
+
+    def logout(self):
+        return self.client.get('/logout/')
+
+    def add_permission_for_role(self):
+        self.logout()
+        self.login(username='test',
+                   password='test')
+        perm_on_dag = self.appbuilder.sm.\
+            find_permission_view_menu('can_dag_edit', 'example_bash_operator')
+        dag_tester_role = self.appbuilder.sm.find_role('dag_acl_tester')
+        self.appbuilder.sm.add_permission_role(dag_tester_role, perm_on_dag)
+
+        perm_on_all_dag = self.appbuilder.sm.\
+            find_permission_view_menu('can_dag_edit', 'all_dags')
+        all_dag_role = self.appbuilder.sm.find_role('all_dag_role')
+        self.appbuilder.sm.add_permission_role(all_dag_role, perm_on_all_dag)
+
+        read_only_perm_on_dag = self.appbuilder.sm.\
+            find_permission_view_menu('can_dag_read', 'example_bash_operator')
+        dag_read_only_role = self.appbuilder.sm.find_role('dag_acl_read_only')
+        self.appbuilder.sm.add_permission_role(dag_read_only_role, 
read_only_perm_on_dag)
+
+    def test_permission_exist(self):
+        self.logout()
+        self.login(username='test',
+                   password='test')
+        test_view_menu = 
self.appbuilder.sm.find_view_menu('example_bash_operator')
+        perms_views = 
self.appbuilder.sm.find_permissions_view_menu(test_view_menu)
+        self.assertEqual(len(perms_views), 2)
+        # each dag view will create one write, and one read permission
+        self.assertTrue(str(perms_views[0]).startswith('can dag'))
+        self.assertTrue(str(perms_views[1]).startswith('can dag'))
+
+    def test_role_permission_associate(self):
+        self.logout()
+        self.login(username='test',
+                   password='test')
+        test_role = self.appbuilder.sm.find_role('dag_acl_tester')
+        perms = set([str(perm) for perm in test_role.permissions])
+        self.assertIn('can dag edit on example_bash_operator', perms)
+        self.assertNotIn('can dag read on example_bash_operator', perms)
+
+    def test_index_success(self):
+        self.logout()
+        self.login()
+        resp = self.client.get('/', follow_redirects=True)
+        self.check_content_in_response('example_bash_operator', resp)
+
+    def test_index_failure(self):
+        self.logout()
+        self.login()
+        resp = self.client.get('/', follow_redirects=True)
+        # The user can only access/view example_bash_operator dag.
+        self.check_content_not_in_response('example_subdag_operator', resp)
+
+    def test_index_for_all_dag_user(self):
+        self.logout()
+        self.login(username='all_dag_user',
+                   password='all_dag_user')
+        resp = self.client.get('/', follow_redirects=True)
+        # The all dag user can access/view all dags.
+        self.check_content_in_response('example_subdag_operator', resp)
+        self.check_content_in_response('example_bash_operator', resp)
+
+    def test_dag_stats_success(self):
+        self.logout()
+        self.login()
+        resp = self.client.get('dag_stats', follow_redirects=True)
+        self.check_content_in_response('example_bash_operator', resp)
+
+    def test_dag_stats_failure(self):
+        self.logout()
+        self.login()
+        resp = self.client.get('dag_stats', follow_redirects=True)
+        self.check_content_not_in_response('example_subdag_operator', resp)
+
+    def test_dag_stats_success_for_all_dag_user(self):
+        self.logout()
+        self.login(username='all_dag_user',
+                   password='all_dag_user')
+        resp = self.client.get('dag_stats', follow_redirects=True)
+        self.check_content_in_response('example_subdag_operator', resp)
+        self.check_content_in_response('example_bash_operator', resp)
+
+    def test_task_stats_success(self):
+        self.logout()
+        self.login()
+        resp = self.client.get('task_stats', follow_redirects=True)
+        self.check_content_in_response('example_bash_operator', resp)
+
+    def test_task_stats_failure(self):
+        self.logout()
+        self.login()
+        resp = self.client.get('task_stats', follow_redirects=True)
+        self.check_content_not_in_response('example_subdag_operator', resp)
+
+    def test_task_stats_success_for_all_dag_user(self):
+        self.logout()
+        self.login(username='all_dag_user',
+                   password='all_dag_user')
+        resp = self.client.get('task_stats', follow_redirects=True)
+        self.check_content_in_response('example_bash_operator', resp)
+        self.check_content_in_response('example_subdag_operator', resp)
+
+    def test_code_success(self):
+        self.logout()
+        self.login()
+        url = 'code?dag_id=example_bash_operator'
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('example_bash_operator', resp)
+
+    def test_code_failure(self):
+        self.logout()
+        self.login(username='dag_faker',
+                   password='dag_faker')
+        url = 'code?dag_id=example_bash_operator'
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_not_in_response('example_bash_operator', resp)
+
+    def test_code_success_for_all_dag_user(self):
+        self.logout()
+        self.login(username='all_dag_user',
+                   password='all_dag_user')
+        url = 'code?dag_id=example_bash_operator'
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('example_bash_operator', resp)
+
+        url = 'code?dag_id=example_subdag_operator'
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('example_subdag_operator', resp)
+
+    def test_dag_details_success(self):
+        self.logout()
+        self.login()
+        url = 'dag_details?dag_id=example_bash_operator'
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('DAG details', resp)
+
+    def test_dag_details_failure(self):
+        self.logout()
+        self.login(username='dag_faker',
+                   password='dag_faker')
+        url = 'dag_details?dag_id=example_bash_operator'
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_not_in_response('DAG details', resp)
+
+    def test_dag_details_success_for_all_dag_user(self):
+        self.logout()
+        self.login(username='all_dag_user',
+                   password='all_dag_user')
+        url = 'dag_details?dag_id=example_bash_operator'
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('example_bash_operator', resp)
+
+        url = 'dag_details?dag_id=example_subdag_operator'
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('example_subdag_operator', resp)
+
+    def test_pickle_info_success(self):
+        self.logout()
+        self.login()
+        url = 'pickle_info?dag_id=example_bash_operator'
+        resp = self.client.get(url, follow_redirects=True)
+        self.assertEqual(resp.status_code, 200)
+
+    def test_rendered_success(self):
+        self.logout()
+        self.login()
+        url = 
('rendered?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('Rendered Template', resp)
+
+    def test_rendered_failure(self):
+        self.logout()
+        self.login(username='dag_faker',
+                   password='dag_faker')
+        url = 
('rendered?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_not_in_response('Rendered Template', resp)
+
+    def test_rendered_success_for_all_dag_user(self):
+        self.logout()
+        self.login(username='all_dag_user',
+                   password='all_dag_user')
+        url = 
('rendered?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('Rendered Template', resp)
+
+    def test_task_success(self):
+        self.logout()
+        self.login()
+        url = 
('task?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('Task Instance Details', resp)
+
+    def test_task_failure(self):
+        self.logout()
+        self.login(username='dag_faker',
+                   password='dag_faker')
+        url = 
('task?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_not_in_response('Task Instance Details', resp)
+
+    def test_task_success_for_all_dag_user(self):
+        self.logout()
+        self.login(username='all_dag_user',
+                   password='all_dag_user')
+        url = 
('task?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('Task Instance Details', resp)
+
+    def test_xcom_success(self):
+        self.logout()
+        self.login()
+        url = 
('xcom?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('XCom', resp)
+
+    def test_xcom_failure(self):
+        self.logout()
+        self.login(username='dag_faker',
+                   password='dag_faker')
+        url = 
('xcom?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_not_in_response('XCom', resp)
+
+    def test_xcom_success_for_all_dag_user(self):
+        self.logout()
+        self.login(username='all_dag_user',
+                   password='all_dag_user')
+        url = 
('xcom?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('XCom', resp)
+
+    def test_run_success(self):
+        self.logout()
+        self.login()
+        url = 
('run?task_id=runme_0&dag_id=example_bash_operator&ignore_all_deps=false&'
+               'ignore_ti_state=true&execution_date={}'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url)
+        self.check_content_in_response('', resp, resp_code=302)
+
+    def test_run_success_for_all_dag_user(self):
+        self.logout()
+        self.login(username='all_dag_user',
+                   password='all_dag_user')
+        url = 
('run?task_id=runme_0&dag_id=example_bash_operator&ignore_all_deps=false&'
+               'ignore_ti_state=true&execution_date={}'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url)
+        self.check_content_in_response('', resp, resp_code=302)
+
+    def test_blocked_success(self):
+        url = 'blocked'
+        self.logout()
+        self.login()
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('example_bash_operator', resp)
+
+    def test_blocked_success_for_all_dag_user(self):
+        url = 'blocked'
+        self.logout()
+        self.login(username='all_dag_user',
+                   password='all_dag_user')
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('example_bash_operator', resp)
+        self.check_content_in_response('example_subdag_operator', resp)
+
+    def test_failed_success(self):
+        self.logout()
+        self.login()
+        url = ('failed?task_id=run_this_last&dag_id=example_bash_operator&'
+               
'execution_date={}&upstream=false&downstream=false&future=false&past=false'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url)
+        self.check_content_in_response('Redirecting', resp, 302)
+
+    def test_duration_success(self):
+        url = 'duration?days=30&dag_id=example_bash_operator'
+        self.logout()
+        self.login()
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('example_bash_operator', resp)
+
+    def test_duration_failure(self):
+        url = 'duration?days=30&dag_id=example_bash_operator'
+        self.logout()
+        # login as an user without permissions
+        self.login(username='dag_faker',
+                   password='dag_faker')
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_not_in_response('example_bash_operator', resp)
+
+    def test_tries_success(self):
+        url = 'tries?days=30&dag_id=example_bash_operator'
+        self.logout()
+        self.login()
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('example_bash_operator', resp)
+
+    def test_tries_failure(self):
+        url = 'tries?days=30&dag_id=example_bash_operator'
+        self.logout()
+        # login as an user without permissions
+        self.login(username='dag_faker',
+                   password='dag_faker')
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_not_in_response('example_bash_operator', resp)
+
+    def test_landing_times_success(self):
+        url = 'landing_times?days=30&dag_id=example_bash_operator'
+        self.logout()
+        self.login()
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('example_bash_operator', resp)
+
+    def test_landing_times_failure(self):
+        url = 'landing_times?days=30&dag_id=example_bash_operator'
+        self.logout()
+        self.login(username='dag_faker',
+                   password='dag_faker')
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_not_in_response('example_bash_operator', resp)
+
+    def test_paused_success(self):
+        # post request failure won't test
+        url = 'paused?dag_id=example_bash_operator&is_paused=false'
+        self.logout()
+        self.login()
+        resp = self.client.post(url, follow_redirects=True)
+        self.check_content_in_response('OK', resp)
+
+    def test_refresh_success(self):
+        self.logout()
+        self.login()
+        resp = self.client.get('refresh?dag_id=example_bash_operator')
+        self.check_content_in_response('', resp, resp_code=302)
+
+    def test_gantt_success(self):
+        url = 'gantt?dag_id=example_bash_operator'
+        self.logout()
+        self.login()
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('example_bash_operator', resp)
+
+    def test_gantt_failure(self):
+        url = 'gantt?dag_id=example_bash_operator'
+        self.logout()
+        self.login(username='dag_faker',
+                   password='dag_faker')
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_not_in_response('example_bash_operator', resp)
+
+    def test_success_fail_for_read_only_role(self):
+        # succcess endpoint need can_dag_edit, which read only role can not 
access
+        self.logout()
+        self.login(username='dag_read_only',
+                   password='dag_read_only')
+
+        url = ('success?task_id=run_this_last&dag_id=example_bash_operator&'
+               
'execution_date={}&upstream=false&downstream=false&future=false&past=false'
+               .format(self.percent_encode(self.default_date)))
+        resp = self.client.get(url)
+        self.check_content_not_in_response('Wait a minute', resp, 
resp_code=302)
+
+    def test_tree_success_for_read_only_role(self):
+        # tree view only allows can_dag_read, which read only role could access
+        self.logout()
+        self.login(username='dag_read_only',
+                   password='dag_read_only')
+
+        url = 'tree?dag_id=example_bash_operator'
+        resp = self.client.get(url, follow_redirects=True)
+        self.check_content_in_response('runme_1', resp)
+
+
 if __name__ == '__main__':
     unittest.main()

Reply via email to