This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch show-mapped-task-in-tree-view
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c646171b7bcbfde1e45dfdc88087b0e9113df2f2
Author: Brent Bovenzi <[email protected]>
AuthorDate: Tue Feb 22 12:17:29 2022 -0500

    add graph tooltip and map count
---
 airflow/models/dagrun.py                |  48 ++++++------
 airflow/www/static/js/graph.js          |  42 ++++++++--
 airflow/www/static/js/task_instances.js |  32 ++++++++
 airflow/www/utils.py                    | 135 +++++++++++++++++---------------
 airflow/www/views.py                    |  12 ++-
 5 files changed, 176 insertions(+), 93 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 69b003f..5170ad3 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -43,6 +43,7 @@ from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.expression import false, select, true
 
 from airflow import settings
+from airflow.callbacks.callback_requests import DagCallbackRequest
 from airflow.configuration import conf as airflow_conf
 from airflow.exceptions import AirflowException, TaskNotFound
 from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
@@ -52,7 +53,7 @@ from airflow.models.tasklog import LogTemplate
 from airflow.stats import Stats
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES
-from airflow.utils import callback_requests, timezone
+from airflow.utils import timezone
 from airflow.utils.helpers import is_container
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -487,7 +488,7 @@ class DagRun(Base, LoggingMixin):
     @provide_session
     def update_state(
         self, session: Session = NEW_SESSION, execute_callbacks: bool = True
-    ) -> Tuple[List[TI], Optional[callback_requests.DagCallbackRequest]]:
+    ) -> Tuple[List[TI], Optional[DagCallbackRequest]]:
         """
         Determines the overall state of the DagRun based on the state
         of its TaskInstances.
@@ -499,7 +500,7 @@ class DagRun(Base, LoggingMixin):
             needs to be executed
         """
         # Callback to execute in case of Task Failures
-        callback: Optional[callback_requests.DagCallbackRequest] = None
+        callback: Optional[DagCallbackRequest] = None
 
         start_dttm = timezone.utcnow()
         self.last_scheduling_decision = start_dttm
@@ -535,7 +536,7 @@ class DagRun(Base, LoggingMixin):
             if execute_callbacks:
                 dag.handle_callback(self, success=False, reason='task_failure', session=session)
             elif dag.has_on_failure_callback:
-                callback = callback_requests.DagCallbackRequest(
+                callback = DagCallbackRequest(
                     full_filepath=dag.fileloc,
                     dag_id=self.dag_id,
                     run_id=self.run_id,
@@ -550,7 +551,7 @@ class DagRun(Base, LoggingMixin):
             if execute_callbacks:
                 dag.handle_callback(self, success=True, reason='success', session=session)
             elif dag.has_on_success_callback:
-                callback = callback_requests.DagCallbackRequest(
+                callback = DagCallbackRequest(
                     full_filepath=dag.fileloc,
                     dag_id=self.dag_id,
                     run_id=self.run_id,
@@ -571,7 +572,7 @@ class DagRun(Base, LoggingMixin):
             if execute_callbacks:
                 dag.handle_callback(self, success=False, reason='all_tasks_deadlocked', session=session)
             elif dag.has_on_failure_callback:
-                callback = callback_requests.DagCallbackRequest(
+                callback = DagCallbackRequest(
                     full_filepath=dag.fileloc,
                     dag_id=self.dag_id,
                     run_id=self.run_id,
@@ -651,7 +652,7 @@ class DagRun(Base, LoggingMixin):
 
     def _get_ready_tis(
         self,
-        scheduleable_tis: List[TI],
+        schedulable_tis: List[TI],
         finished_tis: List[TI],
         session: Session,
     ) -> Tuple[List[TI], bool]:
@@ -659,41 +660,40 @@ class DagRun(Base, LoggingMixin):
         ready_tis: List[TI] = []
         changed_tis = False
 
-        if not scheduleable_tis:
+        if not schedulable_tis:
             return ready_tis, changed_tis
 
         # If we expand TIs, we need a new list so that we iterate over them too. (We can't alter
-        # `scheduleable_tis` in place and have the `for` loop pick them up
+        # `schedulable_tis` in place and have the `for` loop pick them up
         expanded_tis: List[TI] = []
 
         # Check dependencies
-        for st in itertools.chain(scheduleable_tis, expanded_tis):
+        for schedulable in itertools.chain(schedulable_tis, expanded_tis):
 
             # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
             # for any reason it wasn't, we need to expand it now
-            if st.map_index < 0 and st.task.is_mapped:
+            if schedulable.map_index < 0 and schedulable.task.is_mapped:
                 # HACK. This needs a better way, one that copes with multiple upstreams!
                 for ti in finished_tis:
-                    if st.task_id in ti.task.downstream_task_ids:
-                        upstream = ti
-
-                        assert isinstance(st.task, MappedOperator)
-                        new_tis = st.task.expand_mapped_task(upstream, session=session)
-                        assert new_tis[0] is st
-                        # Add the new TIs to the list to be checked
-                        for new_ti in new_tis[1:]:
-                            new_ti.task = st.task
+                    if schedulable.task_id in ti.task.downstream_task_ids:
+
+                        assert isinstance(schedulable.task, MappedOperator)
+                        new_tis = schedulable.task.expand_mapped_task(self.run_id, session=session)
+                        if schedulable.state == TaskInstanceState.SKIPPED:
+                            # Task is now skipped (likely because upstream returned 0 tasks)
+                            continue
+                        assert new_tis[0] is schedulable
                         expanded_tis.extend(new_tis[1:])
                         break
 
-            old_state = st.state
-            if st.are_dependencies_met(
+            old_state = schedulable.state
+            if schedulable.are_dependencies_met(
                 dep_context=DepContext(flag_upstream_failed=True, finished_tis=finished_tis),
                 session=session,
             ):
-                ready_tis.append(st)
+                ready_tis.append(schedulable)
             else:
-                old_states[st.key] = old_state
+                old_states[schedulable.key] = old_state
 
         # Check if any ti changed state
         tis_filter = TI.filter_for_tis(old_states.keys())
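The expansion loop above iterates itertools.chain(schedulable_tis, expanded_tis) while appending newly expanded task instances to expanded_tis; that works because chain() consumes the second list lazily, so instances added mid-loop are still visited in the same pass. A minimal standalone sketch of that pattern, with illustrative names rather than real Airflow objects:

    # Standalone sketch of the lazy-chain pattern used in _get_ready_tis;
    # names are illustrative, not real Airflow objects.
    import itertools

    schedulable = ["t1", "t2"]   # stand-in for schedulable_tis
    expanded = []                # stand-in for expanded_tis

    for item in itertools.chain(schedulable, expanded):
        print("checking dependencies for", item)
        if item == "t2":
            # Simulate mapped-task expansion adding new instances mid-loop;
            # chain() has not consumed `expanded` yet, so these are visited too.
            expanded.extend(["t2[0]", "t2[1]"])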
diff --git a/airflow/www/static/js/graph.js b/airflow/www/static/js/graph.js
index 254db5a..258004a 100644
--- a/airflow/www/static/js/graph.js
+++ b/airflow/www/static/js/graph.js
@@ -82,6 +82,33 @@ const render = dagreD3.render();
 const svg = d3.select('#graph-svg');
 let innerSvg = d3.select('#graph-svg g');
 
+const updateNodes = (node, instances) => {
+  const value = {
+    ...node.value,
+    label: tasks[node.id] && tasks[node.id].is_mapped
+      ? `${node.value.label} [${(instances[node.id].mapped_states && instances[node.id].mapped_states.length) || ' '}]`
+      : node.value.label,
+  };
+
+  if (g.node(node.id)) {
+    g.node(node.id).label = value.label;
+  }
+
+  if (node.children) {
+    return {
+      ...node,
+      value,
+      children: node.children.map((n) => updateNodes(n, instances)),
+    };
+  }
+  return {
+    ...node,
+    value,
+  };
+};
+
+let updatedNodes = updateNodes(nodes, taskInstances);
+
 // Remove the node with this nodeId from g.
 function removeNode(nodeId) {
   if (g.hasNode(nodeId)) {
@@ -377,6 +404,9 @@ function handleRefresh() {
           if (isFinal) {
             $('#auto_refresh').prop('checked', false);
             clearInterval(refreshInterval);
+            if (JSON.stringify(nodes) !== JSON.stringify(updatedNodes)) {
+              draw();
+            }
           }
         }
         prevTis = tis;
@@ -476,15 +506,17 @@ function groupTooltip(node, tis) {
 // Assigning css classes based on state to nodes
 // Initiating the tooltips
 function updateNodesStates(tis) {
+  updatedNodes = updateNodes(nodes, tis);
   g.nodes().forEach((nodeId) => {
-    const { elem } = g.node(nodeId);
+    const node = g.node(nodeId);
+    const { elem } = node;
+    const taskId = nodeId;
+
     if (elem) {
       const classes = `node enter ${getNodeState(nodeId, tis)}`;
       elem.setAttribute('class', classes);
       elem.setAttribute('data-toggle', 'tooltip');
 
-      const taskId = nodeId;
-      const node = g.node(nodeId);
       elem.onmouseover = (evt) => {
         let tt;
         if (taskId in tis) {
@@ -713,10 +745,10 @@ const focusNodeId = localStorage.getItem(focusedGroupKey(dagId));
 const expandedGroups = getSavedGroups(dagId);
 
 // Always expand the root node
-expandGroup(null, nodes);
+expandGroup(null, updatedNodes);
 
 // Expand the nodes that were previously expanded
-expandSavedGroups(expandedGroups, nodes);
+expandSavedGroups(expandedGroups, updatedNodes);
 
 // Draw once after all groups have been expanded
 draw();
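updateNodes above walks the nested node tree once per refresh: for a mapped task it rewrites the label to append the number of mapped instances (e.g. "extract [3]"), mirrors the new label into the dagre graph, and recurses into any children so task groups get updated too. A rough Python analog of that recursion, using simplified stand-in structures rather than the real graph payload:

    # Rough Python analog of updateNodes() in graph.js; the node/instance
    # dict shapes here are simplified stand-ins, not the real payload.
    def update_labels(node, instances, tasks):
        label = node["value"]["label"]
        task = tasks.get(node["id"], {})
        if task.get("is_mapped"):
            mapped = (instances.get(node["id"]) or {}).get("mapped_states") or []
            label = f"{label} [{len(mapped) or ' '}]"
        updated = {**node, "value": {**node["value"], "label": label}}
        if node.get("children"):
            updated["children"] = [
                update_labels(child, instances, tasks) for child in node["children"]
            ]
        return updated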
diff --git a/airflow/www/static/js/task_instances.js b/airflow/www/static/js/task_instances.js
index 13f3a9d..e123bd5 100644
--- a/airflow/www/static/js/task_instances.js
+++ b/airflow/www/static/js/task_instances.js
@@ -69,6 +69,34 @@ export default function tiTooltip(ti, { includeTryNumber = false } = {}) {
   if (ti.state !== undefined) {
     tt += `<strong>Status:</strong> ${escapeHtml(ti.state)}<br><br>`;
   }
+  if (ti.mapped_states) {
+    const STATES = [
+      ['success', 0],
+      ['failed', 0],
+      ['upstream_failed', 0],
+      ['up_for_retry', 0],
+      ['up_for_reschedule', 0],
+      ['running', 0],
+      ['deferred', 0],
+      ['sensing', 0],
+      ['queued', 0],
+      ['scheduled', 0],
+      ['skipped', 0],
+      ['no_status', 0],
+    ];
+    const numMap = new Map(STATES);
+    ti.mapped_states.forEach((s) => {
+      const stateKey = s || 'no_status';
+      if (numMap.has(stateKey)) numMap.set(stateKey, numMap.get(stateKey) + 1);
+    });
+    tt += `<strong>${escapeHtml(ti.mapped_states.length)} Tasks Mapped</strong><br />`;
+    numMap.forEach((count, state) => {
+      if (count > 0) {
+        tt += `<span style="margin-left: 15px">${escapeHtml(state)}: ${escapeHtml(count)}</span><br />`;
+      }
+    });
+    tt += '<br />';
+  }
   if (ti.task_id !== undefined) {
     tt += `Task_id: ${escapeHtml(ti.task_id)}<br>`;
   }
@@ -76,6 +104,10 @@ export default function tiTooltip(ti, { includeTryNumber = false } = {}) {
   if (ti.run_id !== undefined) {
     tt += `Run Id: <nobr>${escapeHtml(ti.run_id)}</nobr><br>`;
   }
+  // Show the map index for a specific child instance, but not for a summary instance
+  if (ti.map_index >= 0 && !ti.mapped_states) {
+    tt += `Map Index: ${escapeHtml(ti.map_index)}<br>`;
+  }
   if (ti.operator !== undefined) {
     tt += `Operator: ${escapeHtml(ti.operator)}<br>`;
   }
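The tooltip block above tallies how many mapped instances are in each state and prints only the non-zero buckets under an "N Tasks Mapped" heading. The same counting logic, sketched in Python with collections.Counter (example input only):

    # Python sketch of the per-state tally rendered by the tooltip;
    # the mapped_states list below is example data.
    from collections import Counter

    mapped_states = ["success", "success", "running", None]
    counts = Counter(state or "no_status" for state in mapped_states)

    lines = [f"{len(mapped_states)} Tasks Mapped"]
    lines += [f"  {state}: {count}" for state, count in counts.items() if count > 0]
    print("\n".join(lines))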
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index ed765d4..ef08f8e 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -43,6 +43,7 @@ from airflow.models import errors
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
 from airflow.utils.code_utils import get_python_source
+from airflow.utils.helpers import alchemy_to_dict
 from airflow.utils.json import AirflowJsonEncoder
 from airflow.utils.state import State
 from airflow.www.forms import DateTimeWithTimezoneField
@@ -55,13 +56,83 @@ def datetime_to_string(value: Optional[DateTime]) -> Optional[str]:
     return value.isoformat()
 
 
+def get_mapped_instances(task_instance, session):
+    return (
+        session.query(TaskInstance)
+        .filter(
+            TaskInstance.dag_id == task_instance.dag_id,
+            TaskInstance.run_id == task_instance.run_id,
+            TaskInstance.task_id == task_instance.task_id,
+            TaskInstance.map_index >= 0,
+        )
+        .all()
+    )
+
+
+def get_instance_with_map(task_instance, session):
+    if task_instance.map_index == -1:
+        return alchemy_to_dict(task_instance)
+    mapped_instances = get_mapped_instances(task_instance, session)
+    return get_mapped_summary(task_instance, mapped_instances)
+
+
+def get_mapped_summary(parent_instance, task_instances):
+    priority = [
+        'failed',
+        'upstream_failed',
+        'up_for_retry',
+        'up_for_reschedule',
+        'queued',
+        'scheduled',
+        'deferred',
+        'sensing',
+        'running',
+        'shutdown',
+        'restarting',
+        'removed',
+        'no_status',
+        'success',
+        'skipped',
+    ]
+
+    mapped_states = [ti.state for ti in task_instances]
+
+    group_state = None
+    for state in priority:
+        if state in mapped_states:
+            group_state = state
+            break
+
+    group_start_date = datetime_to_string(
+        min((ti.start_date for ti in task_instances if ti.start_date), default=None)
+    )
+    group_end_date = datetime_to_string(
+        max((ti.end_date for ti in task_instances if ti.end_date), default=None)
+    )
+
+    return {
+        'task_id': parent_instance.task_id,
+        'run_id': parent_instance.run_id,
+        'state': group_state,
+        'start_date': group_start_date,
+        'end_date': group_end_date,
+        'mapped_states': mapped_states,
+        'operator': parent_instance.operator,
+        'execution_date': datetime_to_string(parent_instance.execution_date),
+        'try_number': parent_instance.try_number,
+    }
+
+
 def encode_ti(
-    task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Session
+    task_instance: Optional[TaskInstance], is_mapped: Optional[bool], session: Optional[Session]
 ) -> Optional[Dict[str, Any]]:
     if not task_instance:
         return None
 
-    summary = {
+    if is_mapped:
+        return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session))
+
+    return {
         'task_id': task_instance.task_id,
         'dag_id': task_instance.dag_id,
         'run_id': task_instance.run_id,
@@ -74,66 +145,6 @@ def encode_ti(
         'try_number': task_instance.try_number,
     }
 
-    def get_mapped_summary(task_instances):
-        priority = [
-            'failed',
-            'upstream_failed',
-            'up_for_retry',
-            'up_for_reschedule',
-            'queued',
-            'scheduled',
-            'deferred',
-            'sensing',
-            'running',
-            'shutdown',
-            'restarting',
-            'removed',
-            'no_status',
-            'success',
-            'skipped',
-        ]
-
-        mapped_states = [ti.state for ti in task_instances]
-
-        group_state = None
-        for state in priority:
-            if state in mapped_states:
-                group_state = state
-                break
-
-        group_start_date = datetime_to_string(
-            min((ti.start_date for ti in task_instances if ti.start_date), default=None)
-        )
-        group_end_date = datetime_to_string(
-            max((ti.end_date for ti in task_instances if ti.end_date), default=None)
-        )
-
-        return {
-            'task_id': task_instance.task_id,
-            'run_id': task_instance.run_id,
-            'state': group_state,
-            'start_date': group_start_date,
-            'end_date': group_end_date,
-            'mapped_states': mapped_states,
-            'operator': task_instance.operator,
-            'execution_date': datetime_to_string(task_instance.execution_date),
-            'try_number': task_instance.try_number,
-        }
-
-    if is_mapped:
-        return get_mapped_summary(
-            session.query(TaskInstance)
-            .filter(
-                TaskInstance.dag_id == task_instance.dag_id,
-                TaskInstance.run_id == task_instance.run_id,
-                TaskInstance.task_id == task_instance.task_id,
-                TaskInstance.map_index >= 0,
-            )
-            .all()
-        )
-
-    return summary
-
 
 def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]:
     if not dag_run:
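get_mapped_summary above collapses every mapped instance of a task into one row: the group state is the first state from the priority list that any instance is in, and the start/end dates span the earliest start to the latest finish. A hedged usage sketch with stub objects that carry only the attributes the function reads (import path as added in this file):

    # Usage sketch for get_mapped_summary with stub instances; only the
    # attributes the function reads are provided.
    from datetime import datetime
    from types import SimpleNamespace

    from airflow.www.utils import get_mapped_summary

    parent = SimpleNamespace(
        task_id="extract", run_id="manual__2022-02-22", operator="PythonOperator",
        execution_date=datetime(2022, 2, 22), try_number=1,
    )
    children = [
        SimpleNamespace(state="success",
                        start_date=datetime(2022, 2, 22, 12, 0),
                        end_date=datetime(2022, 2, 22, 12, 5)),
        SimpleNamespace(state="failed",
                        start_date=datetime(2022, 2, 22, 12, 1),
                        end_date=datetime(2022, 2, 22, 12, 2)),
    ]

    summary = get_mapped_summary(parent, children)
    assert summary["state"] == "failed"          # 'failed' outranks 'success'
    assert summary["mapped_states"] == ["success", "failed"]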
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 63c9fb1..a61727f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2648,12 +2648,16 @@ class Airflow(AirflowBaseView):
         form = GraphForm(data=dt_nr_dr_data)
         form.execution_date.choices = dt_nr_dr_data['dr_choices']
 
-        task_instances = {ti.task_id: alchemy_to_dict(ti) for ti in dag.get_task_instances(dttm, dttm)}
+        task_instances = {
+            ti.task_id: wwwutils.get_instance_with_map(ti, session)
+            for ti in dag.get_task_instances(dttm, dttm)
+        }
         tasks = {
             t.task_id: {
                 'dag_id': t.dag_id,
                 'task_type': t.task_type,
                 'extra_links': t.extra_links,
+                'is_mapped': t.is_mapped,
             }
             for t in dag.tasks
         }
@@ -3244,7 +3248,11 @@ class Airflow(AirflowBaseView):
         else:
             return "Error: Invalid execution_date"
 
-        task_instances = {ti.task_id: alchemy_to_dict(ti) for ti in dag.get_task_instances(dttm, dttm)}
+        with create_session() as session:
+            task_instances = {
+                ti.task_id: wwwutils.get_instance_with_map(ti, session)
+                for ti in dag.get_task_instances(dttm, dttm)
+            }
 
         return json.dumps(task_instances, cls=utils_json.AirflowJsonEncoder)
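With these changes, the per-task entries the graph view serializes differ by task type: an unmapped task still gets the plain TaskInstance row, while a mapped task gets the summary dict built by get_mapped_summary, including the full mapped_states list that drives the new tooltip and label count. An illustrative (hand-written, not captured) entry for one mapped task:

    # Illustrative shape of a single mapped-task entry in the serialized
    # task_instances payload; all values here are made up.
    task_instances = {
        "extract": {
            "task_id": "extract",
            "run_id": "manual__2022-02-22",
            "state": "running",
            "start_date": "2022-02-22T12:00:00+00:00",
            "end_date": None,
            "mapped_states": ["success", "running", None],
            "operator": "PythonOperator",
            "execution_date": "2022-02-22T00:00:00+00:00",
            "try_number": 1,
        },
    }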
 
