Repository: incubator-airflow Updated Branches: refs/heads/master 17ddbcff0 -> f3f2eb323
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3f2eb32/tests/www_rbac/test_views.py ---------------------------------------------------------------------- diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py index 0c25972..b4a2f02 100644 --- a/tests/www_rbac/test_views.py +++ b/tests/www_rbac/test_views.py @@ -29,12 +29,11 @@ import unittest import urllib from flask._compat import PY2 -from flask_appbuilder.security.sqla.models import User as ab_user from urllib.parse import quote_plus from werkzeug.test import Client from airflow import configuration as conf -from airflow import models +from airflow import models, settings from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.models import DAG, DagRun, TaskInstance from airflow.operators.dummy_operator import DummyOperator @@ -48,17 +47,17 @@ from airflow.www_rbac import app as application class TestBase(unittest.TestCase): def setUp(self): conf.load_test_config() - self.app, self.appbuilder = application.create_app(testing=True) + self.app, self.appbuilder = application.create_app(session=Session, testing=True) self.app.config['WTF_CSRF_ENABLED'] = False self.client = self.app.test_client() - self.session = Session() + settings.configure_orm() + self.session = Session self.login() def login(self): - sm_session = self.appbuilder.sm.get_session() - self.user = sm_session.query(ab_user).first() - if not self.user: - role_admin = self.appbuilder.sm.find_role('Admin') + role_admin = self.appbuilder.sm.find_role('Admin') + tester = self.appbuilder.sm.find_user(username='test') + if not tester: self.appbuilder.sm.add_user( username='test', first_name='test', @@ -88,6 +87,15 @@ class TestBase(unittest.TestCase): else: self.assertIn(text, resp_html) + def check_content_not_in_response(self, text, resp, resp_code=200): + resp_html = resp.data.decode('utf-8') + self.assertEqual(resp_code, resp.status_code) + if isinstance(text, list): + for kw in text: + self.assertNotIn(kw, resp_html) + else: + self.assertNotIn(text, resp_html) + def percent_encode(self, obj): if PY2: return urllib.quote_plus(str(obj)) @@ -137,10 +145,6 @@ class TestVariableModelView(TestBase): resp = self.client.post('/variable/add', data=self.variable, follow_redirects=True) - self.assertEqual(resp.status_code, 200) - v = self.session.query(models.Variable).first() - self.assertEqual(v.key, 'test_key') - self.assertEqual(v.val, 'text_val') # update the variable with a wrong value, given that is encrypted Var = models.Variable @@ -248,6 +252,8 @@ class TestAirflowBaseViews(TestBase): def setUp(self): super(TestAirflowBaseViews, self).setUp() + self.logout() + self.login() self.cleanup_dagruns() self.prepare_dagruns() @@ -292,7 +298,7 @@ class TestAirflowBaseViews(TestBase): self.check_content_in_response('DAGs', resp) def test_health(self): - resp = self.client.get('health') + resp = self.client.get('health', follow_redirects=True) self.check_content_in_response('The server is healthy!', resp) def test_home(self): @@ -361,7 +367,7 @@ class TestAirflowBaseViews(TestBase): self.check_content_in_response('example_bash_operator', resp) def test_landing_times(self): - url = 'landing_times?days=30&dag_id=test_example_bash_operator' + url = 'landing_times?days=30&dag_id=example_bash_operator' resp = self.client.get(url, follow_redirects=True) self.check_content_in_response('example_bash_operator', resp) @@ -415,6 +421,8 @@ class TestAirflowBaseViews(TestBase): class TestConfigurationView(TestBase): def test_configuration(self): + self.logout() + self.login() resp = self.client.get('configuration', follow_redirects=True) self.check_content_in_response( ['Airflow Configuration', 'Running Configuration'], resp) @@ -437,6 +445,7 @@ class TestLogView(TestBase): current_dir = os.path.dirname(os.path.abspath(__file__)) logging_config['handlers']['task']['base_log_folder'] = os.path.normpath( os.path.join(current_dir, 'test_logs')) + logging_config['handlers']['task']['filename_template'] = \ '{{ ti.dag_id }}/{{ ti.task_id }}/' \ '{{ ts | replace(":", ".") }}/{{ try_number }}.log' @@ -450,11 +459,12 @@ class TestLogView(TestBase): sys.path.append(self.settings_folder) conf.set('core', 'logging_config_class', 'airflow_local_settings.LOGGING_CONFIG') - self.app, self.appbuilder = application.create_app(testing=True) + self.app, self.appbuilder = application.create_app(session=Session, testing=True) self.app.config['WTF_CSRF_ENABLED'] = False self.client = self.app.test_client() + settings.configure_orm() + self.session = Session self.login() - self.session = Session() from airflow.www_rbac.views import dagbag dag = DAG(self.DAG_ID, start_date=self.DEFAULT_DATE) @@ -477,9 +487,9 @@ class TestLogView(TestBase): def test_get_file_task_log(self): response = self.client.get( - TestLogView.ENDPOINT, - follow_redirects=True, - ) + TestLogView.ENDPOINT, data=dict( + username='test', + password='test'), follow_redirects=True) self.assertEqual(response.status_code, 200) self.assertIn('Log by attempts', response.data.decode('utf-8')) @@ -493,7 +503,10 @@ class TestLogView(TestBase): self.TASK_ID, quote_plus(self.DEFAULT_DATE.isoformat()), 1, - json.dumps({})), follow_redirects=True) + json.dumps({})), data=dict( + username='test', + password='test'), + follow_redirects=True) self.assertIn('"message":', response.data.decode('utf-8')) self.assertIn('"metadata":', response.data.decode('utf-8')) @@ -508,7 +521,10 @@ class TestLogView(TestBase): self.client.get(url_template.format(self.DAG_ID, self.TASK_ID, quote_plus(self.DEFAULT_DATE.isoformat()), - 1), follow_redirects=True) + 1), data=dict( + username='test', + password='test'), + follow_redirects=True) self.assertIn('"message":', response.data.decode('utf-8')) self.assertIn('"metadata":', response.data.decode('utf-8')) @@ -518,7 +534,10 @@ class TestLogView(TestBase): class TestVersionView(TestBase): def test_version(self): - resp = self.client.get('version', follow_redirects=True) + resp = self.client.get('version', data=dict( + username='test', + password='test' + ), follow_redirects=True) self.check_content_in_response('Version Info', resp) @@ -582,8 +601,9 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester: Should set base date to current date (not asserted) """ response = self.test.client.get( - self.endpoint - ) + self.endpoint, data=dict( + username='test', + password='test'), follow_redirects=True) self.test.assertEqual(response.status_code, 200) data = response.data.decode('utf-8') self.test.assertIn('Base date:', data) @@ -603,7 +623,11 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester: """ response = self.test.client.get( self.endpoint + '&execution_date={}'.format( - self.runs[1].execution_date.isoformat()) + self.runs[1].execution_date.isoformat()), + data=dict( + username='test', + password='test' + ), follow_redirects=True ) self.test.assertEqual(response.status_code, 200) data = response.data.decode('utf-8') @@ -626,7 +650,11 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester: """ response = self.test.client.get( self.endpoint + '&base_date={}&num_runs=2'.format( - self.runs[1].execution_date.isoformat()) + self.runs[1].execution_date.isoformat()), + data=dict( + username='test', + password='test' + ), follow_redirects=True ) self.test.assertEqual(response.status_code, 200) data = response.data.decode('utf-8') @@ -648,7 +676,11 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester: response = self.test.client.get( self.endpoint + '&base_date={}&num_runs=42&execution_date={}'.format( self.runs[1].execution_date.isoformat(), - self.runs[0].execution_date.isoformat()) + self.runs[0].execution_date.isoformat()), + data=dict( + username='test', + password='test' + ), follow_redirects=True ) self.test.assertEqual(response.status_code, 200) data = response.data.decode('utf-8') @@ -670,7 +702,11 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester: response = self.test.client.get( self.endpoint + '&base_date={}&num_runs=5&execution_date={}'.format( self.runs[2].execution_date.isoformat(), - self.runs[3].execution_date.isoformat()) + self.runs[3].execution_date.isoformat()), + data=dict( + username='test', + password='test' + ), follow_redirects=True ) self.test.assertEqual(response.status_code, 200) data = response.data.decode('utf-8') @@ -759,5 +795,505 @@ class TestGanttView(TestBase): self.tester.test_with_base_date_and_num_runs_and_execution_date_within() +class TestDagACLView(TestBase): + """ + Test Airflow DAG acl + """ + default_date = timezone.datetime(2018, 6, 1) + run_id = "test_{}".format(models.DagRun.id_for_date(default_date)) + + @classmethod + def setUpClass(cls): + super(TestDagACLView, cls).setUpClass() + + def cleanup_dagruns(self): + DR = models.DagRun + dag_ids = ['example_bash_operator', + 'example_subdag_operator'] + (self.session + .query(DR) + .filter(DR.dag_id.in_(dag_ids)) + .filter(DR.run_id == self.run_id) + .delete(synchronize_session='fetch')) + self.session.commit() + + def prepare_dagruns(self): + dagbag = models.DagBag(include_examples=True) + self.bash_dag = dagbag.dags['example_bash_operator'] + self.sub_dag = dagbag.dags['example_subdag_operator'] + + self.bash_dagrun = self.bash_dag.create_dagrun( + run_id=self.run_id, + execution_date=self.default_date, + start_date=timezone.utcnow(), + state=State.RUNNING) + + self.sub_dagrun = self.sub_dag.create_dagrun( + run_id=self.run_id, + execution_date=self.default_date, + start_date=timezone.utcnow(), + state=State.RUNNING) + + def setUp(self): + super(TestDagACLView, self).setUp() + self.cleanup_dagruns() + self.prepare_dagruns() + self.logout() + self.appbuilder.sm.sync_roles() + self.add_permission_for_role() + + def login(self, username=None, password=None): + role_admin = self.appbuilder.sm.find_role('Admin') + tester = self.appbuilder.sm.find_user(username='test') + if not tester: + self.appbuilder.sm.add_user( + username='test', + first_name='test', + last_name='test', + email='t...@fab.org', + role=role_admin, + password='test') + + dag_acl_role = self.appbuilder.sm.add_role('dag_acl_tester') + dag_tester = self.appbuilder.sm.find_user(username='dag_tester') + if not dag_tester: + self.appbuilder.sm.add_user( + username='dag_tester', + first_name='dag_test', + last_name='dag_test', + email='dag_t...@fab.org', + role=dag_acl_role, + password='dag_test') + + # create an user without permission + dag_no_role = self.appbuilder.sm.add_role('dag_acl_faker') + dag_faker = self.appbuilder.sm.find_user(username='dag_faker') + if not dag_faker: + self.appbuilder.sm.add_user( + username='dag_faker', + first_name='dag_faker', + last_name='dag_faker', + email='dag_f...@fab.org', + role=dag_no_role, + password='dag_faker') + + # create an user with only read permission + dag_read_only_role = self.appbuilder.sm.add_role('dag_acl_read_only') + dag_read_only = self.appbuilder.sm.find_user(username='dag_read_only') + if not dag_read_only: + self.appbuilder.sm.add_user( + username='dag_read_only', + first_name='dag_read_only', + last_name='dag_read_only', + email='dag_read_o...@fab.org', + role=dag_read_only_role, + password='dag_read_only') + + # create an user that has all dag access + all_dag_role = self.appbuilder.sm.add_role('all_dag_role') + all_dag_tester = self.appbuilder.sm.find_user(username='all_dag_user') + if not all_dag_tester: + self.appbuilder.sm.add_user( + username='all_dag_user', + first_name='all_dag_user', + last_name='all_dag_user', + email='all_dag_u...@fab.org', + role=all_dag_role, + password='all_dag_user') + + user = username if username else 'dag_tester' + passwd = password if password else 'dag_test' + + return self.client.post('/login/', data=dict( + username=user, + password=passwd + )) + + def logout(self): + return self.client.get('/logout/') + + def add_permission_for_role(self): + self.logout() + self.login(username='test', + password='test') + perm_on_dag = self.appbuilder.sm.\ + find_permission_view_menu('can_dag_edit', 'example_bash_operator') + dag_tester_role = self.appbuilder.sm.find_role('dag_acl_tester') + self.appbuilder.sm.add_permission_role(dag_tester_role, perm_on_dag) + + perm_on_all_dag = self.appbuilder.sm.\ + find_permission_view_menu('can_dag_edit', 'all_dags') + all_dag_role = self.appbuilder.sm.find_role('all_dag_role') + self.appbuilder.sm.add_permission_role(all_dag_role, perm_on_all_dag) + + read_only_perm_on_dag = self.appbuilder.sm.\ + find_permission_view_menu('can_dag_read', 'example_bash_operator') + dag_read_only_role = self.appbuilder.sm.find_role('dag_acl_read_only') + self.appbuilder.sm.add_permission_role(dag_read_only_role, read_only_perm_on_dag) + + def test_permission_exist(self): + self.logout() + self.login(username='test', + password='test') + test_view_menu = self.appbuilder.sm.find_view_menu('example_bash_operator') + perms_views = self.appbuilder.sm.find_permissions_view_menu(test_view_menu) + self.assertEqual(len(perms_views), 2) + # each dag view will create one write, and one read permission + self.assertTrue(str(perms_views[0]).startswith('can dag')) + self.assertTrue(str(perms_views[1]).startswith('can dag')) + + def test_role_permission_associate(self): + self.logout() + self.login(username='test', + password='test') + test_role = self.appbuilder.sm.find_role('dag_acl_tester') + perms = set([str(perm) for perm in test_role.permissions]) + self.assertIn('can dag edit on example_bash_operator', perms) + self.assertNotIn('can dag read on example_bash_operator', perms) + + def test_index_success(self): + self.logout() + self.login() + resp = self.client.get('/', follow_redirects=True) + self.check_content_in_response('example_bash_operator', resp) + + def test_index_failure(self): + self.logout() + self.login() + resp = self.client.get('/', follow_redirects=True) + # The user can only access/view example_bash_operator dag. + self.check_content_not_in_response('example_subdag_operator', resp) + + def test_index_for_all_dag_user(self): + self.logout() + self.login(username='all_dag_user', + password='all_dag_user') + resp = self.client.get('/', follow_redirects=True) + # The all dag user can access/view all dags. + self.check_content_in_response('example_subdag_operator', resp) + self.check_content_in_response('example_bash_operator', resp) + + def test_dag_stats_success(self): + self.logout() + self.login() + resp = self.client.get('dag_stats', follow_redirects=True) + self.check_content_in_response('example_bash_operator', resp) + + def test_dag_stats_failure(self): + self.logout() + self.login() + resp = self.client.get('dag_stats', follow_redirects=True) + self.check_content_not_in_response('example_subdag_operator', resp) + + def test_dag_stats_success_for_all_dag_user(self): + self.logout() + self.login(username='all_dag_user', + password='all_dag_user') + resp = self.client.get('dag_stats', follow_redirects=True) + self.check_content_in_response('example_subdag_operator', resp) + self.check_content_in_response('example_bash_operator', resp) + + def test_task_stats_success(self): + self.logout() + self.login() + resp = self.client.get('task_stats', follow_redirects=True) + self.check_content_in_response('example_bash_operator', resp) + + def test_task_stats_failure(self): + self.logout() + self.login() + resp = self.client.get('task_stats', follow_redirects=True) + self.check_content_not_in_response('example_subdag_operator', resp) + + def test_task_stats_success_for_all_dag_user(self): + self.logout() + self.login(username='all_dag_user', + password='all_dag_user') + resp = self.client.get('task_stats', follow_redirects=True) + self.check_content_in_response('example_bash_operator', resp) + self.check_content_in_response('example_subdag_operator', resp) + + def test_code_success(self): + self.logout() + self.login() + url = 'code?dag_id=example_bash_operator' + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('example_bash_operator', resp) + + def test_code_failure(self): + self.logout() + self.login(username='dag_faker', + password='dag_faker') + url = 'code?dag_id=example_bash_operator' + resp = self.client.get(url, follow_redirects=True) + self.check_content_not_in_response('example_bash_operator', resp) + + def test_code_success_for_all_dag_user(self): + self.logout() + self.login(username='all_dag_user', + password='all_dag_user') + url = 'code?dag_id=example_bash_operator' + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('example_bash_operator', resp) + + url = 'code?dag_id=example_subdag_operator' + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('example_subdag_operator', resp) + + def test_dag_details_success(self): + self.logout() + self.login() + url = 'dag_details?dag_id=example_bash_operator' + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('DAG details', resp) + + def test_dag_details_failure(self): + self.logout() + self.login(username='dag_faker', + password='dag_faker') + url = 'dag_details?dag_id=example_bash_operator' + resp = self.client.get(url, follow_redirects=True) + self.check_content_not_in_response('DAG details', resp) + + def test_dag_details_success_for_all_dag_user(self): + self.logout() + self.login(username='all_dag_user', + password='all_dag_user') + url = 'dag_details?dag_id=example_bash_operator' + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('example_bash_operator', resp) + + url = 'dag_details?dag_id=example_subdag_operator' + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('example_subdag_operator', resp) + + def test_pickle_info_success(self): + self.logout() + self.login() + url = 'pickle_info?dag_id=example_bash_operator' + resp = self.client.get(url, follow_redirects=True) + self.assertEqual(resp.status_code, 200) + + def test_rendered_success(self): + self.logout() + self.login() + url = ('rendered?task_id=runme_0&dag_id=example_bash_operator&execution_date={}' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('Rendered Template', resp) + + def test_rendered_failure(self): + self.logout() + self.login(username='dag_faker', + password='dag_faker') + url = ('rendered?task_id=runme_0&dag_id=example_bash_operator&execution_date={}' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url, follow_redirects=True) + self.check_content_not_in_response('Rendered Template', resp) + + def test_rendered_success_for_all_dag_user(self): + self.logout() + self.login(username='all_dag_user', + password='all_dag_user') + url = ('rendered?task_id=runme_0&dag_id=example_bash_operator&execution_date={}' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('Rendered Template', resp) + + def test_task_success(self): + self.logout() + self.login() + url = ('task?task_id=runme_0&dag_id=example_bash_operator&execution_date={}' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('Task Instance Details', resp) + + def test_task_failure(self): + self.logout() + self.login(username='dag_faker', + password='dag_faker') + url = ('task?task_id=runme_0&dag_id=example_bash_operator&execution_date={}' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url, follow_redirects=True) + self.check_content_not_in_response('Task Instance Details', resp) + + def test_task_success_for_all_dag_user(self): + self.logout() + self.login(username='all_dag_user', + password='all_dag_user') + url = ('task?task_id=runme_0&dag_id=example_bash_operator&execution_date={}' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('Task Instance Details', resp) + + def test_xcom_success(self): + self.logout() + self.login() + url = ('xcom?task_id=runme_0&dag_id=example_bash_operator&execution_date={}' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('XCom', resp) + + def test_xcom_failure(self): + self.logout() + self.login(username='dag_faker', + password='dag_faker') + url = ('xcom?task_id=runme_0&dag_id=example_bash_operator&execution_date={}' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url, follow_redirects=True) + self.check_content_not_in_response('XCom', resp) + + def test_xcom_success_for_all_dag_user(self): + self.logout() + self.login(username='all_dag_user', + password='all_dag_user') + url = ('xcom?task_id=runme_0&dag_id=example_bash_operator&execution_date={}' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('XCom', resp) + + def test_run_success(self): + self.logout() + self.login() + url = ('run?task_id=runme_0&dag_id=example_bash_operator&ignore_all_deps=false&' + 'ignore_ti_state=true&execution_date={}' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url) + self.check_content_in_response('', resp, resp_code=302) + + def test_run_success_for_all_dag_user(self): + self.logout() + self.login(username='all_dag_user', + password='all_dag_user') + url = ('run?task_id=runme_0&dag_id=example_bash_operator&ignore_all_deps=false&' + 'ignore_ti_state=true&execution_date={}' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url) + self.check_content_in_response('', resp, resp_code=302) + + def test_blocked_success(self): + url = 'blocked' + self.logout() + self.login() + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('example_bash_operator', resp) + + def test_blocked_success_for_all_dag_user(self): + url = 'blocked' + self.logout() + self.login(username='all_dag_user', + password='all_dag_user') + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('example_bash_operator', resp) + self.check_content_in_response('example_subdag_operator', resp) + + def test_failed_success(self): + self.logout() + self.login() + url = ('failed?task_id=run_this_last&dag_id=example_bash_operator&' + 'execution_date={}&upstream=false&downstream=false&future=false&past=false' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url) + self.check_content_in_response('Redirecting', resp, 302) + + def test_duration_success(self): + url = 'duration?days=30&dag_id=example_bash_operator' + self.logout() + self.login() + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('example_bash_operator', resp) + + def test_duration_failure(self): + url = 'duration?days=30&dag_id=example_bash_operator' + self.logout() + # login as an user without permissions + self.login(username='dag_faker', + password='dag_faker') + resp = self.client.get(url, follow_redirects=True) + self.check_content_not_in_response('example_bash_operator', resp) + + def test_tries_success(self): + url = 'tries?days=30&dag_id=example_bash_operator' + self.logout() + self.login() + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('example_bash_operator', resp) + + def test_tries_failure(self): + url = 'tries?days=30&dag_id=example_bash_operator' + self.logout() + # login as an user without permissions + self.login(username='dag_faker', + password='dag_faker') + resp = self.client.get(url, follow_redirects=True) + self.check_content_not_in_response('example_bash_operator', resp) + + def test_landing_times_success(self): + url = 'landing_times?days=30&dag_id=example_bash_operator' + self.logout() + self.login() + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('example_bash_operator', resp) + + def test_landing_times_failure(self): + url = 'landing_times?days=30&dag_id=example_bash_operator' + self.logout() + self.login(username='dag_faker', + password='dag_faker') + resp = self.client.get(url, follow_redirects=True) + self.check_content_not_in_response('example_bash_operator', resp) + + def test_paused_success(self): + # post request failure won't test + url = 'paused?dag_id=example_bash_operator&is_paused=false' + self.logout() + self.login() + resp = self.client.post(url, follow_redirects=True) + self.check_content_in_response('OK', resp) + + def test_refresh_success(self): + self.logout() + self.login() + resp = self.client.get('refresh?dag_id=example_bash_operator') + self.check_content_in_response('', resp, resp_code=302) + + def test_gantt_success(self): + url = 'gantt?dag_id=example_bash_operator' + self.logout() + self.login() + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('example_bash_operator', resp) + + def test_gantt_failure(self): + url = 'gantt?dag_id=example_bash_operator' + self.logout() + self.login(username='dag_faker', + password='dag_faker') + resp = self.client.get(url, follow_redirects=True) + self.check_content_not_in_response('example_bash_operator', resp) + + def test_success_fail_for_read_only_role(self): + # succcess endpoint need can_dag_edit, which read only role can not access + self.logout() + self.login(username='dag_read_only', + password='dag_read_only') + + url = ('success?task_id=run_this_last&dag_id=example_bash_operator&' + 'execution_date={}&upstream=false&downstream=false&future=false&past=false' + .format(self.percent_encode(self.default_date))) + resp = self.client.get(url) + self.check_content_not_in_response('Wait a minute', resp, resp_code=302) + + def test_tree_success_for_read_only_role(self): + # tree view only allows can_dag_read, which read only role could access + self.logout() + self.login(username='dag_read_only', + password='dag_read_only') + + url = 'tree?dag_id=example_bash_operator' + resp = self.client.get(url, follow_redirects=True) + self.check_content_in_response('runme_1', resp) + + if __name__ == '__main__': unittest.main()