This is an automated email from the ASF dual-hosted git repository. bbovenzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 101d59c4b8 Remove Run task action from UI (#29706) 101d59c4b8 is described below commit 101d59c4b88ab979d305b8d96f612c27c8a44aa8 Author: Brent Bovenzi <br...@astronomer.io> AuthorDate: Thu Feb 23 17:21:59 2023 -0500 Remove Run task action from UI (#29706) * Remove Run task action from UI * remove /run endpoint * remove tests for Airflow.run * remove extra imports --- airflow/www/static/js/api/index.ts | 2 - airflow/www/static/js/api/useRunTask.ts | 75 ------------ .../dag/details/taskInstance/taskActions/Run.tsx | 97 ---------------- .../dag/details/taskInstance/taskActions/index.tsx | 7 -- airflow/www/templates/airflow/dag.html | 34 ------ airflow/www/views.py | 70 +---------- tests/www/views/test_views_acl.py | 13 --- tests/www/views/test_views_tasks.py | 128 --------------------- 8 files changed, 1 insertion(+), 425 deletions(-) diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts index 5df46587e0..d055f737d9 100644 --- a/airflow/www/static/js/api/index.ts +++ b/airflow/www/static/js/api/index.ts @@ -24,7 +24,6 @@ import useClearRun from './useClearRun'; import useQueueRun from './useQueueRun'; import useMarkFailedRun from './useMarkFailedRun'; import useMarkSuccessRun from './useMarkSuccessRun'; -import useRunTask from './useRunTask'; import useClearTask from './useClearTask'; import useMarkFailedTask from './useMarkFailedTask'; import useMarkSuccessTask from './useMarkSuccessTask'; @@ -63,7 +62,6 @@ export { useMarkSuccessRun, useMarkSuccessTask, useQueueRun, - useRunTask, useSetDagRunNote, useSetTaskInstanceNote, useTaskInstance, diff --git a/airflow/www/static/js/api/useRunTask.ts b/airflow/www/static/js/api/useRunTask.ts deleted file mode 100644 index 98b3cc873c..0000000000 --- a/airflow/www/static/js/api/useRunTask.ts +++ /dev/null @@ -1,75 +0,0 @@ -/*! - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import axios from 'axios'; -import { useMutation, useQueryClient } from 'react-query'; -import URLSearchParamsWrapper from 'src/utils/URLSearchParamWrapper'; -import { getMetaValue } from '../utils'; -import { useAutoRefresh } from '../context/autorefresh'; -import useErrorToast from '../utils/useErrorToast'; - -const csrfToken = getMetaValue('csrf_token'); -const runUrl = getMetaValue('run_url'); - -export default function useRunTask(dagId: string, runId: string, taskId: string) { - const queryClient = useQueryClient(); - const errorToast = useErrorToast(); - const { startRefresh } = useAutoRefresh(); - return useMutation( - ['runTask', dagId, runId, taskId], - async ({ - ignoreAllDeps, - ignoreTaskState, - ignoreTaskDeps, - mapIndexes, - }:{ - ignoreAllDeps: boolean, - ignoreTaskState: boolean, - ignoreTaskDeps: boolean, - mapIndexes: number[], - }) => Promise.all( - (mapIndexes.length ? mapIndexes : [-1]).map((mi) => { - const params = new URLSearchParamsWrapper({ - csrf_token: csrfToken, - dag_id: dagId, - dag_run_id: runId, - task_id: taskId, - ignore_all_deps: ignoreAllDeps, - ignore_task_deps: ignoreTaskDeps, - ignore_ti_state: ignoreTaskState, - map_index: mi, - }).toString(); - - return axios.post(runUrl, params, { - headers: { - 'Content-Type': 'application/x-www-form-urlencoded', - }, - }); - }), - ), - { - onSuccess: () => { - queryClient.invalidateQueries('gridData'); - queryClient.invalidateQueries(['mappedInstances', dagId, runId, taskId]); - startRefresh(); - }, - onError: (error: Error) => errorToast({ error }), - }, - ); -} diff --git a/airflow/www/static/js/dag/details/taskInstance/taskActions/Run.tsx b/airflow/www/static/js/dag/details/taskInstance/taskActions/Run.tsx deleted file mode 100644 index 62d08a77ef..0000000000 --- a/airflow/www/static/js/dag/details/taskInstance/taskActions/Run.tsx +++ /dev/null @@ -1,97 +0,0 @@ -/*! - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import React, { useState } from 'react'; -import { - Button, - Flex, - ButtonGroup, -} from '@chakra-ui/react'; - -import { useRunTask } from 'src/api'; -import { getMetaValue } from 'src/utils'; - -const canEdit = getMetaValue('can_edit') === 'True'; - -interface Props { - dagId: string; - runId: string; - taskId: string; - mapIndexes: number[]; -} - -const Run = ({ - dagId, - runId, - taskId, - mapIndexes, -}: Props) => { - const [ignoreAllDeps, setIgnoreAllDeps] = useState(false); - const onToggleAllDeps = () => setIgnoreAllDeps(!ignoreAllDeps); - - const [ignoreTaskState, setIgnoreTaskState] = useState(false); - const onToggleTaskState = () => setIgnoreTaskState(!ignoreTaskState); - - const [ignoreTaskDeps, setIgnoreTaskDeps] = useState(false); - const onToggleTaskDeps = () => setIgnoreTaskDeps(!ignoreTaskDeps); - - const { mutate: onRun, isLoading } = useRunTask(dagId, runId, taskId); - - const onClick = () => { - onRun({ - ignoreAllDeps, - ignoreTaskState, - ignoreTaskDeps, - mapIndexes, - }); - }; - - return ( - <Flex justifyContent="space-between" width="100%"> - <ButtonGroup isAttached variant="outline" isDisabled={!canEdit}> - <Button - bg={ignoreAllDeps ? 'gray.100' : undefined} - onClick={onToggleAllDeps} - title="Ignores all non-critical dependencies, including task state and task_deps" - > - Ignore All Deps - </Button> - <Button - bg={ignoreTaskState ? 'gray.100' : undefined} - onClick={onToggleTaskState} - title="Ignore previous success/failure" - > - Ignore Task State - </Button> - <Button - bg={ignoreTaskDeps ? 'gray.100' : undefined} - onClick={onToggleTaskDeps} - title="Disregard the task-specific dependencies, e.g. status of upstream task instances and depends_on_past" - > - Ignore Task Deps - </Button> - </ButtonGroup> - <Button colorScheme="blue" onClick={onClick} isLoading={isLoading} isDisabled={!canEdit}> - Run - </Button> - </Flex> - ); -}; - -export default Run; diff --git a/airflow/www/static/js/dag/details/taskInstance/taskActions/index.tsx b/airflow/www/static/js/dag/details/taskInstance/taskActions/index.tsx index 72f3f238ab..55a1a2529d 100644 --- a/airflow/www/static/js/dag/details/taskInstance/taskActions/index.tsx +++ b/airflow/www/static/js/dag/details/taskInstance/taskActions/index.tsx @@ -27,7 +27,6 @@ import { } from '@chakra-ui/react'; import type { CommonActionProps } from './types'; -import RunAction from './Run'; import ClearAction from './Clear'; import MarkFailedAction from './MarkFailed'; import MarkSuccessAction from './MarkSuccess'; @@ -54,12 +53,6 @@ const TaskActions = ({ /> ) : ( <VStack justifyContent="center" divider={<StackDivider my={3} />}> - <RunAction - runId={runId} - taskId={taskId} - dagId={dagId} - mapIndexes={mapIndexes} - /> <ClearAction runId={runId} taskId={taskId} diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index f1f029e516..2ed843bb78 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -56,7 +56,6 @@ <meta name="confirm_url" content="{{ url_for('Airflow.confirm') }}"> <meta name="grid_data_url" content="{{ url_for('Airflow.grid_data') }}"> <meta name="next_run_datasets_url" content="{{ url_for('Airflow.next_run_datasets', dag_id=dag.dag_id) }}"> - <meta name="run_url" content="{{ url_for('Airflow.run') }}"> <meta name="grid_url" content="{{ url_for('Airflow.grid', dag_id=dag.dag_id) }}"> <meta name="datasets_url" content="{{ url_for('Airflow.datasets') }}"> <meta name="grid_url_no_root" content="{{ url_for('Airflow.grid', dag_id=dag.dag_id, num_runs=num_runs_arg, base_date=base_date_arg) }}"> @@ -332,39 +331,6 @@ </div> {% endif %} <h4 id="task_actions">Task Actions</h4> - <form method="POST" data-action="{{ url_for('Airflow.run') }}" id="run_action"> - <input type="hidden" name="csrf_token" value="{{ csrf_token() }}"> - <input type="hidden" name="dag_id" value="{{ dag.dag_id }}"> - <input type="hidden" name="task_id"> - <input type="hidden" name="dag_run_id"> - <input type="hidden" name="map_index"> - <input type="hidden" name="origin" value="{{ request.base_url }}"> - <div class="row"> - <span class="btn-group col-xs-12 col-sm-9 task-instance-modal-column" data-toggle="buttons"> - <label - class="btn btn-default" - title="Ignores all non-critical dependencies, including task state and task_deps"> - <input type="checkbox" value="true" name="ignore_all_deps" autocomplete="off"> - Ignore All Deps</label> - <label class="btn btn-default" - title="Ignore previous success/failure"> - <input type="checkbox" value="true" name="ignore_ti_state" autocomplete="off"> - Ignore Task State - </label> - <label class="btn btn-default" - title="Disregard the task-specific dependencies, e.g. status of upstream task instances and depends_on_past"> - <input type="checkbox" value="true" name="ignore_task_deps" autocomplete="off"> - Ignore Task Deps - </label> - </span> - <span class="col-xs-12 col-sm-3 task-instance-modal-column"> - <button type="submit" id="btn_run" class="btn btn-primary btn-block" title="Runs a single task instance"> - Run - </button> - </span> - </div> - <hr style="margin-bottom: 8px;"> - </form> <form method="POST" data-action="{{ url_for('Airflow.clear') }}" id="clear_action"> <input type="hidden" name="csrf_token" value="{{ csrf_token() }}"> <input type="hidden" name="dag_id" value="{{ dag.dag_id }}"> diff --git a/airflow/www/views.py b/airflow/www/views.py index 5335d7bd37..6a6a9fb02b 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -89,7 +89,6 @@ from airflow.compat.functools import cached_property from airflow.configuration import AIRFLOW_CONFIG, conf from airflow.datasets import Dataset from airflow.exceptions import AirflowException, ParamValidationError, RemovedInAirflow3Warning -from airflow.executors.executor_loader import ExecutorLoader from airflow.jobs.base_job import BaseJob from airflow.jobs.scheduler_job import SchedulerJob from airflow.jobs.triggerer_job import TriggererJob @@ -106,7 +105,7 @@ from airflow.models.taskinstance import TaskInstance, TaskInstanceNote from airflow.providers_manager import ProvidersManager from airflow.security import permissions from airflow.ti_deps.dep_context import DepContext -from airflow.ti_deps.dependencies_deps import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS +from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS from airflow.timetables._cron import CronMixin from airflow.timetables.base import DataInterval, TimeRestriction from airflow.utils import json as utils_json, timezone, yaml @@ -1836,73 +1835,6 @@ class Airflow(AirflowBaseView): title=title, ) - @expose("/run", methods=["POST"]) - @auth.has_access( - [ - (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), - (permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_INSTANCE), - ] - ) - @action_logging - @provide_session - def run(self, session=None): - """Runs Task Instance.""" - dag_id = request.form.get("dag_id") - task_id = request.form.get("task_id") - dag_run_id = request.form.get("dag_run_id") - map_index = request.args.get("map_index", -1, type=int) - origin = get_safe_url(request.form.get("origin")) - dag = get_airflow_app().dag_bag.get_dag(dag_id) - if not dag: - return redirect_or_json(origin, "DAG not found", "error", 404) - task = dag.get_task(task_id) - - ignore_all_deps = request.form.get("ignore_all_deps") == "true" - ignore_task_deps = request.form.get("ignore_task_deps") == "true" - ignore_ti_state = request.form.get("ignore_ti_state") == "true" - - executor = ExecutorLoader.get_default_executor() - - if not executor.supports_ad_hoc_ti_run: - msg = f"{executor.__class__.__name__} does not support ad hoc task runs" - return redirect_or_json(origin, msg, "error", 400) - dag_run = dag.get_dagrun(run_id=dag_run_id, session=session) - if not dag_run: - return redirect_or_json(origin, "DAG run not found", "error", 404) - ti = dag_run.get_task_instance(task_id=task.task_id, map_index=map_index, session=session) - if not ti: - msg = "Could not queue task instance for execution, task instance is missing" - return redirect_or_json(origin, msg, "error", 400) - - ti.refresh_from_task(task) - - # Make sure the task instance can be run - dep_context = DepContext( - deps=RUNNING_DEPS, - ignore_all_deps=ignore_all_deps, - ignore_task_deps=ignore_task_deps, - ignore_ti_state=ignore_ti_state, - ) - failed_deps = list(ti.get_failed_dep_statuses(dep_context=dep_context)) - if failed_deps: - failed_deps_str = ", ".join(f"{dep.dep_name}: {dep.reason}" for dep in failed_deps) - msg = f"Could not queue task instance for execution, dependencies not met: {failed_deps_str}" - return redirect_or_json(origin, msg, "error", 400) - - executor.job_id = None - executor.start() - executor.queue_task_instance( - ti, - ignore_all_deps=ignore_all_deps, - ignore_task_deps=ignore_task_deps, - ignore_ti_state=ignore_ti_state, - ) - executor.heartbeat() - ti.queued_dttm = timezone.utcnow() - session.merge(ti) - msg = f"Sent {ti} to the message queue, it should start any moment now." - return redirect_or_json(origin, msg) - @expose("/delete", methods=["POST"]) @auth.has_access( [ diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py index 6b197f3bf8..d087c311e3 100644 --- a/tests/www/views/test_views_acl.py +++ b/tests/www/views/test_views_acl.py @@ -636,19 +636,6 @@ def test_failure(dag_faker_client, url, unexpected_content): check_content_not_in_response(unexpected_content, resp) -@pytest.mark.parametrize("client", ["dag_test_client", "all_dag_user_client"]) -def test_run_success(request, client): - form = dict( - task_id="runme_0", - dag_id="example_bash_operator", - ignore_all_deps="false", - ignore_ti_state="true", - execution_date=DEFAULT_DATE, - ) - resp = request.getfixturevalue(client).post("run", data=form) - assert resp.status_code == 302 - - def test_blocked_success(client_all_dags_dagruns): resp = client_all_dags_dagruns.post("blocked", follow_redirects=True) check_content_in_response("example_bash_operator", resp) diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py index 5add70b261..6f7d70f19d 100644 --- a/tests/www/views/test_views_tasks.py +++ b/tests/www/views/test_views_tasks.py @@ -30,12 +30,10 @@ import time_machine from airflow import settings from airflow.exceptions import AirflowException from airflow.executors.celery_executor import CeleryExecutor -from airflow.executors.local_executor import LocalExecutor from airflow.models import DAG, DagBag, DagModel, TaskFail, TaskInstance, TaskReschedule from airflow.models.dagcode import DagCode from airflow.operators.bash import BashOperator from airflow.security import permissions -from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES, RUNNABLE_STATES from airflow.utils import timezone from airflow.utils.log.logging_mixin import ExternalLoggingMixin from airflow.utils.session import create_session @@ -485,24 +483,12 @@ def test_code_from_db_all_example_dags(admin_client): ), "example_bash_operator", ), - ( - "run", - dict( - task_id="runme_0", - dag_id="example_bash_operator", - ignore_all_deps="false", - ignore_ti_state="true", - dag_run_id=DEFAULT_DAGRUN, - ), - "", - ), ], ids=[ "paused", "failed-flash-hint", "success-flash-hint", "clear", - "run", ], ) def test_views_post(admin_client, url, data, content): @@ -533,120 +519,6 @@ class _ForceHeartbeatCeleryExecutor(CeleryExecutor): return True -@pytest.mark.parametrize("state", RUNNABLE_STATES) -@unittest.mock.patch( - "airflow.executors.executor_loader.ExecutorLoader.get_default_executor", - return_value=_ForceHeartbeatCeleryExecutor(), -) -def test_run_with_runnable_states(_, admin_client, session, state): - task_id = "runme_0" - session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update( - {"state": state, "end_date": timezone.utcnow()} - ) - session.commit() - - form = dict( - task_id=task_id, - dag_id="example_bash_operator", - ignore_all_deps="false", - ignore_ti_state="false", - dag_run_id=DEFAULT_DAGRUN, - origin="/home", - ) - resp = admin_client.post("run", data=form, follow_redirects=True) - check_content_in_response("", resp) - - msg = f"Task is in the '{state}' state." - assert not re.search(msg, resp.get_data(as_text=True)) - - -@unittest.mock.patch( - "airflow.executors.executor_loader.ExecutorLoader.get_default_executor", - return_value=_ForceHeartbeatCeleryExecutor(), -) -def test_run_ignoring_deps_sets_queued_dttm(_, admin_client, session, time_machine): - task_id = "runme_0" - session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update( - {"state": State.SCHEDULED, "queued_dttm": None} - ) - session.commit() - - assert session.query(TaskInstance.queued_dttm).filter(TaskInstance.task_id == task_id).all() == [(None,)] - - form = dict( - task_id=task_id, - dag_id="example_bash_operator", - ignore_all_deps="true", - dag_run_id=DEFAULT_DAGRUN, - origin="/home", - ) - now = timezone.utcnow() - - time_machine.move_to(now, tick=False) - resp = admin_client.post("run", data=form, follow_redirects=True) - - assert resp.status_code == 200 - assert session.query(TaskInstance.queued_dttm).filter(TaskInstance.task_id == task_id).scalar() == now - - -@pytest.mark.parametrize("state", QUEUEABLE_STATES) -@unittest.mock.patch( - "airflow.executors.executor_loader.ExecutorLoader.get_default_executor", - return_value=CeleryExecutor(), -) -def test_run_with_not_runnable_states(_, admin_client, session, state): - assert state not in RUNNABLE_STATES - - task_id = "runme_0" - session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update( - {"state": state, "end_date": timezone.utcnow()} - ) - session.commit() - - form = dict( - task_id=task_id, - dag_id="example_bash_operator", - ignore_all_deps="false", - ignore_ti_state="false", - dag_run_id=DEFAULT_DAGRUN, - origin="/home", - ) - resp = admin_client.post("run", data=form, follow_redirects=True) - check_content_in_response("", resp) - - msg = f"Task is in the '{state}' state." - assert re.search(msg, resp.get_data(as_text=True)) - - -@pytest.mark.parametrize("state", QUEUEABLE_STATES) -@unittest.mock.patch( - "airflow.executors.executor_loader.ExecutorLoader.get_default_executor", - return_value=LocalExecutor(), -) -def test_run_with_the_unsupported_executor(_, admin_client, session, state): - assert state not in RUNNABLE_STATES - - task_id = "runme_0" - session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update( - {"state": state, "end_date": timezone.utcnow()} - ) - session.commit() - - form = dict( - task_id=task_id, - dag_id="example_bash_operator", - ignore_all_deps="false", - ignore_ti_state="false", - dag_run_id=DEFAULT_DAGRUN, - origin="/home", - ) - resp = admin_client.post("run", data=form, follow_redirects=True) - check_content_in_response("", resp) - - msg = "LocalExecutor does not support ad hoc task runs" - assert re.search(msg, resp.get_data(as_text=True)) - - @pytest.fixture() def new_id_example_bash_operator(): dag_id = "example_bash_operator"