This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-4-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit a6bc5d0eb12c70e2552be030fd3638ef4aab7016 Author: Tzu-ping Chung <[email protected]> AuthorDate: Mon Sep 26 20:59:08 2022 +0800 Use COALESCE when ordering runs to handle NULL (#26626) Data interval columns are NULL for runs created before 2.3, but SQL's NULL-sorting logic would make those old runs always appear first. In a perfect world we'd want to sort by get_run_data_interval(), but that's not efficient, so instead the columns are coalesced into logical date, which is good enough in most cases. (cherry picked from commit 22d52c00f6397fde8d97cf2479c0614671f5b5ba) --- airflow/www/utils.py | 44 ++++++++++++++++++++++++++++++++++++++------ airflow/www/views.py | 5 +---- 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 3429d6a140..0aaaf2b26e 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -20,10 +20,9 @@ from __future__ import annotations import json import textwrap import time -from typing import Any +from typing import TYPE_CHECKING, Any, Sequence from urllib.parse import urlencode -import sqlalchemy as sqla from flask import request, url_for from flask.helpers import flash from flask_appbuilder.forms import FieldConverter @@ -37,11 +36,12 @@ from markupsafe import Markup from pendulum.datetime import DateTime from pygments import highlight, lexers from pygments.formatters import HtmlFormatter +from sqlalchemy import func, types from sqlalchemy.ext.associationproxy import AssociationProxy -from airflow import models from airflow.exceptions import RemovedInAirflow3Warning from airflow.models import errors +from airflow.models.dagrun import DagRun from airflow.models.dagwarning import DagWarning from airflow.models.taskinstance import TaskInstance from airflow.utils import timezone @@ -51,6 +51,10 @@ from airflow.utils.state import State, TaskInstanceState from airflow.www.forms import DateTimeWithTimezoneField from airflow.www.widgets import AirflowDateTimePickerWidget +if 
TYPE_CHECKING: + from sqlalchemy.orm.query import Query + from sqlalchemy.sql.operators import ColumnOperators + def datetime_to_string(value: DateTime | None) -> str | None: if value is None: @@ -129,7 +133,7 @@ def get_mapped_summary(parent_instance, task_instances): } -def encode_dag_run(dag_run: models.DagRun | None) -> dict[str, Any] | None: +def encode_dag_run(dag_run: DagRun | None) -> dict[str, Any] | None: if not dag_run: return None @@ -436,6 +440,34 @@ def dag_run_link(attr): return Markup('<a href="{url}">{run_id}</a>').format(url=url, run_id=run_id) +def _get_run_ordering_expr(name: str) -> ColumnOperators: + expr = DagRun.__table__.columns[name] + # Data interval columns are NULL for runs created before 2.3, but SQL's + # NULL-sorting logic would make those old runs always appear first. In a + # perfect world we'd want to sort by ``get_run_data_interval()``, but that's + # not efficient, so instead the columns are coalesced into execution_date, + # which is good enough in most cases. + if name in ("data_interval_start", "data_interval_end"): + expr = func.coalesce(expr, DagRun.execution_date) + return expr.desc() + + +def sorted_dag_runs(query: Query, *, ordering: Sequence[str], limit: int) -> Sequence[DagRun]: + """Produce DAG runs sorted by specified columns. + + :param query: An ORM query object against *DagRun*. + :param ordering: Column names to sort the runs. Should generally come from a + timetable's ``run_ordering``. + :param limit: Number of runs to limit to. + :return: A list of DagRun objects ordered by the specified columns. The list + contains only the *last* objects, but in *ascending* order. 
+ """ + ordering_exprs = (_get_run_ordering_expr(name) for name in ordering) + runs = query.order_by(*ordering_exprs, DagRun.id.desc()).limit(limit).all() + runs.reverse() + return runs + + def format_map_index(attr: dict) -> str: """Format map index for list columns in model view.""" value = attr['map_index'] @@ -651,7 +683,7 @@ class CustomSQLAInterface(SQLAInterface): obj = self.list_columns[col_name].type return ( isinstance(obj, UtcDateTime) - or isinstance(obj, sqla.types.TypeDecorator) + or isinstance(obj, types.TypeDecorator) and isinstance(obj.impl, UtcDateTime) ) return False @@ -664,7 +696,7 @@ class CustomSQLAInterface(SQLAInterface): obj = self.list_columns[col_name].type return ( isinstance(obj, ExtendedJSON) - or isinstance(obj, sqla.types.TypeDecorator) + or isinstance(obj, types.TypeDecorator) and isinstance(obj.impl, ExtendedJSON) ) return False diff --git a/airflow/www/views.py b/airflow/www/views.py index cc85e23b11..b1d3c1209b 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -3454,10 +3454,7 @@ class Airflow(AirflowBaseView): if run_state: query = query.filter(DagRun.state == run_state) - ordering = (DagRun.__table__.columns[name].desc() for name in dag.timetable.run_ordering) - dag_runs = query.order_by(*ordering, DagRun.id.desc()).limit(num_runs).all() - dag_runs.reverse() - + dag_runs = wwwutils.sorted_dag_runs(query, ordering=dag.timetable.run_ordering, limit=num_runs) encoded_runs = [wwwutils.encode_dag_run(dr) for dr in dag_runs] data = { 'groups': dag_to_grid(dag, dag_runs, session),
