This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 19468d9aae8eb5cb01a6751af8cebad8a9c5bc49 Author: ginevragaudioso <[email protected]> AuthorDate: Mon Jun 14 06:34:03 2021 -0500 Queue tasks with higher priority and earlier execution_date first. (#15210) Co-authored-by: Ginevra Gaudioso <[email protected]> Co-authored-by: Ash Berlin-Taylor <[email protected]> (cherry picked from commit 943292b4e0c494f023c86d648289b1f23ccb0ee9) --- airflow/jobs/scheduler_job.py | 1 + tests/jobs/test_scheduler_job.py | 159 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 160 insertions(+) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index cece87e..7514d92 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -938,6 +938,7 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes .filter(not_(DM.is_paused)) .filter(TI.state == State.SCHEDULED) .options(selectinload('dag_model')) + .order_by(-TI.priority_weight, TI.execution_date) ) starved_pools = [pool_name for pool_name, stats in pools.items() if stats['open'] <= 0] if starved_pools: diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index faf19d9..36bc1b5 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1177,6 +1177,165 @@ class TestSchedulerJob(unittest.TestCase): assert tis[3].key in res_keys session.rollback() + def test_find_executable_task_instances_order_execution_date(self): + dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-a' + dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-b' + task_id = 'task-a' + dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16) + dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16) + dag1_task = DummyOperator(dag=dag_1, task_id=task_id) + dag2_task = DummyOperator(dag=dag_2, task_id=task_id) + dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1)) + dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2)) + + self.scheduler_job = SchedulerJob(subdir=os.devnull) + session = settings.Session() + + dag_model_1 = DagModel( + dag_id=dag_id_1, + is_paused=False, + concurrency=dag_1.concurrency, + has_task_concurrency_limits=False, + ) + session.add(dag_model_1) + dag_model_2 = DagModel( + dag_id=dag_id_2, + is_paused=False, + concurrency=dag_2.concurrency, + has_task_concurrency_limits=False, + ) + session.add(dag_model_2) + dr1 = dag_1.create_dagrun( + run_type=DagRunType.SCHEDULED, + execution_date=DEFAULT_DATE + timedelta(hours=1), + state=State.RUNNING, + ) + dr2 = dag_2.create_dagrun( + run_type=DagRunType.SCHEDULED, + execution_date=DEFAULT_DATE, + state=State.RUNNING, + ) + + tis = [ + TaskInstance(dag1_task, dr1.execution_date), + TaskInstance(dag2_task, dr2.execution_date), + ] + for ti in tis: + ti.state = State.SCHEDULED + session.merge(ti) + session.flush() + + res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session) + session.flush() + assert [ti.key for ti in res] == [tis[1].key] + session.rollback() + + def test_find_executable_task_instances_order_priority(self): + dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-a' + dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_priority-b' + task_id = 'task-a' + dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16) + dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16) + dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority_weight=1) + dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority_weight=4) + dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1)) + dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2)) + + self.scheduler_job = SchedulerJob(subdir=os.devnull) + session = settings.Session() + + dag_model_1 = DagModel( + dag_id=dag_id_1, + is_paused=False, + concurrency=dag_1.concurrency, + has_task_concurrency_limits=False, + ) + session.add(dag_model_1) + dag_model_2 = DagModel( + dag_id=dag_id_2, + is_paused=False, + concurrency=dag_2.concurrency, + has_task_concurrency_limits=False, + ) + session.add(dag_model_2) + dr1 = dag_1.create_dagrun( + run_type=DagRunType.SCHEDULED, + execution_date=DEFAULT_DATE, + state=State.RUNNING, + ) + dr2 = dag_2.create_dagrun( + run_type=DagRunType.SCHEDULED, + execution_date=DEFAULT_DATE, + state=State.RUNNING, + ) + + tis = [ + TaskInstance(dag1_task, dr1.execution_date), + TaskInstance(dag2_task, dr2.execution_date), + ] + for ti in tis: + ti.state = State.SCHEDULED + session.merge(ti) + session.flush() + + res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session) + session.flush() + assert [ti.key for ti in res] == [tis[1].key] + session.rollback() + + def test_find_executable_task_instances_order_execution_date_and_priority(self): + dag_id_1 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-a' + dag_id_2 = 'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-b' + task_id = 'task-a' + dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16) + dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16) + dag1_task = DummyOperator(dag=dag_1, task_id=task_id, priority_weight=1) + dag2_task = DummyOperator(dag=dag_2, task_id=task_id, priority_weight=4) + dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1)) + dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2)) + + self.scheduler_job = SchedulerJob(subdir=os.devnull) + session = settings.Session() + + dag_model_1 = DagModel( + dag_id=dag_id_1, + is_paused=False, + concurrency=dag_1.concurrency, + has_task_concurrency_limits=False, + ) + session.add(dag_model_1) + dag_model_2 = DagModel( + dag_id=dag_id_2, + is_paused=False, + concurrency=dag_2.concurrency, + has_task_concurrency_limits=False, + ) + session.add(dag_model_2) + dr1 = dag_1.create_dagrun( + run_type=DagRunType.SCHEDULED, + execution_date=DEFAULT_DATE, + state=State.RUNNING, + ) + dr2 = dag_2.create_dagrun( + run_type=DagRunType.SCHEDULED, + execution_date=DEFAULT_DATE + timedelta(hours=1), + state=State.RUNNING, + ) + + tis = [ + TaskInstance(dag1_task, dr1.execution_date), + TaskInstance(dag2_task, dr2.execution_date), + ] + for ti in tis: + ti.state = State.SCHEDULED + session.merge(ti) + session.flush() + + res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session) + session.flush() + assert [ti.key for ti in res] == [tis[1].key] + session.rollback() + + def test_find_executable_task_instances_in_default_pool(self): set_default_pool_slots(1)
