This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b369085409ecf8d6c9d72046fcf92a6b495735f7
Author: Chris Redekop <32752154+repl-ch...@users.noreply.github.com>
AuthorDate: Mon May 9 09:49:53 2022 -0600

    Fix broken dagrun links when many runs start at the same time (#23462)
    
    * Load requested dagrun even when there are many dagruns at (almost) the same time
    
    * Fix code formatting issues
    
    (cherry picked from commit 828016747ac06f6fb2564c07bb8be92246f42567)
---
 airflow/www/views.py                      | 12 +++-
 tests/www/views/test_views_graph_gantt.py | 96 ++++++++++++++++++++++++++++++-
 2 files changed, 105 insertions(+), 3 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index fe4ae3ceb1..1ab49313fd 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -201,9 +201,19 @@ def get_date_time_num_runs_dag_runs_form_data(www_request, 
session, dag):
     default_dag_run = conf.getint('webserver', 
'default_dag_run_display_number')
     num_runs = www_request.args.get('num_runs', default=default_dag_run, 
type=int)
 
+    # When base_date has been rounded up because of the DateTimeField widget, we want
+    # to use the execution_date as the starting point for our query just to ensure a
+    # link targeting a specific dag run actually loads that dag run.  If there are
+    # more than num_runs dag runs in the "rounded period" then those dagruns would get
+    # loaded and the actual requested run would be excluded by the limit().  Once
+    # the user has changed base date to be anything else we want to use that instead.
+    query_date = base_date
+    if date_time < base_date and date_time + timedelta(seconds=1) >= base_date:
+        query_date = date_time
+
     drs = (
         session.query(DagRun)
-        .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= 
base_date)
+        .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= 
query_date)
         .order_by(desc(DagRun.execution_date))
         .limit(num_runs)
         .all()
diff --git a/tests/www/views/test_views_graph_gantt.py 
b/tests/www/views/test_views_graph_gantt.py
index 7aedb28603..0d549d9bbd 100644
--- a/tests/www/views/test_views_graph_gantt.py
+++ b/tests/www/views/test_views_graph_gantt.py
@@ -15,11 +15,15 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from datetime import timedelta
+from urllib.parse import quote
+
 import pytest
 
 from airflow.configuration import conf
 from airflow.models import DAG
 from airflow.utils import timezone
+from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 DAG_ID = "dag_for_testing_dt_nr_dr_form"
@@ -30,6 +34,7 @@ RUNS_DATA = [
     ("dag_run_for_testing_dt_nr_dr_form_2", timezone.datetime(2018, 2, 2)),
     ("dag_run_for_testing_dt_nr_dr_form_1", timezone.datetime(2018, 1, 1)),
 ]
+VERY_CLOSE_RUNS_DATE = timezone.datetime(2020, 1, 1, 0, 0, 0)
 
 ENDPOINTS = [
     "/graph?dag_id=dag_for_testing_dt_nr_dr_form",
@@ -44,9 +49,10 @@ def dag(app):
     return dag
 
 
+@provide_session
 @pytest.fixture(scope="module")
-def runs(dag):
-    return [
+def runs(dag, session):
+    dag_runs = [
         dag.create_dagrun(
             run_id=run_id,
             execution_date=execution_date,
@@ -56,6 +62,9 @@ def runs(dag):
         )
         for run_id, execution_date, in RUNS_DATA
     ]
+    yield dag_runs
+    for dag_run in dag_runs:
+        session.delete(dag_run)
 
 
 def _assert_run_is_in_dropdown_not_selected(run, data):
@@ -68,6 +77,10 @@ def _assert_run_is_selected(run, data):
     assert f'<option selected value="{exec_date}">{run.run_id}</option>' in 
data
 
 
+def _assert_base_date(base_date, data):
+    assert f'name="base_date" required type="text" 
value="{base_date.isoformat()}"' in data
+
+
 def _assert_base_date_and_num_runs(base_date, num, data):
     assert f'name="base_date" value="{base_date}"' not in data
     assert f'<option selected="" value="{num}">{num}</option>' not in data
@@ -205,3 +218,82 @@ def 
test_with_base_date_and_num_runs_and_execution_date_within(admin_client, run
     _assert_run_is_not_in_dropdown(runs[1], data)
     _assert_run_is_in_dropdown_not_selected(runs[2], data)
     _assert_run_is_selected(runs[3], data)
+
+
+@provide_session
+@pytest.fixture
+def very_close_dagruns(dag, session):
+    dag_runs = []
+    for idx, (run_id, _) in enumerate(RUNS_DATA):
+        execution_date = VERY_CLOSE_RUNS_DATE.replace(microsecond=idx)
+        dag_runs.append(
+            dag.create_dagrun(
+                run_id=run_id + '_close',
+                execution_date=execution_date,
+                data_interval=(execution_date, execution_date),
+                state=State.SUCCESS,
+                external_trigger=True,
+            )
+        )
+    yield dag_runs
+    for dag_run in dag_runs:
+        session.delete(dag_run)
+    session.commit()
+
+
+@pytest.mark.parametrize("endpoint", ENDPOINTS)
+def test_rounds_base_date_but_queries_with_execution_date(admin_client, 
very_close_dagruns, endpoint):
+    exec_date = quote(very_close_dagruns[1].execution_date.isoformat())
+    response = admin_client.get(
+        f'{endpoint}&num_runs=2&execution_date={exec_date}',
+        data={"username": "test", "password": "test"},
+        follow_redirects=True,
+    )
+    assert response.status_code == 200
+
+    data = response.data.decode()
+    _assert_base_date(VERY_CLOSE_RUNS_DATE + timedelta(seconds=1), data)
+    _assert_run_is_in_dropdown_not_selected(very_close_dagruns[0], data)
+    _assert_run_is_selected(very_close_dagruns[1], data)
+    _assert_run_is_not_in_dropdown(very_close_dagruns[2], data)
+    _assert_run_is_not_in_dropdown(very_close_dagruns[3], data)
+
+
+@pytest.mark.parametrize("endpoint", ENDPOINTS)
+def test_uses_execution_date_on_filter_application_if_base_date_hasnt_changed(
+    admin_client, very_close_dagruns, endpoint
+):
+    base_date = quote((VERY_CLOSE_RUNS_DATE + 
timedelta(seconds=1)).isoformat())
+    exec_date = quote(very_close_dagruns[1].execution_date.isoformat())
+    response = admin_client.get(
+        
f'{endpoint}&base_date={base_date}&num_runs=2&execution_date={exec_date}',
+        data={"username": "test", "password": "test"},
+        follow_redirects=True,
+    )
+    assert response.status_code == 200
+
+    data = response.data.decode()
+    _assert_base_date(VERY_CLOSE_RUNS_DATE + timedelta(seconds=1), data)
+    _assert_run_is_in_dropdown_not_selected(very_close_dagruns[0], data)
+    _assert_run_is_selected(very_close_dagruns[1], data)
+    _assert_run_is_not_in_dropdown(very_close_dagruns[2], data)
+    _assert_run_is_not_in_dropdown(very_close_dagruns[3], data)
+
+
+@pytest.mark.parametrize("endpoint", ENDPOINTS)
+def test_uses_base_date_if_changed_away_from_execution_date(admin_client, 
very_close_dagruns, endpoint):
+    base_date = quote((VERY_CLOSE_RUNS_DATE + 
timedelta(seconds=2)).isoformat())
+    exec_date = quote(very_close_dagruns[1].execution_date.isoformat())
+    response = admin_client.get(
+        
f'{endpoint}&base_date={base_date}&num_runs=2&execution_date={exec_date}',
+        data={"username": "test", "password": "test"},
+        follow_redirects=True,
+    )
+    assert response.status_code == 200
+
+    data = response.data.decode()
+    _assert_base_date(VERY_CLOSE_RUNS_DATE + timedelta(seconds=2), data)
+    _assert_run_is_not_in_dropdown(very_close_dagruns[0], data)
+    _assert_run_is_not_in_dropdown(very_close_dagruns[1], data)
+    _assert_run_is_in_dropdown_not_selected(very_close_dagruns[2], data)
+    _assert_run_is_selected(very_close_dagruns[3], data)

Reply via email to