This is an automated email from the ASF dual-hosted git repository. msumit pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 7432c4d Fix TI success/failure links (#16233) 7432c4d is described below commit 7432c4d7ea17ad95cc47c6e772c221d5d141f5e0 Author: Sumit Maheshwari <msu...@users.noreply.github.com> AuthorDate: Fri Jun 11 14:12:55 2021 +0530 Fix TI success/failure links (#16233) fixes issue #15234. Currently, the TI success and failure endpoints are POST-only and behave differently depending on the "confirmed" flag: they either render a confirmation page or update the TI states, which is not a great design. Also, because these endpoints are POST-only, they throw a 404 error when someone clicks the link received via email. To fix the issue, this commit extracts the rendering functionality into a separate endpoint, "/confirm", and keeps the success and failure endpoints as pure POST endpoints. --- airflow/models/taskinstance.py | 3 +- airflow/utils/strings.py | 2 +- airflow/www/templates/airflow/confirm.html | 8 +- airflow/www/templates/airflow/dag.html | 8 +- airflow/www/views.py | 168 +++++++++++++++++++---------- tests/www/views/test_views.py | 1 - tests/www/views/test_views_acl.py | 4 +- tests/www/views/test_views_tasks.py | 60 +++++------ 8 files changed, 153 insertions(+), 101 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index b4e644e..cf066b3 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -533,12 +533,13 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904 iso = quote(self.execution_date.isoformat()) base_url = conf.get('webserver', 'BASE_URL') return base_url + ( - "/success" + "/confirm" f"?task_id={self.task_id}" f"&dag_id={self.dag_id}" f"&execution_date={iso}" "&upstream=false" "&downstream=false" + "&state=success" ) @provide_session diff --git a/airflow/utils/strings.py b/airflow/utils/strings.py index 8d73914..c1823ff 100644 --- a/airflow/utils/strings.py +++ b/airflow/utils/strings.py @@ -27,4 +27,4 @@ def 
get_random_string(length=8, choices=string.ascii_letters + string.digits): def to_boolean(astring): """Convert a string to a boolean""" - return astring.lower() in ['true', 't', 'y', 'yes', '1'] + return False if astring is None else astring.lower() in ['true', 't', 'y', 'yes', '1'] diff --git a/airflow/www/templates/airflow/confirm.html b/airflow/www/templates/airflow/confirm.html index ccf49cb..3bbc908 100644 --- a/airflow/www/templates/airflow/confirm.html +++ b/airflow/www/templates/airflow/confirm.html @@ -28,10 +28,14 @@ <pre><code>{{ details }}</code></pre> {% endif %} </div> - <form method="POST"> + {% if endpoint %} + <form method="POST" action="{{ endpoint }}"> + {% else %} + <form method="POST"> + {% endif %} <input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/> <input type="hidden" name="confirmed" value="true"> - {% for name,val in request.form.items() if name != "csrf_token" %} + {% for name,val in request.values.items() if name != "csrf_token" %} <input type="hidden" name="{{ name }}" value="{{ val }}"> {% endfor %} <button type="submit" class="btn btn-primary">OK!</button> diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index 171b4a9..5bab88d 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -284,12 +284,12 @@ </div> </form> <hr style="margin-bottom: 8px;"> - <form method="POST" data-action="{{ url_for('Airflow.failed') }}"> - <input type="hidden" name="csrf_token" value="{{ csrf_token() }}"> + <form method="GET" data-action="{{ url_for('Airflow.confirm') }}"> <input type="hidden" name="dag_id" value="{{ dag.dag_id }}"> <input type="hidden" name="task_id"> <input type="hidden" name="execution_date"> <input type="hidden" name="origin" value="{{ request.base_url }}"> + <input type="hidden" name="state" value="failed"> <div class="row"> <span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons"> <label class="btn 
btn-default"> @@ -317,12 +317,12 @@ </div> </form> <hr style="margin-bottom: 8px;"> - <form method="POST" data-action="{{ url_for('Airflow.success') }}"> - <input type="hidden" name="csrf_token" value="{{ csrf_token() }}"> + <form method="GET" data-action="{{ url_for('Airflow.confirm') }}"> <input type="hidden" name="dag_id" value="{{ dag.dag_id }}"> <input type="hidden" name="task_id"> <input type="hidden" name="execution_date"> <input type="hidden" name="origin" value="{{ request.base_url }}"> + <input type="hidden" name="state" value="success"> <div class="row"> <span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons"> <label class="btn btn-default"> diff --git a/airflow/www/views.py b/airflow/www/views.py index e7eeb3c..477b87d 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -111,6 +111,7 @@ from airflow.utils.log import secrets_masker from airflow.utils.log.log_reader import TaskLogReader from airflow.utils.session import create_session, provide_session from airflow.utils.state import State +from airflow.utils.strings import to_boolean from airflow.version import version from airflow.www import auth, utils as wwwutils from airflow.www.decorators import action_logging, gzipped @@ -1578,6 +1579,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m response = self.render_template( 'airflow/confirm.html', + endpoint=None, message="Here's the list of task instances you are about to clear:", details=details, ) @@ -1794,7 +1796,6 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m task_id, origin, execution_date, - confirmed, upstream, downstream, future, @@ -1807,54 +1808,104 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m latest_execution_date = dag.get_latest_execution_date() if not latest_execution_date: - flash(f"Cannot make {state}, seem that dag {dag_id} has never run", "error") + flash(f"Cannot mark tasks as {state}, 
seem that dag {dag_id} has never run", "error") return redirect(origin) execution_date = timezone.parse(execution_date) from airflow.api.common.experimental.mark_tasks import set_state - if confirmed: - with create_session() as session: - altered = set_state( - tasks=[task], - execution_date=execution_date, - upstream=upstream, - downstream=downstream, - future=future, - past=past, - state=state, - commit=True, - session=session, - ) + with create_session() as session: + altered = set_state( + tasks=[task], + execution_date=execution_date, + upstream=upstream, + downstream=downstream, + future=future, + past=past, + state=state, + commit=True, + session=session, + ) - # Clear downstream tasks that are in failed/upstream_failed state to resume them. - # Flush the session so that the tasks marked success are reflected in the db. - session.flush() - subdag = dag.partial_subset( - task_ids_or_regex={task_id}, - include_downstream=True, - include_upstream=False, - ) + # Clear downstream tasks that are in failed/upstream_failed state to resume them. + # Flush the session so that the tasks marked success are reflected in the db. 
+ session.flush() + subdag = dag.partial_subset( + task_ids_or_regex={task_id}, + include_downstream=True, + include_upstream=False, + ) - end_date = execution_date if not future else None - start_date = execution_date if not past else None - - subdag.clear( - start_date=start_date, - end_date=end_date, - include_subdags=True, - include_parentdag=True, - only_failed=True, - session=session, - # Exclude the task itself from being cleared - exclude_task_ids={task_id}, - ) + end_date = execution_date if not future else None + start_date = execution_date if not past else None - session.commit() + subdag.clear( + start_date=start_date, + end_date=end_date, + include_subdags=True, + include_parentdag=True, + only_failed=True, + session=session, + # Exclude the task itself from being cleared + exclude_task_ids={task_id}, + ) - flash(f"Marked {state} on {len(altered)} task instances") - return redirect(origin) + session.commit() + + flash(f"Marked {state} on {len(altered)} task instances") + return redirect(origin) + + @expose('/confirm', methods=['GET']) + @auth.has_access( + [ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE), + ] + ) + @action_logging + def confirm(self): + """Show confirmation page for marking tasks as success or failed.""" + args = request.args + dag_id = args.get('dag_id') + task_id = args.get('task_id') + execution_date = args.get('execution_date') + state = args.get('state') + + upstream = to_boolean(args.get('failed_upstream')) + downstream = to_boolean(args.get('failed_downstream')) + future = to_boolean(args.get('failed_future')) + past = to_boolean(args.get('failed_past')) + + try: + dag = current_app.dag_bag.get_dag(dag_id) + except airflow.exceptions.SerializedDagNotFound: + flash(f'DAG {dag_id} not found', "error") + return redirect(request.referrer or url_for('Airflow.index')) + + try: + task = dag.get_task(task_id) + except airflow.exceptions.TaskNotFound: + 
flash(f"Task {task_id} not found", "error") + return redirect(request.referrer or url_for('Airflow.index')) + + task.dag = dag + + if state not in ( + 'success', + 'failed', + ): + flash(f"Invalid state {state}, must be either 'success' or 'failed'", "error") + return redirect(request.referrer or url_for('Airflow.index')) + + latest_execution_date = dag.get_latest_execution_date() + if not latest_execution_date: + flash(f"Cannot mark tasks as {state}, seem that dag {dag_id} has never run", "error") + return redirect(request.referrer or url_for('Airflow.index')) + + execution_date = timezone.parse(execution_date) + + from airflow.api.common.experimental.mark_tasks import set_state to_be_altered = set_state( tasks=[task], @@ -1871,6 +1922,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m response = self.render_template( "airflow/confirm.html", + endpoint=url_for(f'Airflow.{state}'), message=f"Here's the list of task instances you are about to mark as {state}:", details=details, ) @@ -1887,23 +1939,22 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m @action_logging def failed(self): """Mark task as failed.""" - dag_id = request.form.get('dag_id') - task_id = request.form.get('task_id') - origin = get_safe_url(request.form.get('origin')) - execution_date = request.form.get('execution_date') + args = request.form + dag_id = args.get('dag_id') + task_id = args.get('task_id') + origin = get_safe_url(args.get('origin')) + execution_date = args.get('execution_date') - confirmed = request.form.get('confirmed') == "true" - upstream = request.form.get('failed_upstream') == "true" - downstream = request.form.get('failed_downstream') == "true" - future = request.form.get('failed_future') == "true" - past = request.form.get('failed_past') == "true" + upstream = to_boolean(args.get('failed_upstream')) + downstream = to_boolean(args.get('failed_downstream')) + future = to_boolean(args.get('failed_future')) + past = 
to_boolean(args.get('failed_past')) return self._mark_task_instance_state( dag_id, task_id, origin, execution_date, - confirmed, upstream, downstream, future, @@ -1921,23 +1972,22 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m @action_logging def success(self): """Mark task as success.""" - dag_id = request.form.get('dag_id') - task_id = request.form.get('task_id') - origin = get_safe_url(request.form.get('origin')) - execution_date = request.form.get('execution_date') + args = request.form + dag_id = args.get('dag_id') + task_id = args.get('task_id') + origin = get_safe_url(args.get('origin')) + execution_date = args.get('execution_date') - confirmed = request.form.get('confirmed') == "true" - upstream = request.form.get('success_upstream') == "true" - downstream = request.form.get('success_downstream') == "true" - future = request.form.get('success_future') == "true" - past = request.form.get('success_past') == "true" + upstream = to_boolean(args.get('failed_upstream')) + downstream = to_boolean(args.get('failed_downstream')) + future = to_boolean(args.get('failed_future')) + past = to_boolean(args.get('failed_past')) return self._mark_task_instance_state( dag_id, task_id, origin, execution_date, - confirmed, upstream, downstream, future, diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py index 4f7f5d0..e698de8 100644 --- a/tests/www/views/test_views.py +++ b/tests/www/views/test_views.py @@ -223,7 +223,6 @@ def test_mark_task_instance_state(test_app): task_id=task_1.task_id, origin="", execution_date=start_date.isoformat(), - confirmed=True, upstream=False, downstream=False, future=False, diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py index 5964081..e82e3e8 100644 --- a/tests/www/views/test_views_acl.py +++ b/tests/www/views/test_views_acl.py @@ -690,8 +690,8 @@ def test_failed_success(client_all_dags_edit_tis): future="false", past="false", ) - resp = 
client_all_dags_edit_tis.post('failed', data=form) - check_content_in_response('example_bash_operator', resp) + resp = client_all_dags_edit_tis.post('failed', data=form, follow_redirects=True) + check_content_in_response('Marked failed on 1 task instances', resp) @pytest.mark.parametrize( diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py index 1250085..dc17c39 100644 --- a/tests/www/views/test_views_tasks.py +++ b/tests/www/views/test_views_tasks.py @@ -177,6 +177,34 @@ def init_dagruns(app, reset_dagruns): # pylint: disable=unused-argument ["example_bash_operator"], id="existing-dagbag-dag-details", ), + pytest.param( + f'confirm?task_id=runme_0&dag_id=example_bash_operator&state=success' + f'&execution_date={DEFAULT_VAL}', + ['Wait a minute.'], + id="confirm-success", + ), + pytest.param( + f'confirm?task_id=runme_0&dag_id=example_bash_operator&state=failed&execution_date={DEFAULT_VAL}', + ['Wait a minute.'], + id="confirm-failed", + ), + pytest.param( + f'confirm?task_id=runme_0&dag_id=invalid_dag&state=failed&execution_date={DEFAULT_VAL}', + ['DAG invalid_dag not found'], + id="confirm-failed", + ), + pytest.param( + f'confirm?task_id=invalid_task&dag_id=example_bash_operator&state=failed' + f'&execution_date={DEFAULT_VAL}', + ['Task invalid_task not found'], + id="confirm-failed", + ), + pytest.param( + f'confirm?task_id=runme_0&dag_id=example_bash_operator&state=invalid' + f'&execution_date={DEFAULT_VAL}', + ["Invalid state invalid, must be either 'success' or 'failed'"], + id="confirm-invalid", + ), ], ) def test_views_get(admin_client, url, contents): @@ -332,20 +360,6 @@ def test_code_from_db_all_example_dags(admin_client): downstream="false", future="false", past="false", - ), - "Wait a minute", - ), - ( - "failed", - dict( - task_id="run_this_last", - dag_id="example_bash_operator", - execution_date=DEFAULT_DATE, - confirmed="true", - upstream="false", - downstream="false", - future="false", - past="false", 
origin="/graph?dag_id=example_bash_operator", ), "Marked failed on 1 task instances", @@ -360,20 +374,6 @@ def test_code_from_db_all_example_dags(admin_client): downstream="false", future="false", past="false", - ), - 'Wait a minute', - ), - ( - "success", - dict( - task_id="run_this_last", - dag_id="example_bash_operator", - execution_date=DEFAULT_DATE, - confirmed="true", - upstream="false", - downstream="false", - future="false", - past="false", origin="/graph?dag_id=example_bash_operator", ), "Marked success on 1 task instances", @@ -406,9 +406,7 @@ def test_code_from_db_all_example_dags(admin_client): ], ids=[ "paused", - "failed", "failed-flash-hint", - "success", "success-flash-hint", "clear", "run", @@ -434,7 +432,7 @@ def test_dag_never_run(admin_client, url): ) clear_db_runs() resp = admin_client.post(url, data=form, follow_redirects=True) - check_content_in_response(f"Cannot make {url}, seem that dag {dag_id} has never run", resp) + check_content_in_response(f"Cannot mark tasks as {url}, seem that dag {dag_id} has never run", resp) class _ForceHeartbeatCeleryExecutor(CeleryExecutor):