This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 35b616333c Add TaskFail entries to Gantt chart (#37918)
35b616333c is described below

commit 35b616333ce7358ae4ba62a03a032debdea2a19a
Author: Brent Bovenzi <br...@astronomer.io>
AuthorDate: Thu Mar 7 17:09:28 2024 -0500

    Add TaskFail entries to Gantt chart (#37918)
    
    * Add TaskFail entries to Gantt chart
    
    * Fix some autorefresh
---
 airflow/www/static/js/api/index.ts                 |  2 +
 airflow/www/static/js/api/useTaskFails.ts          | 67 ++++++++++++++++
 airflow/www/static/js/dag/details/gantt/Row.tsx    | 41 ++++++++--
 .../www/static/js/dag/details/gantt/TaskFail.tsx   | 91 ++++++++++++++++++++++
 .../static/js/dag/details/task/TaskDuration.tsx    |  6 +-
 airflow/www/templates/airflow/dag.html             |  1 +
 airflow/www/views.py                               | 25 ++++++
 7 files changed, 226 insertions(+), 7 deletions(-)

diff --git a/airflow/www/static/js/api/index.ts b/airflow/www/static/js/api/index.ts
index b04da5259a..4f883b4825 100644
--- a/airflow/www/static/js/api/index.ts
+++ b/airflow/www/static/js/api/index.ts
@@ -52,6 +52,7 @@ import useHistoricalMetricsData from "./useHistoricalMetricsData";
 import { useTaskXcomEntry, useTaskXcomCollection } from "./useTaskXcom";
 import useEventLogs from "./useEventLogs";
 import useCalendarData from "./useCalendarData";
+import useTaskFails from "./useTaskFails";
 
 axios.interceptors.request.use((config) => {
   config.paramsSerializer = {
@@ -100,4 +101,5 @@ export {
   useTaskXcomCollection,
   useEventLogs,
   useCalendarData,
+  useTaskFails,
 };
diff --git a/airflow/www/static/js/api/useTaskFails.ts b/airflow/www/static/js/api/useTaskFails.ts
new file mode 100644
index 0000000000..f256db8f1f
--- /dev/null
+++ b/airflow/www/static/js/api/useTaskFails.ts
@@ -0,0 +1,67 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { useQuery } from "react-query";
+import axios, { AxiosResponse } from "axios";
+
+import { getMetaValue } from "src/utils";
+import { useAutoRefresh } from "src/context/autorefresh";
+
+const DAG_ID_PARAM = "dag_id";
+const RUN_ID_PARAM = "run_id";
+const TASK_ID_PARAM = "task_id";
+
+const dagId = getMetaValue(DAG_ID_PARAM);
+const taskFailsUrl = getMetaValue("task_fails_url");
+
+export interface TaskFail {
+  runId: string;
+  taskId: string;
+  mapIndex?: number;
+  startDate?: string;
+  endDate?: string;
+}
+
+interface Props {
+  runId?: string;
+  taskId?: string;
+  enabled?: boolean;
+}
+
+const useTaskFails = ({ runId, taskId, enabled = true }: Props) => {
+  const { isRefreshOn } = useAutoRefresh();
+
+  return useQuery(
+    ["taskFails", runId, taskId],
+    async () => {
+      const params = {
+        [DAG_ID_PARAM]: dagId,
+        [RUN_ID_PARAM]: runId,
+        [TASK_ID_PARAM]: taskId,
+      };
+      return axios.get<AxiosResponse, TaskFail[]>(taskFailsUrl, { params });
+    },
+    {
+      enabled,
+      refetchInterval: isRefreshOn && (autoRefreshInterval || 1) * 1000,
+    }
+  );
+};
+
+export default useTaskFails;
diff --git a/airflow/www/static/js/dag/details/gantt/Row.tsx b/airflow/www/static/js/dag/details/gantt/Row.tsx
index 1526e1713f..fbee17e139 100644
--- a/airflow/www/static/js/dag/details/gantt/Row.tsx
+++ b/airflow/www/static/js/dag/details/gantt/Row.tsx
@@ -19,13 +19,17 @@
 
 import React from "react";
 import { Box, Tooltip, Flex } from "@chakra-ui/react";
+
 import useSelection from "src/dag/useSelection";
 import { getDuration } from "src/datetime_utils";
-import { SimpleStatus } from "src/dag/StatusBox";
+import { SimpleStatus, boxSize } from "src/dag/StatusBox";
 import { useContainerRef } from "src/context/containerRef";
 import { hoverDelay } from "src/utils";
 import type { Task } from "src/types";
+import { useTaskFails } from "src/api";
+
 import GanttTooltip from "./GanttTooltip";
+import TaskFail from "./TaskFail";
 
 interface Props {
   ganttWidth?: number;
@@ -59,6 +63,12 @@ const Row = ({
       : true);
   const isOpen = openGroupIds.includes(task.id || "");
 
+  const { data: taskFails } = useTaskFails({
+    taskId: task.id || undefined,
+    runId: runId || undefined,
+    enabled: !!(instance?.tryNumber && instance?.tryNumber > 1) && !!task.id, // Only try to look up task fails if it even has a try number > 1
+  });
+
   // Calculate durations in ms
   const taskDuration = getDuration(instance?.startDate, instance?.endDate);
   const queuedDuration = hasValidQueuedDttm
@@ -84,12 +94,14 @@ const Row = ({
   return (
     <div>
       <Box
-        py="4px"
         borderBottomWidth={1}
         borderBottomColor={!!task.children && isOpen ? "gray.400" : "gray.200"}
         bg={isSelected ? "blue.100" : "inherit"}
+        position="relative"
+        width={ganttWidth}
+        height={`${boxSize + 9}px`}
       >
-        {instance ? (
+        {instance && (
           <Tooltip
             label={<GanttTooltip task={task} instance={instance} />}
             hasArrow
@@ -99,9 +111,11 @@ const Row = ({
           >
             <Flex
               width={`${width + queuedWidth}px`}
+              position="absolute"
               cursor="pointer"
               pointerEvents="auto"
-              marginLeft={`${offsetMargin}px`}
+              top="4px"
+              left={`${offsetMargin}px`}
               onClick={() => {
                 onSelect({
                   runId: instance.runId,
@@ -129,9 +143,24 @@ const Row = ({
               />
             </Flex>
           </Tooltip>
-        ) : (
-          <Box height="10px" />
         )}
+        {/* Only show fails before the most recent task instance */}
+        {(taskFails || [])
+          .filter(
+            (tf) =>
+              tf.startDate !== instance?.startDate &&
+              // @ts-ignore
+              moment(tf.startDate).isAfter(ganttStartDate)
+          )
+          .map((taskFail) => (
+            <TaskFail
+              key={`${taskFail.taskId}-${taskFail.startDate}`}
+              taskFail={taskFail}
+              ganttStartDate={ganttStartDate}
+              ganttWidth={ganttWidth}
+              runDuration={runDuration}
+            />
+          ))}
       </Box>
       {isOpen &&
         !!task.children &&
diff --git a/airflow/www/static/js/dag/details/gantt/TaskFail.tsx b/airflow/www/static/js/dag/details/gantt/TaskFail.tsx
new file mode 100644
index 0000000000..1d217d3bfb
--- /dev/null
+++ b/airflow/www/static/js/dag/details/gantt/TaskFail.tsx
@@ -0,0 +1,91 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import React from "react";
+import { Box, Tooltip, Text } from "@chakra-ui/react";
+
+import { getDuration } from "src/datetime_utils";
+import { SimpleStatus } from "src/dag/StatusBox";
+import { useContainerRef } from "src/context/containerRef";
+import { hoverDelay } from "src/utils";
+import Time from "src/components/Time";
+
+import type { TaskFail as TaskFailType } from "src/api/useTaskFails";
+
+interface Props {
+  taskFail: TaskFailType;
+  runDuration: number;
+  ganttWidth: number;
+  ganttStartDate?: string | null;
+}
+
+const TaskFail = ({
+  taskFail,
+  runDuration,
+  ganttWidth,
+  ganttStartDate,
+}: Props) => {
+  const containerRef = useContainerRef();
+
+  const duration = getDuration(taskFail?.startDate, taskFail?.endDate);
+  const percent = duration / runDuration;
+  const failWidth = ganttWidth * percent;
+
+  const startOffset = getDuration(ganttStartDate, taskFail?.startDate);
+  const offsetLeft = (startOffset / runDuration) * ganttWidth;
+
+  return (
+    <Tooltip
+      label={
+        <Box>
+          <Text mb={2}>Task Fail</Text>
+          {taskFail?.startDate && (
+            <Text>
+              Start: <Time dateTime={taskFail?.startDate} />
+            </Text>
+          )}
+          {taskFail?.endDate && (
+            <Text>
+              End: <Time dateTime={taskFail?.endDate} />
+            </Text>
+          )}
+          <Text mt={2} fontSize="sm">
+            Can only show previous Task Fails, other tries are not yet saved.
+          </Text>
+        </Box>
+      }
+      hasArrow
+      portalProps={{ containerRef }}
+      placement="top"
+      openDelay={hoverDelay}
+      top="4px"
+    >
+      <Box
+        position="absolute"
+        left={`${offsetLeft}px`}
+        cursor="pointer"
+        top="4px"
+      >
+        <SimpleStatus state="failed" width={`${failWidth}px`} />
+      </Box>
+    </Tooltip>
+  );
+};
+
+export default TaskFail;
diff --git a/airflow/www/static/js/dag/details/task/TaskDuration.tsx b/airflow/www/static/js/dag/details/task/TaskDuration.tsx
index 4b7eed9f3d..27cd8b33ec 100644
--- a/airflow/www/static/js/dag/details/task/TaskDuration.tsx
+++ b/airflow/www/static/js/dag/details/task/TaskDuration.tsx
@@ -22,7 +22,7 @@
 import React from "react";
 
 import useSelection from "src/dag/useSelection";
-import { useGridData } from "src/api";
+import { useGridData, useTaskFails } from "src/api";
 import { startCase } from "lodash";
 import { getDuration, formatDateTime, defaultFormat } from "src/datetime_utils";
 import ReactECharts, { ReactEChartsProps } from "src/components/ReactECharts";
@@ -45,6 +45,10 @@ const TaskDuration = () => {
     onSelect,
   } = useSelection();
 
+  const { data: taskFails } = useTaskFails({ taskId: taskId || undefined });
+
+  console.log(taskFails);
+
   const {
     data: { dagRuns, groups, ordering },
   } = useGridData();
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index c69251e71c..6f5a187f61 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -57,6 +57,7 @@
   <meta name="grid_data_url" content="{{ url_for('Airflow.grid_data') }}">
   <meta name="graph_data_url" content="{{ url_for('Airflow.graph_data') }}">
  <meta name="calendar_data_url" content="{{ url_for('Airflow.calendar_data') }}">
+  <meta name="task_fails_url" content="{{ url_for('Airflow.task_fails') }}">
  <meta name="next_run_datasets_url" content="{{ url_for('Airflow.next_run_datasets', dag_id=dag.dag_id) }}">
  <meta name="grid_url" content="{{ url_for('Airflow.grid', dag_id=dag.dag_id) }}">
   <meta name="datasets_url" content="{{ url_for('Airflow.datasets') }}">
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 5cffd28aaa..743947cdd3 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3505,6 +3505,31 @@ class Airflow(AirflowBaseView):
             {"Content-Type": "application/json; charset=utf-8"},
         )
 
+    @expose("/object/task_fails")
+    @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
+    @provide_session
+    def task_fails(self, session):
+        """Return task fails."""
+        dag_id = request.args.get("dag_id")
+        task_id = request.args.get("task_id")
+        run_id = request.args.get("run_id")
+
+        query = select(
+            TaskFail.task_id, TaskFail.run_id, TaskFail.map_index, TaskFail.start_date, TaskFail.end_date
+        ).where(TaskFail.dag_id == dag_id)
+
+        if run_id:
+            query = query.where(TaskFail.run_id == run_id)
+        if task_id:
+            query = query.where(TaskFail.task_id == task_id)
+
+        task_fails = [dict(tf) for tf in session.execute(query).all()]
+
+        return (
+            htmlsafe_json_dumps(task_fails, separators=(",", ":"), dumps=flask.json.dumps),
+            {"Content-Type": "application/json; charset=utf-8"},
+        )
+
     @expose("/object/task_instances")
     @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
     def task_instances(self):

Reply via email to