This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit b369085409ecf8d6c9d72046fcf92a6b495735f7 Author: Chris Redekop <32752154+repl-ch...@users.noreply.github.com> AuthorDate: Mon May 9 09:49:53 2022 -0600 Fix broken dagrun links when many runs start at the same time (#23462) * Load requested dagrun even when there are many dagruns at (almost) the same time * Fix code formatting issues (cherry picked from commit 828016747ac06f6fb2564c07bb8be92246f42567) --- airflow/www/views.py | 12 +++- tests/www/views/test_views_graph_gantt.py | 96 ++++++++++++++++++++++++++++++- 2 files changed, 105 insertions(+), 3 deletions(-) diff --git a/airflow/www/views.py b/airflow/www/views.py index fe4ae3ceb1..1ab49313fd 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -201,9 +201,19 @@ def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag): default_dag_run = conf.getint('webserver', 'default_dag_run_display_number') num_runs = www_request.args.get('num_runs', default=default_dag_run, type=int) + # When base_date has been rounded up because of the DateTimeField widget, we want + # to use the execution_date as the starting point for our query just to ensure a + # link targeting a specific dag run actually loads that dag run. If there are + # more than num_runs dag runs in the "rounded period" then those dagruns would get + # loaded and the actual requested run would be excluded by the limit(). Once + # the user has changed base date to be anything else we want to use that instead. + query_date = base_date + if date_time < base_date and date_time + timedelta(seconds=1) >= base_date: + query_date = date_time + drs = ( session.query(DagRun) - .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= base_date) + .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= query_date) .order_by(desc(DagRun.execution_date)) .limit(num_runs) .all() diff --git a/tests/www/views/test_views_graph_gantt.py b/tests/www/views/test_views_graph_gantt.py index 7aedb28603..0d549d9bbd 100644 --- a/tests/www/views/test_views_graph_gantt.py +++ b/tests/www/views/test_views_graph_gantt.py @@ -15,11 +15,15 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from datetime import timedelta +from urllib.parse import quote + import pytest from airflow.configuration import conf from airflow.models import DAG from airflow.utils import timezone +from airflow.utils.session import provide_session from airflow.utils.state import State DAG_ID = "dag_for_testing_dt_nr_dr_form" @@ -30,6 +34,7 @@ RUNS_DATA = [ ("dag_run_for_testing_dt_nr_dr_form_2", timezone.datetime(2018, 2, 2)), ("dag_run_for_testing_dt_nr_dr_form_1", timezone.datetime(2018, 1, 1)), ] +VERY_CLOSE_RUNS_DATE = timezone.datetime(2020, 1, 1, 0, 0, 0) ENDPOINTS = [ "/graph?dag_id=dag_for_testing_dt_nr_dr_form", @@ -44,9 +49,10 @@ def dag(app): return dag +@provide_session @pytest.fixture(scope="module") -def runs(dag): - return [ +def runs(dag, session): + dag_runs = [ dag.create_dagrun( run_id=run_id, execution_date=execution_date, @@ -56,6 +62,9 @@ def runs(dag): ) for run_id, execution_date, in RUNS_DATA ] + yield dag_runs + for dag_run in dag_runs: + session.delete(dag_run) def _assert_run_is_in_dropdown_not_selected(run, data): @@ -68,6 +77,10 @@ def _assert_run_is_selected(run, data): assert f'<option selected value="{exec_date}">{run.run_id}</option>' in data +def _assert_base_date(base_date, data): + assert f'name="base_date" required type="text" value="{base_date.isoformat()}"' in data + + def _assert_base_date_and_num_runs(base_date, num, data): assert f'name="base_date" value="{base_date}"' not in data assert f'<option selected="" value="{num}">{num}</option>' not in data @@ -205,3 +218,82 @@ def test_with_base_date_and_num_runs_and_execution_date_within(admin_client, run _assert_run_is_not_in_dropdown(runs[1], data) _assert_run_is_in_dropdown_not_selected(runs[2], data) _assert_run_is_selected(runs[3], data) + + +@provide_session +@pytest.fixture +def very_close_dagruns(dag, session): + dag_runs = [] + for idx, (run_id, _) in enumerate(RUNS_DATA): + execution_date = VERY_CLOSE_RUNS_DATE.replace(microsecond=idx) + dag_runs.append( + dag.create_dagrun( + run_id=run_id + '_close', + execution_date=execution_date, + data_interval=(execution_date, execution_date), + state=State.SUCCESS, + external_trigger=True, + ) + ) + yield dag_runs + for dag_run in dag_runs: + session.delete(dag_run) + session.commit() + + +@pytest.mark.parametrize("endpoint", ENDPOINTS) +def test_rounds_base_date_but_queries_with_execution_date(admin_client, very_close_dagruns, endpoint): + exec_date = quote(very_close_dagruns[1].execution_date.isoformat()) + response = admin_client.get( + f'{endpoint}&num_runs=2&execution_date={exec_date}', + data={"username": "test", "password": "test"}, + follow_redirects=True, + ) + assert response.status_code == 200 + + data = response.data.decode() + _assert_base_date(VERY_CLOSE_RUNS_DATE + timedelta(seconds=1), data) + _assert_run_is_in_dropdown_not_selected(very_close_dagruns[0], data) + _assert_run_is_selected(very_close_dagruns[1], data) + _assert_run_is_not_in_dropdown(very_close_dagruns[2], data) + _assert_run_is_not_in_dropdown(very_close_dagruns[3], data) + + +@pytest.mark.parametrize("endpoint", ENDPOINTS) +def test_uses_execution_date_on_filter_application_if_base_date_hasnt_changed( + admin_client, very_close_dagruns, endpoint +): + base_date = quote((VERY_CLOSE_RUNS_DATE + timedelta(seconds=1)).isoformat()) + exec_date = quote(very_close_dagruns[1].execution_date.isoformat()) + response = admin_client.get( + f'{endpoint}&base_date={base_date}&num_runs=2&execution_date={exec_date}', + data={"username": "test", "password": "test"}, + follow_redirects=True, + ) + assert response.status_code == 200 + + data = response.data.decode() + _assert_base_date(VERY_CLOSE_RUNS_DATE + timedelta(seconds=1), data) + _assert_run_is_in_dropdown_not_selected(very_close_dagruns[0], data) + _assert_run_is_selected(very_close_dagruns[1], data) + _assert_run_is_not_in_dropdown(very_close_dagruns[2], data) + _assert_run_is_not_in_dropdown(very_close_dagruns[3], data) + + +@pytest.mark.parametrize("endpoint", ENDPOINTS) +def test_uses_base_date_if_changed_away_from_execution_date(admin_client, very_close_dagruns, endpoint): + base_date = quote((VERY_CLOSE_RUNS_DATE + timedelta(seconds=2)).isoformat()) + exec_date = quote(very_close_dagruns[1].execution_date.isoformat()) + response = admin_client.get( + f'{endpoint}&base_date={base_date}&num_runs=2&execution_date={exec_date}', + data={"username": "test", "password": "test"}, + follow_redirects=True, + ) + assert response.status_code == 200 + + data = response.data.decode() + _assert_base_date(VERY_CLOSE_RUNS_DATE + timedelta(seconds=2), data) + _assert_run_is_not_in_dropdown(very_close_dagruns[0], data) + _assert_run_is_not_in_dropdown(very_close_dagruns[1], data) + _assert_run_is_in_dropdown_not_selected(very_close_dagruns[2], data) + _assert_run_is_selected(very_close_dagruns[3], data)