doiken opened a new issue, #31407: URL: https://github.com/apache/airflow/issues/31407
### Apache Airflow version

Other Airflow 2 version (please specify below)

### What happened

Occasionally, the scheduler triggers a DagRun whose data interval has not yet ended, i.e. a run that should only be created in the future. As I understand them, the conditions are:

- `max_active_runs` is set and its upper limit is reached
- The preceding DagRun completes very slightly earlier than the following DagRun

Details are in "Anything else".

### What you think should happen instead

The DagRun should wait until its scheduled time.

### How to reproduce

I have reproduced this in Airflow 2.2.2 with the following code. In my environment it took about half a day of running before the problem occurred.

```python
import copy
import logging
import time
from datetime import datetime, timedelta

import pendulum

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.sensors.python import PythonSensor
from airflow.utils import timezone

logger = logging.getLogger(__name__)

# A very small min_file_process_interval (e.g. min_file_process_interval=3)
# may make this easier to reproduce.


def create_dag(interval):
    with DAG(
        dag_id=f"example_reproduce_{interval:0>2}",
        schedule_interval=f"*/{interval} * * * *",
        start_date=datetime(2021, 1, 1),
        catchup=False,
        max_active_runs=2,
        tags=["example_race_condition"],
    ) as dag:
        target_s = 10

        def raise_if_future(context):
            # Fail loudly if this run's data interval starts in the future.
            now = timezone.utcnow() - timedelta(seconds=30)
            if context["data_interval_start"] > now:
                raise AirflowException("A DagRun scheduled for the future was triggered")

        def wait_sync():
            now_dt = pendulum.now()
            if now_dt.minute % (interval * 2) == 0:
                # Wait until the target second so that this run ends at almost
                # the same time as the preceding run.
                target_dt = copy.copy(now_dt).replace(second=target_s + 2)
                wait_sec = (target_dt - now_dt).total_seconds()
                logger.info(f"sleep {now_dt} -> {target_dt} in {wait_sec} seconds")
                if wait_sec > 0:
                    time.sleep(wait_sec)
                return True
            # Not the synchronization minute yet: reschedule and poke again.
            return False

        PythonSensor(
            task_id="t2",
            python_callable=wait_sync,
            # To avoid getting stuck with the SequentialExecutor, re-poke only
            # after the next run has started.
            poke_interval=interval * 60 + target_s,
            mode="reschedule",
            pre_execute=raise_if_future,
        )
    return dag


for i in [1, 2]:
    # Register each DAG under a string name so the DagBag can discover it.
    globals()[f"example_reproduce_{i:0>2}"] = create_dag(i)
```

### Operating System

Amazon Linux 2

### Versions of Apache Airflow Providers

_No response_

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

MWAA 2.2.2

### Anything else

The assumed flow and the associated query logs for the case `max_active_runs=2` are shown below.

**The assumed flow**

1. The first DagRun (DR1) starts.
1. The subsequent DagRun (DR2) starts.
1. DR2 completes. The scheduler sets `next_dagrun_create_after=null` if `max_active_runs` is exceeded.
   - https://github.com/apache/airflow/blob/2.2.2/airflow/jobs/scheduler_job.py#L931
1. DR1 completes. The scheduler calls `dag_model.calculate_dagrun_date_fields()` in `SchedulerJobRunner._schedule_dag_run()`. The session is NOT committed yet.
   - note: the result of `calculate_dagrun_date_fields` is the old DR1-based value from `dag.get_run_data_interval(DR"2")`.
   - https://github.com/apache/airflow/blob/2.2.2/airflow/jobs/scheduler_job.py#L1017
1. `DagFileProcessorProcess` modifies `next_dagrun_create_after`.
   - note: the dag record fetched in step 4 is not locked, so the processor can select and update it.
   - https://github.com/apache/airflow/blob/2.2.2/airflow/dag_processing/processor.py#L646
1. The scheduler writes the DR1-based calculation result to the DB with `guard.commit()`.
   - note: only the `next_dagrun_create_after` column (set to null in step 3) is updated, because SQLAlchemy only writes the columns that differ between the record retrieved in step 4 and the calculation result. The other columns keep the newer values written in step 5.
   - https://github.com/apache/airflow/blob/2.2.2/airflow/jobs/scheduler_job.py#L795
1. The scheduler triggers a future DagRun because the current time already satisfies the `next_dagrun_create_after` value written in step 6.

**The associated query log**

```sql
bb55c5b0bdce:/# grep "future_dagrun_00" /var/lib/postgresql/data/log/postgresql-2023-03-08_210056.log | grep "next_dagrun"
2023-03-08 22:00:01.678 UTC [57378] LOG: statement: UPDATE dag SET next_dagrun_create_after = NULL WHERE dag.dag_id = 'future_dagrun_00' # set in step 3
2023-03-08 22:00:08.162 UTC [57472] LOG: statement: UPDATE dag SET last_parsed_time = '2023-03-08T22:00:07.683786+00:00'::timestamptz, next_dagrun = '2023-03-08T22:00:00+00:00'::timestamptz, next_dagrun_data_interval_start = '2023-03-08T22:00:00+00:00'::timestamptz, next_dagrun_data_interval_end = '2023-03-08T23:00:00+00:00'::timestamptz, next_dagrun_create_after = '2023-03-08T23:00:00+00:00'::timestamptz WHERE dag.dag_id = 'future_dagrun_00' # set in step 5
2023-03-08 22:00:09.137 UTC [57475] LOG: statement: UPDATE dag SET next_dagrun_create_after = '2023-03-08T22:00:00+00:00'::timestamptz WHERE dag.dag_id = 'future_dagrun_00' # set in step 6
2023-03-08 22:00:10.418 UTC [57479] LOG: statement: UPDATE dag SET next_dagrun = '2023-03-08T23:00:00+00:00'::timestamptz, next_dagrun_data_interval_start = '2023-03-08T23:00:00+00:00'::timestamptz, next_dagrun_data_interval_end = '2023-03-09T00:00:00+00:00'::timestamptz, next_dagrun_create_after = '2023-03-09T00:00:00+00:00'::timestamptz WHERE dag.dag_id = 'future_dagrun_00' # set in step 7
```

From what I've read of the relevant code in the latest v2.6.1, I believe the problem persists.

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
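The lost update in steps 4 to 6 of the assumed flow can be modelled without Airflow. The following is a hypothetical, simplified simulation, not Airflow or SQLAlchemy code: plain dicts stand in for the `dag` row, `flush_changed_columns` is a made-up helper that approximates SQLAlchemy writing only the attributes that changed since load, and column names are shortened (`interval_start` for `next_dagrun_data_interval_start`, `create_after` for `next_dagrun_create_after`). The 21:00 snapshot values are inferred from the step 6 UPDATE touching only one column; they do not appear in the log.

```python
from datetime import datetime, timezone
from typing import Dict, Optional

Row = Dict[str, Optional[datetime]]
COLUMNS = ("next_dagrun", "interval_start", "interval_end", "create_after")


def at(hour: int, minute: int = 0, second: int = 0) -> datetime:
    """UTC timestamp on 2023-03-08, the date used in the query log."""
    return datetime(2023, 3, 8, hour, minute, second, tzinfo=timezone.utc)


def flush_changed_columns(db_row: Row, snapshot: Row, pending: Row) -> Row:
    """Hypothetical helper: write only the columns whose pending value differs
    from the snapshot taken at load time (simplified dirty-attribute tracking)."""
    changed = {c: pending[c] for c in COLUMNS if pending[c] != snapshot[c]}
    db_row.update(changed)
    return changed


def simulate(now: datetime):
    # State after step 3: create_after is NULL; the 21:00 values are assumed.
    db_row: Row = {"next_dagrun": at(21), "interval_start": at(21),
                   "interval_end": at(22), "create_after": None}

    # Step 4: the scheduler reads the row without a lock and computes
    # DR1-based fields; only create_after differs from what it loaded.
    snapshot = dict(db_row)
    pending = dict(snapshot, create_after=at(22))

    # Step 5: the DAG file processor updates the same row in the meantime.
    db_row.update(next_dagrun=at(22), interval_start=at(22),
                  interval_end=at(23), create_after=at(23))

    # Step 6: the scheduler commits; only the changed column is written,
    # overwriting the processor's create_after but keeping its other columns.
    written = flush_changed_columns(db_row, snapshot, pending)

    # Step 7: the stale create_after is already due, yet the data interval
    # the processor stored has not ended.
    due = db_row["create_after"] is not None and now >= db_row["create_after"]
    interval_not_over = db_row["interval_end"] > now
    return db_row, written, due, interval_not_over


row, written, due, interval_not_over = simulate(now=at(22, 0, 10))
print(sorted(written))          # → ['create_after']
print(due, interval_not_over)   # → True True
```

The simulated row ends with `create_after` at 22:00 but `interval_end` at 23:00, which matches the combination visible after the step 6 UPDATE in the query log.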