[ 
https://issues.apache.org/jira/browse/AIRFLOW-1104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564330#comment-16564330
 ] 

ASF GitHub Bot commented on AIRFLOW-1104:
-----------------------------------------

kaxil closed pull request #3568: AIRFLOW-1104 Update jobs.py so Airflow does not over schedule tasks
URL: https://github.com/apache/incubator-airflow/pull/3568
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork-based pull request once it is merged,
the diff is reproduced below for the sake of provenance:

diff --git a/airflow/jobs.py b/airflow/jobs.py
index 224ff185fb..a4252473cd 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1075,9 +1075,6 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
         :type states: Tuple[State]
         :return: List[TaskInstance]
         """
-        # TODO(saguziel): Change this to include QUEUED, for concurrency
-        # purposes we may want to count queued tasks
-        states_to_count_as_running = [State.RUNNING]
         executable_tis = []
 
         # Get all the queued task instances from associated with scheduled
@@ -1123,6 +1120,7 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
         for task_instance in task_instances_to_examine:
             pool_to_task_instances[task_instance.pool].append(task_instance)
 
+        states_to_count_as_running = [State.RUNNING, State.QUEUED]
         task_concurrency_map = self.__get_task_concurrency_map(
             states=states_to_count_as_running, session=session)
 
@@ -1173,7 +1171,6 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
                 simple_dag = simple_dag_bag.get_dag(dag_id)
 
                 if dag_id not in dag_id_to_possibly_running_task_count:
-                    # TODO(saguziel): also check against QUEUED state, see AIRFLOW-1104
                     dag_id_to_possibly_running_task_count[dag_id] = \
                         DAG.get_num_task_instances(
                             dag_id,
diff --git a/tests/jobs.py b/tests/jobs.py
index 93f6574df4..c701214f1e 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -1493,6 +1493,39 @@ def test_find_executable_task_instances_concurrency(self):
 
         self.assertEqual(0, len(res))
 
+    def test_find_executable_task_instances_concurrency_queued(self):
+        dag_id = 'SchedulerJobTest.test_find_executable_task_instances_concurrency_queued'
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=3)
+        task1 = DummyOperator(dag=dag, task_id='dummy1')
+        task2 = DummyOperator(dag=dag, task_id='dummy2')
+        task3 = DummyOperator(dag=dag, task_id='dummy3')
+        dagbag = self._make_simple_dag_bag([dag])
+
+        scheduler = SchedulerJob()
+        session = settings.Session()
+        dag_run = scheduler.create_dag_run(dag)
+
+        ti1 = TI(task1, dag_run.execution_date)
+        ti2 = TI(task2, dag_run.execution_date)
+        ti3 = TI(task3, dag_run.execution_date)
+        ti1.state = State.RUNNING
+        ti2.state = State.QUEUED
+        ti3.state = State.SCHEDULED
+
+        session.merge(ti1)
+        session.merge(ti2)
+        session.merge(ti3)
+
+        session.commit()
+
+        res = scheduler._find_executable_task_instances(
+            dagbag,
+            states=[State.SCHEDULED],
+            session=session)
+
+        self.assertEqual(1, len(res))
+        self.assertEqual(res[0].key, ti3.key)
+
     def test_find_executable_task_instances_task_concurrency(self):
         dag_id = 'SchedulerJobTest.test_find_executable_task_instances_task_concurrency'
         task_id_1 = 'dummy'


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Concurrency check in scheduler should count queued tasks as well as running
> ---------------------------------------------------------------------------
>
>                 Key: AIRFLOW-1104
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1104
>             Project: Apache Airflow
>          Issue Type: Bug
>         Environment: see https://github.com/apache/incubator-airflow/pull/2221
> "Tasks with the QUEUED state should also be counted below, but for now we 
> cannot count them. This is because there is no guarantee that queued tasks in 
> failed dagruns will or will not eventually run and queued tasks that will 
> never run will consume slots and can stall a DAG. Once we can guarantee that 
> all queued tasks in failed dagruns will never run (e.g. make sure that all 
> running/newly queued TIs have running dagruns), then we can include QUEUED 
> tasks here, with the constraint that they are in running dagruns."
>            Reporter: Alex Guziel
>            Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to