This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 19468d9aae8eb5cb01a6751af8cebad8a9c5bc49
Author: ginevragaudioso <[email protected]>
AuthorDate: Mon Jun 14 06:34:03 2021 -0500

    Queue tasks with higher priority and earlier execution_date first. (#15210)
    
    Co-authored-by: Ginevra Gaudioso <[email protected]>
    Co-authored-by: Ash Berlin-Taylor <[email protected]>
    (cherry picked from commit 943292b4e0c494f023c86d648289b1f23ccb0ee9)
---
 airflow/jobs/scheduler_job.py    |   1 +
 tests/jobs/test_scheduler_job.py | 159 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 160 insertions(+)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index cece87e..7514d92 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -938,6 +938,7 @@ class SchedulerJob(BaseJob):  # pylint: 
disable=too-many-instance-attributes
             .filter(not_(DM.is_paused))
             .filter(TI.state == State.SCHEDULED)
             .options(selectinload('dag_model'))
+            .order_by(-TI.priority_weight, TI.execution_date)
         )
         starved_pools = [pool_name for pool_name, stats in pools.items() if 
stats['open'] <= 0]
         if starved_pools:
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index faf19d9..36bc1b5 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1177,6 +1177,165 @@ class TestSchedulerJob(unittest.TestCase):
         assert tis[3].key in res_keys
         session.rollback()
 
+    def test_find_executable_task_instances_order_execution_date(self):
+        dag_id_1 = 
'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-a'
+        dag_id_2 = 
'SchedulerJobTest.test_find_executable_task_instances_order_execution_date-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id)
+        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model_1 = DagModel(
+            dag_id=dag_id_1,
+            is_paused=False,
+            concurrency=dag_1.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_1)
+        dag_model_2 = DagModel(
+            dag_id=dag_id_2,
+            is_paused=False,
+            concurrency=dag_2.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_2)
+        dr1 = dag_1.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE + timedelta(hours=1),
+            state=State.RUNNING,
+        )
+        dr2 = dag_2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        tis = [
+            TaskInstance(dag1_task, dr1.execution_date),
+            TaskInstance(dag2_task, dr2.execution_date),
+        ]
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = 
self.scheduler_job._executable_task_instances_to_queued(max_tis=1, 
session=session)
+        session.flush()
+        assert [ti.key for ti in res] == [tis[1].key]
+        session.rollback()
+
+    def test_find_executable_task_instances_order_priority(self):
+        dag_id_1 = 
'SchedulerJobTest.test_find_executable_task_instances_order_priority-a'
+        dag_id_2 = 
'SchedulerJobTest.test_find_executable_task_instances_order_priority-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id, 
priority_weight=1)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id, 
priority_weight=4)
+        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model_1 = DagModel(
+            dag_id=dag_id_1,
+            is_paused=False,
+            concurrency=dag_1.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_1)
+        dag_model_2 = DagModel(
+            dag_id=dag_id_2,
+            is_paused=False,
+            concurrency=dag_2.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_2)
+        dr1 = dag_1.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+        dr2 = dag_2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+
+        tis = [
+            TaskInstance(dag1_task, dr1.execution_date),
+            TaskInstance(dag2_task, dr2.execution_date),
+        ]
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = 
self.scheduler_job._executable_task_instances_to_queued(max_tis=1, 
session=session)
+        session.flush()
+        assert [ti.key for ti in res] == [tis[1].key]
+        session.rollback()
+
+    def 
test_find_executable_task_instances_order_execution_date_and_priority(self):
+        dag_id_1 = 
'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-a'
+        dag_id_2 = 
'SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-b'
+        task_id = 'task-a'
+        dag_1 = DAG(dag_id=dag_id_1, start_date=DEFAULT_DATE, concurrency=16)
+        dag_2 = DAG(dag_id=dag_id_2, start_date=DEFAULT_DATE, concurrency=16)
+        dag1_task = DummyOperator(dag=dag_1, task_id=task_id, 
priority_weight=1)
+        dag2_task = DummyOperator(dag=dag_2, task_id=task_id, 
priority_weight=4)
+        dag_1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_1))
+        dag_2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_2))
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        dag_model_1 = DagModel(
+            dag_id=dag_id_1,
+            is_paused=False,
+            concurrency=dag_1.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_1)
+        dag_model_2 = DagModel(
+            dag_id=dag_id_2,
+            is_paused=False,
+            concurrency=dag_2.concurrency,
+            has_task_concurrency_limits=False,
+        )
+        session.add(dag_model_2)
+        dr1 = dag_1.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+        )
+        dr2 = dag_2.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE + timedelta(hours=1),
+            state=State.RUNNING,
+        )
+
+        tis = [
+            TaskInstance(dag1_task, dr1.execution_date),
+            TaskInstance(dag2_task, dr2.execution_date),
+        ]
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = 
self.scheduler_job._executable_task_instances_to_queued(max_tis=1, 
session=session)
+        session.flush()
+        assert [ti.key for ti in res] == [tis[1].key]
+        session.rollback()
+
     def test_find_executable_task_instances_in_default_pool(self):
         set_default_pool_slots(1)
 

Reply via email to