This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 2560dd50c412fae79c5e062372209683359d03ee Author: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com> AuthorDate: Thu May 26 13:53:22 2022 -0600 Fix and speed up grid view (#23947) This fetches all TIs for a given task across dag runs, leading to significantly faster response times. It also fixes a bug where Nones were being passed to the UI when a new task was added to a DAG with existing runs. (cherry picked from commit 1cf483fa0c45e0110d99e37b4e45c72c6084aa97) --- airflow/www/utils.py | 57 ++++++++++++++++++++++++++-------------------------- airflow/www/views.py | 2 +- 2 files changed, 29 insertions(+), 30 deletions(-) diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 2bdc2939bf..0a7de05abf 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -128,39 +128,38 @@ def get_mapped_summary(parent_instance, task_instances): } -def get_task_summary(dag_run: DagRun, task, session: Session) -> Optional[Dict[str, Any]]: - task_instance = ( - session.query(TaskInstance) - .filter( - TaskInstance.dag_id == task.dag_id, - TaskInstance.run_id == dag_run.run_id, - TaskInstance.task_id == task.task_id, - # Only get normal task instances or the first mapped task - TaskInstance.map_index <= 0, - ) - .first() +def get_task_summaries(task, dag_runs: List[DagRun], session: Session) -> List[Dict[str, Any]]: + tis = session.query(TaskInstance).filter( + TaskInstance.dag_id == task.dag_id, + TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]), + TaskInstance.task_id == task.task_id, + # Only get normal task instances or the first mapped task + TaskInstance.map_index <= 0, ) - if not task_instance: - return None + def _get_summary(task_instance): + if task_instance.map_index > -1: + return get_mapped_summary( + task_instance, task_instances=get_mapped_instances(task_instance, session) + ) - if task_instance.map_index > -1: - return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session)) + 
try_count = ( + task_instance.prev_attempted_tries + if task_instance.prev_attempted_tries != 0 + else task_instance.try_number + ) - try_count = ( - task_instance.prev_attempted_tries - if task_instance.prev_attempted_tries != 0 - else task_instance.try_number - ) - return { - 'task_id': task_instance.task_id, - 'run_id': task_instance.run_id, - 'map_index': task_instance.map_index, - 'state': task_instance.state, - 'start_date': datetime_to_string(task_instance.start_date), - 'end_date': datetime_to_string(task_instance.end_date), - 'try_number': try_count, - } + return { + 'task_id': task_instance.task_id, + 'run_id': task_instance.run_id, + 'map_index': task_instance.map_index, + 'state': task_instance.state, + 'start_date': datetime_to_string(task_instance.start_date), + 'end_date': datetime_to_string(task_instance.end_date), + 'try_number': try_count, + } + + return [_get_summary(ti) for ti in tis] def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]: diff --git a/airflow/www/views.py b/airflow/www/views.py index a6030e0ab1..79927a3024 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -258,7 +258,7 @@ def task_group_to_grid(task_item_or_group, dag, dag_runs, session): if isinstance(task_item_or_group, AbstractOperator): return { 'id': task_item_or_group.task_id, - 'instances': [wwwutils.get_task_summary(dr, task_item_or_group, session) for dr in dag_runs], + 'instances': wwwutils.get_task_summaries(task_item_or_group, dag_runs, session), 'label': task_item_or_group.label, 'extra_links': task_item_or_group.extra_links, 'is_mapped': task_item_or_group.is_mapped,