This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2560dd50c412fae79c5e062372209683359d03ee
Author: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com>
AuthorDate: Thu May 26 13:53:22 2022 -0600

    Fix and speed up grid view (#23947)
    
    This fetches all TIs for a given task across dag runs, leading to
    significantly faster response times. It also fixes a bug where Nones
    were being passed to the UI when a new task was added to a DAG with
    existing runs.
    
    (cherry picked from commit 1cf483fa0c45e0110d99e37b4e45c72c6084aa97)
---
 airflow/www/utils.py | 57 ++++++++++++++++++++++++++--------------------------
 airflow/www/views.py |  2 +-
 2 files changed, 29 insertions(+), 30 deletions(-)

diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 2bdc2939bf..0a7de05abf 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -128,39 +128,38 @@ def get_mapped_summary(parent_instance, task_instances):
     }
 
 
-def get_task_summary(dag_run: DagRun, task, session: Session) -> Optional[Dict[str, Any]]:
-    task_instance = (
-        session.query(TaskInstance)
-        .filter(
-            TaskInstance.dag_id == task.dag_id,
-            TaskInstance.run_id == dag_run.run_id,
-            TaskInstance.task_id == task.task_id,
-            # Only get normal task instances or the first mapped task
-            TaskInstance.map_index <= 0,
-        )
-        .first()
+def get_task_summaries(task, dag_runs: List[DagRun], session: Session) -> List[Dict[str, Any]]:
+    tis = session.query(TaskInstance).filter(
+        TaskInstance.dag_id == task.dag_id,
+        TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
+        TaskInstance.task_id == task.task_id,
+        # Only get normal task instances or the first mapped task
+        TaskInstance.map_index <= 0,
     )
 
-    if not task_instance:
-        return None
+    def _get_summary(task_instance):
+        if task_instance.map_index > -1:
+            return get_mapped_summary(
+                task_instance, task_instances=get_mapped_instances(task_instance, session)
+            )
 
-    if task_instance.map_index > -1:
-        return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session))
+        try_count = (
+            task_instance.prev_attempted_tries
+            if task_instance.prev_attempted_tries != 0
+            else task_instance.try_number
+        )
 
-    try_count = (
-        task_instance.prev_attempted_tries
-        if task_instance.prev_attempted_tries != 0
-        else task_instance.try_number
-    )
-    return {
-        'task_id': task_instance.task_id,
-        'run_id': task_instance.run_id,
-        'map_index': task_instance.map_index,
-        'state': task_instance.state,
-        'start_date': datetime_to_string(task_instance.start_date),
-        'end_date': datetime_to_string(task_instance.end_date),
-        'try_number': try_count,
-    }
+        return {
+            'task_id': task_instance.task_id,
+            'run_id': task_instance.run_id,
+            'map_index': task_instance.map_index,
+            'state': task_instance.state,
+            'start_date': datetime_to_string(task_instance.start_date),
+            'end_date': datetime_to_string(task_instance.end_date),
+            'try_number': try_count,
+        }
+
+    return [_get_summary(ti) for ti in tis]
 
 
 def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index a6030e0ab1..79927a3024 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -258,7 +258,7 @@ def task_group_to_grid(task_item_or_group, dag, dag_runs, session):
     if isinstance(task_item_or_group, AbstractOperator):
         return {
             'id': task_item_or_group.task_id,
-            'instances': [wwwutils.get_task_summary(dr, task_item_or_group, session) for dr in dag_runs],
+            'instances': wwwutils.get_task_summaries(task_item_or_group, dag_runs, session),
             'label': task_item_or_group.label,
             'extra_links': task_item_or_group.extra_links,
             'is_mapped': task_item_or_group.is_mapped,

Reply via email to