This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 35b616333c Add TaskFail entries to Gantt chart (#37918) 35b616333c is described below commit 35b616333ce7358ae4ba62a03a032debdea2a19a Author: Brent Bovenzi <br...@astronomer.io> AuthorDate: Thu Mar 7 17:09:28 2024 -0500 Add TaskFail entries to Gantt chart (#37918) * Add TaskFail entries to Gantt chart * Fix some autorefresh --- airflow/www/static/js/api/index.ts | 2 + airflow/www/static/js/api/useTaskFails.ts | 67 ++++++++++++++++ airflow/www/static/js/dag/details/gantt/Row.tsx | 41 ++++++++-- .../www/static/js/dag/details/gantt/TaskFail.tsx | 91 ++++++++++++++++++++++ .../static/js/dag/details/task/TaskDuration.tsx | 6 +- airflow/www/templates/airflow/dag.html | 1 + airflow/www/views.py | 25 ++++++ 7 files changed, 226 insertions(+), 7 deletions(-) diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts index b04da5259a..4f883b4825 100644 --- a/airflow/www/static/js/api/index.ts +++ b/airflow/www/static/js/api/index.ts @@ -52,6 +52,7 @@ import useHistoricalMetricsData from "./useHistoricalMetricsData"; import { useTaskXcomEntry, useTaskXcomCollection } from "./useTaskXcom"; import useEventLogs from "./useEventLogs"; import useCalendarData from "./useCalendarData"; +import useTaskFails from "./useTaskFails"; axios.interceptors.request.use((config) => { config.paramsSerializer = { @@ -100,4 +101,5 @@ export { useTaskXcomCollection, useEventLogs, useCalendarData, + useTaskFails, }; diff --git a/airflow/www/static/js/api/useTaskFails.ts b/airflow/www/static/js/api/useTaskFails.ts new file mode 100644 index 0000000000..f256db8f1f --- /dev/null +++ b/airflow/www/static/js/api/useTaskFails.ts @@ -0,0 +1,67 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { useQuery } from "react-query"; +import axios, { AxiosResponse } from "axios"; + +import { getMetaValue } from "src/utils"; +import { useAutoRefresh } from "src/context/autorefresh"; + +const DAG_ID_PARAM = "dag_id"; +const RUN_ID_PARAM = "run_id"; +const TASK_ID_PARAM = "task_id"; + +const dagId = getMetaValue(DAG_ID_PARAM); +const taskFailsUrl = getMetaValue("task_fails_url"); + +export interface TaskFail { + runId: string; + taskId: string; + mapIndex?: number; + startDate?: string; + endDate?: string; +} + +interface Props { + runId?: string; + taskId?: string; + enabled?: boolean; +} + +const useTaskFails = ({ runId, taskId, enabled = true }: Props) => { + const { isRefreshOn } = useAutoRefresh(); + + return useQuery( + ["taskFails", runId, taskId], + async () => { + const params = { + [DAG_ID_PARAM]: dagId, + [RUN_ID_PARAM]: runId, + [TASK_ID_PARAM]: taskId, + }; + return axios.get<AxiosResponse, TaskFail[]>(taskFailsUrl, { params }); + }, + { + enabled, + refetchInterval: isRefreshOn && (autoRefreshInterval || 1) * 1000, + } + ); +}; + +export default useTaskFails; diff --git a/airflow/www/static/js/dag/details/gantt/Row.tsx b/airflow/www/static/js/dag/details/gantt/Row.tsx index 1526e1713f..fbee17e139 100644 --- a/airflow/www/static/js/dag/details/gantt/Row.tsx +++ b/airflow/www/static/js/dag/details/gantt/Row.tsx @@ -19,13 +19,17 @@ import React from "react"; 
import { Box, Tooltip, Flex } from "@chakra-ui/react"; + import useSelection from "src/dag/useSelection"; import { getDuration } from "src/datetime_utils"; -import { SimpleStatus } from "src/dag/StatusBox"; +import { SimpleStatus, boxSize } from "src/dag/StatusBox"; import { useContainerRef } from "src/context/containerRef"; import { hoverDelay } from "src/utils"; import type { Task } from "src/types"; +import { useTaskFails } from "src/api"; + import GanttTooltip from "./GanttTooltip"; +import TaskFail from "./TaskFail"; interface Props { ganttWidth?: number; @@ -59,6 +63,12 @@ const Row = ({ : true); const isOpen = openGroupIds.includes(task.id || ""); + const { data: taskFails } = useTaskFails({ + taskId: task.id || undefined, + runId: runId || undefined, + enabled: !!(instance?.tryNumber && instance?.tryNumber > 1) && !!task.id, // Only try to look up task fails if it even has a try number > 1 + }); + // Calculate durations in ms const taskDuration = getDuration(instance?.startDate, instance?.endDate); const queuedDuration = hasValidQueuedDttm @@ -84,12 +94,14 @@ const Row = ({ return ( <div> <Box - py="4px" borderBottomWidth={1} borderBottomColor={!!task.children && isOpen ? "gray.400" : "gray.200"} bg={isSelected ? "blue.100" : "inherit"} + position="relative" + width={ganttWidth} + height={`${boxSize + 9}px`} > - {instance ? 
( + {instance && ( <Tooltip label={<GanttTooltip task={task} instance={instance} />} hasArrow @@ -99,9 +111,11 @@ const Row = ({ > <Flex width={`${width + queuedWidth}px`} + position="absolute" cursor="pointer" pointerEvents="auto" - marginLeft={`${offsetMargin}px`} + top="4px" + left={`${offsetMargin}px`} onClick={() => { onSelect({ runId: instance.runId, @@ -129,9 +143,24 @@ const Row = ({ /> </Flex> </Tooltip> - ) : ( - <Box height="10px" /> )} + {/* Only show fails before the most recent task instance */} + {(taskFails || []) + .filter( + (tf) => + tf.startDate !== instance?.startDate && + // @ts-ignore + moment(tf.startDate).isAfter(ganttStartDate) + ) + .map((taskFail) => ( + <TaskFail + key={`${taskFail.taskId}-${taskFail.startDate}`} + taskFail={taskFail} + ganttStartDate={ganttStartDate} + ganttWidth={ganttWidth} + runDuration={runDuration} + /> + ))} </Box> {isOpen && !!task.children && diff --git a/airflow/www/static/js/dag/details/gantt/TaskFail.tsx b/airflow/www/static/js/dag/details/gantt/TaskFail.tsx new file mode 100644 index 0000000000..1d217d3bfb --- /dev/null +++ b/airflow/www/static/js/dag/details/gantt/TaskFail.tsx @@ -0,0 +1,91 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import React from "react"; +import { Box, Tooltip, Text } from "@chakra-ui/react"; + +import { getDuration } from "src/datetime_utils"; +import { SimpleStatus } from "src/dag/StatusBox"; +import { useContainerRef } from "src/context/containerRef"; +import { hoverDelay } from "src/utils"; +import Time from "src/components/Time"; + +import type { TaskFail as TaskFailType } from "src/api/useTaskFails"; + +interface Props { + taskFail: TaskFailType; + runDuration: number; + ganttWidth: number; + ganttStartDate?: string | null; +} + +const TaskFail = ({ + taskFail, + runDuration, + ganttWidth, + ganttStartDate, +}: Props) => { + const containerRef = useContainerRef(); + + const duration = getDuration(taskFail?.startDate, taskFail?.endDate); + const percent = duration / runDuration; + const failWidth = ganttWidth * percent; + + const startOffset = getDuration(ganttStartDate, taskFail?.startDate); + const offsetLeft = (startOffset / runDuration) * ganttWidth; + + return ( + <Tooltip + label={ + <Box> + <Text mb={2}>Task Fail</Text> + {taskFail?.startDate && ( + <Text> + Start: <Time dateTime={taskFail?.startDate} /> + </Text> + )} + {taskFail?.endDate && ( + <Text> + End: <Time dateTime={taskFail?.endDate} /> + </Text> + )} + <Text mt={2} fontSize="sm"> + Can only show previous Task Fails, other tries are not yet saved. 
+ </Text> + </Box> + } + hasArrow + portalProps={{ containerRef }} + placement="top" + openDelay={hoverDelay} + top="4px" + > + <Box + position="absolute" + left={`${offsetLeft}px`} + cursor="pointer" + top="4px" + > + <SimpleStatus state="failed" width={`${failWidth}px`} /> + </Box> + </Tooltip> + ); +}; + +export default TaskFail; diff --git a/airflow/www/static/js/dag/details/task/TaskDuration.tsx b/airflow/www/static/js/dag/details/task/TaskDuration.tsx index 4b7eed9f3d..27cd8b33ec 100644 --- a/airflow/www/static/js/dag/details/task/TaskDuration.tsx +++ b/airflow/www/static/js/dag/details/task/TaskDuration.tsx @@ -22,7 +22,7 @@ import React from "react"; import useSelection from "src/dag/useSelection"; -import { useGridData } from "src/api"; +import { useGridData, useTaskFails } from "src/api"; import { startCase } from "lodash"; import { getDuration, formatDateTime, defaultFormat } from "src/datetime_utils"; import ReactECharts, { ReactEChartsProps } from "src/components/ReactECharts"; @@ -45,6 +45,10 @@ const TaskDuration = () => { onSelect, } = useSelection(); + const { data: taskFails } = useTaskFails({ taskId: taskId || undefined }); + + console.log(taskFails); + const { data: { dagRuns, groups, ordering }, } = useGridData(); diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index c69251e71c..6f5a187f61 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -57,6 +57,7 @@ <meta name="grid_data_url" content="{{ url_for('Airflow.grid_data') }}"> <meta name="graph_data_url" content="{{ url_for('Airflow.graph_data') }}"> <meta name="calendar_data_url" content="{{ url_for('Airflow.calendar_data') }}"> + <meta name="task_fails_url" content="{{ url_for('Airflow.task_fails') }}"> <meta name="next_run_datasets_url" content="{{ url_for('Airflow.next_run_datasets', dag_id=dag.dag_id) }}"> <meta name="grid_url" content="{{ url_for('Airflow.grid', dag_id=dag.dag_id) }}"> <meta 
name="datasets_url" content="{{ url_for('Airflow.datasets') }}"> diff --git a/airflow/www/views.py b/airflow/www/views.py index 5cffd28aaa..743947cdd3 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -3505,6 +3505,31 @@ class Airflow(AirflowBaseView): {"Content-Type": "application/json; charset=utf-8"}, ) + @expose("/object/task_fails") + @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE) + @provide_session + def task_fails(self, session): + """Return task fails.""" + dag_id = request.args.get("dag_id") + task_id = request.args.get("task_id") + run_id = request.args.get("run_id") + + query = select( + TaskFail.task_id, TaskFail.run_id, TaskFail.map_index, TaskFail.start_date, TaskFail.end_date + ).where(TaskFail.dag_id == dag_id) + + if run_id: + query = query.where(TaskFail.run_id == run_id) + if task_id: + query = query.where(TaskFail.task_id == task_id) + + task_fails = [dict(tf) for tf in session.execute(query).all()] + + return ( + htmlsafe_json_dumps(task_fails, separators=(",", ":"), dumps=flask.json.dumps), + {"Content-Type": "application/json; charset=utf-8"}, + ) + @expose("/object/task_instances") @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE) def task_instances(self):