This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new ebbe2b4 Fix DagRun execution order from queued to running not being properly followed (#18061) ebbe2b4 is described below commit ebbe2b4cafebe2b523ca08abd40145c3c7eec046 Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Thu Sep 9 12:03:33 2021 +0100 Fix DagRun execution order from queued to running not being properly followed (#18061) We made a fix that resolved max_active_runs not allowing other dagruns to move to running state (see #17945), but it introduced a bug where dagruns no longer followed the execution_date order when moving to running state. This PR fixes it by adding a `max_active_runs` column to DagModel. Also, an extra test unrelated to this change was added, because I was able to trigger the bug it covers (dagruns getting stuck in running) while working on this. --- airflow/jobs/scheduler_job.py | 3 +- ...5d12_add_max_active_runs_column_to_dagmodel_.py | 59 +++++++++++++ airflow/models/dag.py | 4 + airflow/models/dagrun.py | 41 ++++++++- docs/apache-airflow/migrations-ref.rst | 4 +- tests/jobs/test_scheduler_job.py | 96 ++++++++++++++++++++++ tests/models/test_dag.py | 20 +++++ 7 files changed, 221 insertions(+), 6 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index d7239a8..4d8a704 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -887,14 +887,15 @@ class SchedulerJob(BaseJob): ) for dag_run in dag_runs: + try: dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session) except SerializedDagNotFound: self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id) continue active_runs = active_runs_of_dags[dag_run.dag_id] + if dag.max_active_runs and active_runs >= dag.max_active_runs: - dag_run.last_scheduling_decision = timezone.utcnow() self.log.debug( "DAG %s already has %d active runs, not moving any more runs to RUNNING state %s", dag.dag_id, diff --git 
a/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py b/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py new file mode 100644 index 0000000..c1a6e19 --- /dev/null +++ b/airflow/migrations/versions/092435bf5d12_add_max_active_runs_column_to_dagmodel_.py @@ -0,0 +1,59 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Add max_active_runs column to dagmodel table + +Revision ID: 092435bf5d12 +Revises: 142555e44c17 +Create Date: 2021-09-06 21:29:24.728923 + +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import text + +# revision identifiers, used by Alembic. 
+revision = '092435bf5d12' +down_revision = '7b2661a43ba3' +branch_labels = None +depends_on = None + + +def upgrade(): + """Apply Add max_active_runs column to dagmodel table""" + op.add_column('dag', sa.Column('max_active_runs', sa.Integer(), nullable=True)) + with op.batch_alter_table('dag_run', schema=None) as batch_op: + # Add index to dag_run.dag_id and also add index to dag_run.state where state==running + batch_op.create_index('idx_dag_run_dag_id', ['dag_id']) + batch_op.create_index( + 'idx_dag_run_running_dags', + ["state", "dag_id"], + postgres_where=text("state='running'"), + mssql_where=text("state='running'"), + sqlite_where=text("state='running'"), + ) + + +def downgrade(): + """Unapply Add max_active_runs column to dagmodel table""" + op.drop_column('dag', 'max_active_runs') + with op.batch_alter_table('dag_run', schema=None) as batch_op: + # Drop index to dag_run.dag_id and also drop index to dag_run.state where state==running + batch_op.drop_index('idx_dag_run_dag_id') + batch_op.drop_index('idx_dag_run_running_dags') diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 7743399..dc15d21 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2263,6 +2263,7 @@ class DAG(LoggingMixin): orm_dag.description = dag.description orm_dag.schedule_interval = dag.schedule_interval orm_dag.max_active_tasks = dag.max_active_tasks + orm_dag.max_active_runs = dag.max_active_runs orm_dag.has_task_concurrency_limits = any(t.max_active_tis_per_dag is not None for t in dag.tasks) orm_dag.calculate_dagrun_date_fields( @@ -2493,6 +2494,7 @@ class DagModel(Base): tags = relationship('DagTag', cascade='all,delete-orphan', backref=backref('dag')) max_active_tasks = Column(Integer, nullable=False) + max_active_runs = Column(Integer, nullable=True) has_task_concurrency_limits = Column(Boolean, nullable=False) @@ -2529,6 +2531,8 @@ class DagModel(Base): self.max_active_tasks = concurrency else: self.max_active_tasks = conf.getint('core', 
'max_active_tasks_per_dag') + if self.max_active_runs is None: + self.max_active_runs = conf.getint('core', 'max_active_runs_per_dag') if self.has_task_concurrency_limits is None: # Be safe -- this will be updated later once the DAG is parsed self.has_task_concurrency_limits = True diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index e8f5f98..aa32fdc 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -19,7 +19,19 @@ import warnings from datetime import datetime from typing import TYPE_CHECKING, Any, Iterable, List, NamedTuple, Optional, Tuple, Union -from sqlalchemy import Boolean, Column, Index, Integer, PickleType, String, UniqueConstraint, and_, func, or_ +from sqlalchemy import ( + Boolean, + Column, + Index, + Integer, + PickleType, + String, + UniqueConstraint, + and_, + func, + or_, + text, +) from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.orm import joinedload, relationship, synonym @@ -91,6 +103,15 @@ class DagRun(Base, LoggingMixin): UniqueConstraint('dag_id', 'execution_date', name='dag_run_dag_id_execution_date_key'), UniqueConstraint('dag_id', 'run_id', name='dag_run_dag_id_run_id_key'), Index('idx_last_scheduling_decision', last_scheduling_decision), + Index('idx_dag_run_dag_id', dag_id), + Index( + 'idx_dag_run_running_dags', + 'state', + 'dag_id', + postgres_where=text("state='running'"), + mssql_where=text("state='running'"), + sqlite_where=text("state='running'"), + ), ) task_instances = relationship(TI, back_populates="dag_run") @@ -207,10 +228,22 @@ class DagRun(Base, LoggingMixin): DagModel.is_paused == expression.false(), DagModel.is_active == expression.true(), ) - .order_by( - nulls_first(cls.last_scheduling_decision, session=session), - cls.execution_date, + ) + if state == State.QUEUED: + # For dag runs in the queued state, we check if they have reached the max_active_runs limit + # and if so we drop them + running_drs = ( + 
session.query(DagRun.dag_id, func.count(DagRun.state).label('num_running')) + .filter(DagRun.state == DagRunState.RUNNING) + .group_by(DagRun.dag_id) + .subquery() + ) + query = query.outerjoin(running_drs, running_drs.c.dag_id == DagRun.dag_id).filter( + func.coalesce(running_drs.c.num_running, 0) < DagModel.max_active_runs ) + query = query.order_by( + nulls_first(cls.last_scheduling_decision, session=session), + cls.execution_date, ) if not settings.ALLOW_FUTURE_EXEC_DATES: diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 052d61a..a5446b2 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+ -| ``7b2661a43ba3`` (head) | ``142555e44c17`` | | Change TaskInstance and TaskReschedule tables from execution_date to run_id. | +| ``092435bf5d12`` (head) | ``7b2661a43ba3`` | | Add ``max_active_runs`` column to ``dag_model`` table | ++--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+ +| ``7b2661a43ba3`` | ``142555e44c17`` | | Change TaskInstance and TaskReschedule tables from execution_date to run_id. 
| +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+ | ``142555e44c17`` | ``54bebd308c5f`` | | Add ``data_interval_start`` and ``data_interval_end`` to ``DagRun`` | +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+ diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index dfcb67e..ddeaf5f 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -2689,6 +2689,102 @@ class TestSchedulerJob: ) assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 11 + def test_start_queued_dagruns_do_follow_execution_date_order(self, dag_maker): + session = settings.Session() + with dag_maker('test_dag1', max_active_runs=1) as dag: + DummyOperator(task_id='mytask') + date = dag.following_schedule(DEFAULT_DATE) + for i in range(30): + dr = dag_maker.create_dagrun( + run_id=f'dagrun_{i}', run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date + ) + date = dr.execution_date + timedelta(hours=1) + self.scheduler_job = SchedulerJob(subdir=os.devnull) + self.scheduler_job.executor = MockExecutor(do_update=False) + self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent) + + self.scheduler_job._start_queued_dagruns(session) + session.flush() + dr = DagRun.find(run_id='dagrun_0') + ti = dr[0].get_task_instance(task_id='mytask', session=session) + ti.state = State.SUCCESS + session.merge(ti) + session.commit() + assert dr[0].state == State.RUNNING + dr[0].state = State.SUCCESS + session.merge(dr[0]) + session.flush() + assert dr[0].state == State.SUCCESS + self.scheduler_job._start_queued_dagruns(session) + session.flush() + dr = DagRun.find(run_id='dagrun_1') + assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 1 + + assert 
dr[0].state == State.RUNNING + + def test_no_dagruns_would_stuck_in_running(self, dag_maker): + # Test that running dagruns are not stuck in running. + # Create one dagrun in 'running' state and 1 in 'queued' state from one dag(max_active_runs=1) + # Create 16 dagruns in 'running' state and 16 in 'queued' state from another dag + # Create 16 dagruns in 'running' state and 16 in 'queued' state from yet another dag + # Finish the task of the first dag, and check that another dagrun starts running + # from the first dag. + + session = settings.Session() + # first dag and dagruns + date = timezone.datetime(2016, 1, 1) + with dag_maker('test_dagrun_states_are_correct_1', max_active_runs=1, start_date=date) as dag: + task1 = DummyOperator(task_id='dummy_task') + + dr1_running = dag_maker.create_dagrun(run_id='dr1_run_1', execution_date=date) + dag_maker.create_dagrun( + run_id='dr1_run_2', + state=State.QUEUED, + execution_date=dag.following_schedule(dr1_running.execution_date), + ) + # second dag and dagruns + date = timezone.datetime(2020, 1, 1) + with dag_maker('test_dagrun_states_are_correct_2', start_date=date) as dag: + DummyOperator(task_id='dummy_task') + for i in range(16): + dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.RUNNING, execution_date=date) + date = dr.execution_date + timedelta(hours=1) + dr16 = DagRun.find(run_id='dr2_run_16') + date = dr16[0].execution_date + timedelta(hours=1) + for i in range(16, 32): + dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date) + date = dr.execution_date + timedelta(hours=1) + + # third dag and dagruns + date = timezone.datetime(2021, 1, 1) + with dag_maker('test_dagrun_states_are_correct_3', start_date=date) as dag: + DummyOperator(task_id='dummy_task') + for i in range(16): + dr = dag_maker.create_dagrun(run_id=f'dr3_run_{i+1}', state=State.RUNNING, execution_date=date) + date = dr.execution_date + timedelta(hours=1) + dr16 = 
DagRun.find(run_id='dr3_run_16') + date = dr16[0].execution_date + timedelta(hours=1) + for i in range(16, 32): + dr = dag_maker.create_dagrun(run_id=f'dr2_run_{i+1}', state=State.QUEUED, execution_date=date) + date = dr.execution_date + timedelta(hours=1) + + self.scheduler_job = SchedulerJob(subdir=os.devnull) + self.scheduler_job.executor = MockExecutor(do_update=False) + self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent) + + ti = TaskInstance(task=task1, execution_date=DEFAULT_DATE) + ti.refresh_from_db() + ti.state = State.SUCCESS + session.merge(ti) + session.flush() + # Run the scheduler loop + with mock.patch.object(settings, "USE_JOB_SCHEDULE", False): + self.scheduler_job._do_scheduling(session) + self.scheduler_job._do_scheduling(session) + + assert DagRun.find(run_id='dr1_run_1')[0].state == State.SUCCESS + assert DagRun.find(run_id='dr1_run_2')[0].state == State.RUNNING + @pytest.mark.parametrize( "state, start_date, end_date", [ diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index c6d54ed..61cbcee 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -1759,6 +1759,26 @@ class TestDagModel: session.rollback() session.close() + def test_max_active_runs_not_none(self): + dag = DAG(dag_id='test_max_active_runs_not_none', start_date=timezone.datetime(2038, 1, 1)) + DummyOperator(task_id='dummy', dag=dag, owner='airflow') + + session = settings.Session() + orm_dag = DagModel( + dag_id=dag.dag_id, + has_task_concurrency_limits=False, + next_dagrun=None, + next_dagrun_create_after=None, + is_active=True, + ) + session.add(orm_dag) + session.flush() + + assert orm_dag.max_active_runs is not None + + session.rollback() + session.close() + def test_dags_needing_dagruns_only_unpaused(self): """ We should never create dagruns for unpaused DAGs