This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new f0824266977 [v3-0-test] Fix local executor task execution (#54523) 
(#54922)
f0824266977 is described below

commit f0824266977a247dc64f8a8ec791dd4c9b5f6366
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Aug 25 21:10:49 2025 +0200

    [v3-0-test] Fix local executor task execution (#54523) (#54922)
    
    LocalExecutor would only actually begin execution for every second
    task submitted to it. This change fixes this behaviour.
    
    The number of queued tasks would need to exceed the number of currently
    running tasks for a new task to be executed, which is logically close to
    what is needed but not quite. This means if 4 tasks are running and 4
    are pending, none of the 4 pending tasks will begin, but if a 5th task is queued
    it will begin and there will be 5 tasks running and 4 waiting, it will
    take another two tasks submitted (for a total of 5 then 6 pending tasks)
    before the next is started, and so on. So in reality one of every two
    tasks is started.
    
    The logic is really as simple as: if we have any pending tasks and we
    are still within our parallelism limits, start those tasks running. This
    schedules tasks in accordance with user expectations and how Airflow 2.X
    scheduled tasks for the LocalExecutor (since this regressed during the
    migration from 2.X to 3.X for the LocalExecutor).
    (cherry picked from commit e36a809ebdaa04fbc7a76d546ff9fa1db2cb39d4)
    
    Co-authored-by: Niko Oliveira <[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 .../src/airflow/executors/local_executor.py        |   3 +-
 .../executors/test_local_executor_check_workers.py | 146 +++++++++++++++++++++
 2 files changed, 147 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/executors/local_executor.py 
b/airflow-core/src/airflow/executors/local_executor.py
index 4c8ca1e73cb..e67284c4e6e 100644
--- a/airflow-core/src/airflow/executors/local_executor.py
+++ b/airflow-core/src/airflow/executors/local_executor.py
@@ -186,8 +186,7 @@ class LocalExecutor(BaseExecutor):
         # If we're using spawn in multiprocessing (default on macOS now) to 
start tasks, this can get called a
         # via `sync()` a few times before the spawned process actually starts 
picking up messages. Try not to
         # create too much
-        need_more_workers = len(self.workers) < num_outstanding
-        if need_more_workers and (self.parallelism == 0 or len(self.workers) < 
self.parallelism):
+        if num_outstanding and (self.parallelism == 0 or len(self.workers) < 
self.parallelism):
             # This only creates one worker, which is fine as we call this 
directly after putting a message on
             # activity_queue in execute_async
             self._spawn_worker()
diff --git 
a/airflow-core/tests/unit/executors/test_local_executor_check_workers.py 
b/airflow-core/tests/unit/executors/test_local_executor_check_workers.py
new file mode 100644
index 00000000000..094ce230596
--- /dev/null
+++ b/airflow-core/tests/unit/executors/test_local_executor_check_workers.py
@@ -0,0 +1,146 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from airflow.executors.local_executor import LocalExecutor
+
+
[email protected](autouse=True)
+def setup_executor(monkeypatch):
+    executor = LocalExecutor(parallelism=2)
+    executor.workers = {}
+    executor._unread_messages = MagicMock()
+    executor.activity_queue = MagicMock()
+    monkeypatch.setattr(executor, "_spawn_worker", MagicMock())
+    return executor
+
+
+def test_no_workers_on_no_work(setup_executor):
+    executor = setup_executor
+    executor._unread_messages.value = 0
+    executor.activity_queue.empty.return_value = True
+    executor._check_workers()
+    executor._spawn_worker.assert_not_called()
+    assert executor.workers == {}
+
+
+def test_all_workers_alive(setup_executor):
+    executor = setup_executor
+    proc1 = MagicMock()
+    proc1.is_alive.return_value = True
+    proc2 = MagicMock()
+    proc2.is_alive.return_value = True
+    executor.workers = {1: proc1, 2: proc2}
+    executor._unread_messages.value = 0
+    executor.activity_queue.empty.return_value = True
+    executor._check_workers()
+    proc1.close.assert_not_called()
+    proc2.close.assert_not_called()
+    assert len(executor.workers) == 2
+
+
+def test_some_workers_dead(setup_executor):
+    executor = setup_executor
+    proc1 = MagicMock()
+    proc1.is_alive.return_value = False
+    proc2 = MagicMock()
+    proc2.is_alive.return_value = True
+    executor.workers = {1: proc1, 2: proc2}
+    executor._unread_messages.value = 0
+    executor.activity_queue.empty.return_value = True
+    executor._check_workers()
+    proc1.close.assert_called_once()
+    proc2.close.assert_not_called()
+    assert executor.workers == {2: proc2}
+
+
+def test_all_workers_dead(setup_executor):
+    executor = setup_executor
+    proc1 = MagicMock()
+    proc1.is_alive.return_value = False
+    proc2 = MagicMock()
+    proc2.is_alive.return_value = False
+    executor.workers = {1: proc1, 2: proc2}
+    executor._unread_messages.value = 0
+    executor.activity_queue.empty.return_value = True
+    executor._check_workers()
+    proc1.close.assert_called_once()
+    proc2.close.assert_called_once()
+    assert executor.workers == {}
+
+
+def test_outstanding_messages_and_empty_queue(setup_executor):
+    executor = setup_executor
+    executor._unread_messages.value = 1
+    executor.activity_queue.empty.return_value = True
+    executor._check_workers()
+    executor._spawn_worker.assert_not_called()
+
+
+def test_spawn_worker_when_needed(setup_executor):
+    executor = setup_executor
+    executor._unread_messages.value = 1
+    executor.activity_queue.empty.return_value = False
+    executor.workers = {}
+    executor._check_workers()
+    executor._spawn_worker.assert_called_once()
+
+
+def test_no_spawn_if_parallelism_reached(setup_executor):
+    executor = setup_executor
+    executor._unread_messages.value = 2
+    executor.activity_queue.empty.return_value = False
+    proc1 = MagicMock()
+    proc1.is_alive.return_value = True
+    proc2 = MagicMock()
+    proc2.is_alive.return_value = True
+    executor.workers = {1: proc1, 2: proc2}
+    executor._check_workers()
+    executor._spawn_worker.assert_not_called()
+
+
+def test_parallelism_zero_spawns_worker(setup_executor):
+    executor = setup_executor
+    executor.parallelism = 0
+    executor._unread_messages.value = 1
+    executor.activity_queue.empty.return_value = False
+    executor.workers = {}
+    executor._check_workers()
+    executor._spawn_worker.assert_called_once()
+
+
+def test_spawn_worker_when_we_have_parallelism_left(setup_executor):
+    executor = setup_executor
+    # Simulate 4 running workers
+    running_workers = {}
+    for i in range(4):
+        proc = MagicMock()
+        proc.is_alive.return_value = True
+        running_workers[i] = proc
+    executor.workers = running_workers
+    executor.parallelism = 5  # Allow more workers if needed
+
+    # Simulate 4 pending tasks (equal to running workers)
+    executor._unread_messages.value = 4
+    executor.activity_queue.empty.return_value = False
+    executor._spawn_worker.reset_mock()
+    executor._check_workers()
+    executor._spawn_worker.assert_called_once()

Reply via email to