doiken opened a new issue, #31407:
URL: https://github.com/apache/airflow/issues/31407

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   In rare cases, the scheduler triggers a DagRun that is not supposed to be created until a future scheduled time.
   Here are the conditions as I understand them.
   - max_active_runs is set and the upper limit has been reached
   - The preceding DagRun completes only very slightly earlier than the following DagRun
   
   Details in "Anything else".
   
   ### What you think should happen instead
   
   The DagRun should not be triggered until its scheduled time.
   
   ### How to reproduce
   
   I have confirmed that the issue reproduces in Airflow 2.2.2 with the following code.
   In my environment it reproduced after running for about half a day.
   
   ``` python
   import copy
   import logging
   import time
   from datetime import datetime, timedelta

   import pendulum
   from airflow import DAG, AirflowException
   from airflow.sensors.python import PythonSensor
   from airflow.utils import timezone

   logger = logging.getLogger(__name__)


   # A very small min_file_process_interval may help to reproduce this more often, e.g. min_file_process_interval=3
   def create_dag(interval):
       with DAG(
           dag_id=f"example_reproduce_{interval:0>2}",
           schedule_interval=f"*/{interval} * * * *",
           start_date=datetime(2021, 1, 1),
           catchup=False,
           max_active_runs=2,
           tags=["example_race_condition"],
       ) as dag:
           target_s = 10

           def raise_if_future(context):
               # allow a 30-second margin
               now = timezone.utcnow() - timedelta(seconds=30)
               if context["data_interval_start"] > now:
                   raise AirflowException("DagRun supposed to be triggered in the future was triggered")

           def wait_sync():
               now_dt = pendulum.now()
               if now_dt.minute % (interval * 2) == 0:
                   # wait until the target time to synchronize the end time with the preceding job
                   target_dt = copy.copy(now_dt).replace(second=target_s + 2)
                   wait_sec = (target_dt - now_dt).total_seconds()
                   logger.info(f"sleep {now_dt} -> {target_dt} in {wait_sec} seconds")
                   if wait_sec > 0:
                       time.sleep(wait_sec)
                   return True

           PythonSensor(
               task_id="t2",
               python_callable=wait_sync,
               # To avoid getting stuck with SequentialExecutor, re-poke only after the next job has started
               poke_interval=interval * 60 + target_s,
               mode="reschedule",
               pre_execute=raise_if_future,
           )

       return dag


   for i in [1, 2]:
       globals()[f"dag_{i}"] = create_dag(i)
   ```
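
   When the race occurs, the raise_if_future pre_execute hook above fails the task. As an additional check (a sketch, assuming access to the metadata database through the Airflow ORM; not part of the reproduction itself), one can list DagRuns whose data interval starts in the future:

   ``` python
   # Sketch: query the metadata DB for DagRuns created ahead of their schedule.
   # Assumes it runs where the Airflow ORM is configured (e.g. on the scheduler);
   # the data_interval_start column exists on dag_run since Airflow 2.2.
   from airflow.models import DagRun
   from airflow.utils import timezone
   from airflow.utils.session import provide_session


   @provide_session
   def find_future_dagruns(session=None):
       return (
           session.query(DagRun)
           .filter(DagRun.data_interval_start > timezone.utcnow())
           .all()
       )


   for dr in find_future_dagruns():
       print(dr.dag_id, dr.run_id, dr.data_interval_start)
   ```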
   
   ### Operating System
   
   Amazon Linux 2
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   MWAA 2.2.2
   
   ### Anything else
   
   The assumed flow and the associated actual query log for the case max_active_runs=2 are shown below.
   
   **The assumed flow**
   
   1. The first DagRun (DR1) starts.
   2. The subsequent DagRun (DR2) starts.
   3. DR2 completes; the scheduler sets `next_dagrun_create_after=null` because the max_active_runs limit is reached.
       - https://github.com/apache/airflow/blob/2.2.2/airflow/jobs/scheduler_job.py#L931
   4. DR1 completes; the scheduler calls dag_model.calculate_dagrun_date_fields() in SchedulerJobRunner._schedule_dag_run(). The session is NOT committed yet.
       - note: the result of `calculate_dagrun_date_fields` is the old DR1-based value from `dag.get_run_data_interval(DR2)`.
       - https://github.com/apache/airflow/blob/2.2.2/airflow/jobs/scheduler_job.py#L1017
   5. DagFileProcessorProcess modifies next_dagrun_create_after.
       - note: the dag record fetched in step 4 is not locked, so the `Processor` can select and update it.
       - https://github.com/apache/airflow/blob/2.2.2/airflow/dag_processing/processor.py#L646
   6. The scheduler writes the calculation result from step 4 (based on DR1) to the DB via `guard.commit()` (see the sketch after this list).
       - note: only the `next_dagrun_create_after` column (set to null in step 3) is included in the UPDATE, because SQLAlchemy only writes the difference between the record retrieved in step 4 and the calculation result.
       - https://github.com/apache/airflow/blob/2.2.2/airflow/jobs/scheduler_job.py#L795
   7. The scheduler triggers a future DagRun because the current time already satisfies the `next_dagrun_create_after` value written in step 6.
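
   To make the SQLAlchemy behaviour in step 6 concrete, below is a minimal, self-contained sketch of the lost-update pattern. It uses a toy table with made-up values rather than Airflow's actual models and sessions, and assumes the SQLAlchemy 1.4-style API: on commit, only the columns whose values differ from the snapshot loaded in step 4 are included in the UPDATE, so the processor's concurrent write to next_dagrun_create_after is silently overwritten while its other columns survive.

   ``` python
   import tempfile

   from sqlalchemy import Column, String, create_engine
   from sqlalchemy.orm import Session, declarative_base

   Base = declarative_base()


   # Toy stand-in for the `dag` table; column names mimic the real ones,
   # but this is NOT Airflow code.
   class DagModelToy(Base):
       __tablename__ = "dag_toy"
       dag_id = Column(String, primary_key=True)
       next_dagrun = Column(String)
       next_dagrun_create_after = Column(String, nullable=True)


   db_file = tempfile.NamedTemporaryFile(suffix=".db")
   engine = create_engine(f"sqlite:///{db_file.name}", echo=True, future=True)
   Base.metadata.create_all(engine)

   # Step 3: DR2 completed, so next_dagrun_create_after was set to NULL.
   with Session(engine) as setup:
       setup.add(DagModelToy(dag_id="d1", next_dagrun="22:00", next_dagrun_create_after=None))
       setup.commit()

   # Step 4: the scheduler loads the row; its snapshot has next_dagrun_create_after=NULL.
   scheduler = Session(engine)
   dag_row = scheduler.get(DagModelToy, "d1")

   # Step 5: the DagFileProcessorProcess updates the same (unlocked) row and commits.
   with Session(engine) as processor:
       p_row = processor.get(DagModelToy, "d1")
       p_row.next_dagrun = "23:00"
       p_row.next_dagrun_create_after = "23:00"
       processor.commit()

   # Step 6: the scheduler writes its stale, DR1-based calculation and commits.
   dag_row.next_dagrun = "22:00"               # equals the step-4 snapshot -> NOT in the UPDATE
   dag_row.next_dagrun_create_after = "22:00"  # differs from the NULL snapshot -> written, clobbering "23:00"
   scheduler.commit()
   scheduler.close()

   with Session(engine) as check:
       final = check.get(DagModelToy, "d1")
       # Prints "23:00 22:00": next_dagrun kept the processor's value, but
       # next_dagrun_create_after was rolled back to the stale one, so step 7
       # can now create a run ahead of schedule.
       print(final.next_dagrun, final.next_dagrun_create_after)
   ```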
   
   **The associated query log**
   
   ``` sql
   bb55c5b0bdce:/# grep "future_dagrun_00" /var/lib/postgresql/data/log/postgresql-2023-03-08_210056.log | grep "next_dagrun"
   2023-03-08 22:00:01.678 UTC [57378] LOG: statement: UPDATE dag SET next_dagrun_create_after = NULL WHERE dag.dag_id = 'future_dagrun_00' # set in step 3
   2023-03-08 22:00:08.162 UTC [57472] LOG: statement: UPDATE dag SET last_parsed_time = '2023-03-08T22:00:07.683786+00:00'::timestamptz, next_dagrun = '2023-03-08T22:00:00+00:00'::timestamptz, next_dagrun_data_interval_start = '2023-03-08T22:00:00+00:00'::timestamptz, next_dagrun_data_interval_end = '2023-03-08T23:00:00+00:00'::timestamptz, next_dagrun_create_after = '2023-03-08T23:00:00+00:00'::timestamptz WHERE dag.dag_id = 'future_dagrun_00' # set in step 5
   2023-03-08 22:00:09.137 UTC [57475] LOG: statement: UPDATE dag SET next_dagrun_create_after = '2023-03-08T22:00:00+00:00'::timestamptz WHERE dag.dag_id = 'future_dagrun_00' # set in step 6
   2023-03-08 22:00:10.418 UTC [57479] LOG: statement: UPDATE dag SET next_dagrun = '2023-03-08T23:00:00+00:00'::timestamptz, next_dagrun_data_interval_start = '2023-03-08T23:00:00+00:00'::timestamptz, next_dagrun_data_interval_end = '2023-03-09T00:00:00+00:00'::timestamptz, next_dagrun_create_after = '2023-03-09T00:00:00+00:00'::timestamptz WHERE dag.dag_id = 'future_dagrun_00' # set in step 7
   ```
   
   From what I have read of the relevant code in the latest v2.6.1, I believe the problem still exists.
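
   For what it's worth, one generic way to avoid this kind of lost update, sketched below against the toy model above rather than against Airflow's actual code, is to re-read the row immediately before recalculating (optionally with a row lock on databases that support it), so that the diff-based UPDATE is computed against the processor's committed values instead of the stale snapshot:

   ``` python
   # Sketch only, continuing the toy example above (not a proposed Airflow patch).

   def recalculate_create_after(row):
       # Hypothetical stand-in for the real recalculation
       # (DagModel.calculate_dagrun_date_fields in Airflow).
       return row.next_dagrun


   with Session(engine) as scheduler2:
       row = scheduler2.get(DagModelToy, "d1")
       # Reload the current DB values so the diff is computed against the
       # processor's committed write rather than a stale snapshot. On
       # PostgreSQL/MySQL one could also pass with_for_update=True here to
       # lock the row while recalculating.
       scheduler2.refresh(row)
       row.next_dagrun_create_after = recalculate_create_after(row)
       scheduler2.commit()
   ```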
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

