This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e6acf00417 Remove option for unlimited parallelism (#48218)
9e6acf00417 is described below

commit 9e6acf004173b68cffe7692239c410245dbe266e
Author: Jens Scheffler <[email protected]>
AuthorDate: Thu Mar 27 22:45:23 2025 +0100

    Remove option for unlimited parallelism (#48218)
    
    * Remove option for unlimited parallelism
    
    * Add newsfragment
    
    * Update newsfragments/48218.significant.rst
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
    
    * Fix newsfragment static check
    
    ---------
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
---
 airflow-core/newsfragments/48218.significant.rst   |  14 +++
 .../src/airflow/config_templates/config.yml        |   2 +-
 .../src/airflow/executors/base_executor.py         |  20 ++--
 .../src/airflow/jobs/scheduler_job_runner.py       |   3 -
 .../tests/unit/executors/test_base_executor.py     |  30 +-----
 .../tests/unit/executors/test_local_executor.py    |   5 +-
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 118 ---------------------
 7 files changed, 28 insertions(+), 164 deletions(-)

diff --git a/airflow-core/newsfragments/48218.significant.rst 
b/airflow-core/newsfragments/48218.significant.rst
new file mode 100644
index 00000000000..cfde19d2fbe
--- /dev/null
+++ b/airflow-core/newsfragments/48218.significant.rst
@@ -0,0 +1,14 @@
+Remove option for unlimited parallelism.
+
+Before Airflow 3.0, it was possible to configure unlimited parallelism by setting 
``[core] parallelism`` to 0. This option was removed in Airflow 3.0.0; the value must now be greater than or equal to 1.
+
+* Types of change
+
+  * [ ] Dag changes
+  * [x] Config changes
+  * [ ] API changes
+  * [ ] CLI changes
+  * [x] Behaviour changes
+  * [ ] Plugin changes
+  * [ ] Dependency changes
+  * [ ] Code interface changes
diff --git a/airflow-core/src/airflow/config_templates/config.yml 
b/airflow-core/src/airflow/config_templates/config.yml
index 58eb9d33d80..b35b1796495 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -114,7 +114,7 @@ core:
         This defines the maximum number of task instances that can run 
concurrently per scheduler in
         Airflow, regardless of the worker count. Generally this value, 
multiplied by the number of
         schedulers in your cluster, is the maximum number of task instances 
with the running
-        state in the metadata database. Setting this value to zero allows 
unlimited parallelism.
+        state in the metadata database. The value must be greater than or equal to 1.
       version_added: ~
       type: string
       example: ~
diff --git a/airflow-core/src/airflow/executors/base_executor.py 
b/airflow-core/src/airflow/executors/base_executor.py
index c9e63c6ca19..558eb952bb1 100644
--- a/airflow-core/src/airflow/executors/base_executor.py
+++ b/airflow-core/src/airflow/executors/base_executor.py
@@ -19,7 +19,6 @@
 from __future__ import annotations
 
 import logging
-import sys
 from collections import defaultdict, deque
 from collections.abc import Sequence
 from dataclasses import dataclass, field
@@ -114,7 +113,7 @@ class BaseExecutor(LoggingMixin):
     """
     Base class to inherit for concrete executors such as Celery, Kubernetes, 
Local, Sequential, etc.
 
-    :param parallelism: how many jobs should run at one time. Set to ``0`` for 
infinity.
+    :param parallelism: how many jobs should run at one time.
     """
 
     supports_ad_hoc_ti_run: bool = False
@@ -162,6 +161,10 @@ class BaseExecutor(LoggingMixin):
         self.running: set[TaskInstanceKey] = set()
         self.event_buffer: dict[TaskInstanceKey, EventBufferValueType] = {}
         self._task_event_logs: deque[Log] = deque()
+
+        if self.parallelism <= 0:
+            raise ValueError("parallelism is set to 0 or lower")
+
         """
         Deque for storing task event log messages.
 
@@ -264,10 +267,7 @@ class BaseExecutor(LoggingMixin):
     @add_span
     def heartbeat(self) -> None:
         """Heartbeat sent to trigger new jobs."""
-        if not self.parallelism:
-            open_slots = len(self.queued_tasks)
-        else:
-            open_slots = self.parallelism - len(self.running)
+        open_slots = self.parallelism - len(self.running)
 
         num_running_tasks = len(self.running)
         num_queued_tasks = len(self.queued_tasks)
@@ -321,8 +321,7 @@ class BaseExecutor(LoggingMixin):
         self.log.debug("%s running task instances for executor %s", 
num_running_tasks, name)
         self.log.debug("%s in queue for executor %s", num_queued_tasks, name)
         if open_slots == 0:
-            if self.parallelism:
-                self.log.info("Executor parallelism limit reached. 0 open 
slots.")
+            self.log.info("Executor parallelism limit reached. 0 open slots.")
         else:
             self.log.debug("%s open slots for executor %s", open_slots, name)
 
@@ -643,10 +642,7 @@ class BaseExecutor(LoggingMixin):
     @property
     def slots_available(self):
         """Number of new tasks this executor instance can accept."""
-        if self.parallelism:
-            return self.parallelism - len(self.running) - 
len(self.queued_tasks)
-        else:
-            return sys.maxsize
+        return self.parallelism - len(self.running) - len(self.queued_tasks)
 
     @property
     def slots_occupied(self):
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index d6f4ebbdfc7..c0132edf9a2 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -696,9 +696,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         # across all executors.
         num_occupied_slots = sum([executor.slots_occupied for executor in 
self.job.executors])
         parallelism = conf.getint("core", "parallelism")
-        # Parallelism configured to 0 means infinite currently running tasks
-        if parallelism == 0:
-            parallelism = sys.maxsize
         if self.job.max_tis_per_query == 0:
             max_tis = parallelism - num_occupied_slots
         else:
diff --git a/airflow-core/tests/unit/executors/test_base_executor.py 
b/airflow-core/tests/unit/executors/test_base_executor.py
index bfec538c24c..67f4de7c6c4 100644
--- a/airflow-core/tests/unit/executors/test_base_executor.py
+++ b/airflow-core/tests/unit/executors/test_base_executor.py
@@ -18,7 +18,6 @@
 from __future__ import annotations
 
 import logging
-import sys
 from datetime import timedelta
 from unittest import mock
 
@@ -49,14 +48,9 @@ def test_is_production_default_value():
     assert BaseExecutor.is_production
 
 
-def test_infinite_slotspool():
-    executor = BaseExecutor(0)
-    assert executor.slots_available == sys.maxsize
-
-
-def test_new_exec_no_slots_occupied():
-    executor = BaseExecutor(0)
-    assert executor.slots_occupied == 0
+def test_invalid_slotspool():
+    with pytest.raises(ValueError):
+        BaseExecutor(0)
 
 
 def test_get_task_log():
@@ -170,22 +164,6 @@ def test_gauge_executor_metrics_with_multiple_executors(
     mock_stats_gauge.assert_has_calls(calls)
 
 
[email protected]("airflow.executors.base_executor.BaseExecutor.sync")
[email protected]("airflow.executors.base_executor.BaseExecutor.trigger_tasks")
[email protected]("airflow.executors.base_executor.Stats.gauge")
-def test_gauge_executor_with_infinite_pool_metrics(mock_stats_gauge, 
mock_trigger_tasks, mock_sync):
-    executor = BaseExecutor(0)
-    executor.heartbeat()
-    calls = [
-        mock.call("executor.open_slots", value=mock.ANY, tags={"status": 
"open", "name": "BaseExecutor"}),
-        mock.call("executor.queued_tasks", value=mock.ANY, tags={"status": 
"queued", "name": "BaseExecutor"}),
-        mock.call(
-            "executor.running_tasks", value=mock.ANY, tags={"status": 
"running", "name": "BaseExecutor"}
-        ),
-    ]
-    mock_stats_gauge.assert_has_calls(calls)
-
-
 def setup_dagrun(dag_maker):
     date = timezone.utcnow()
     start_date = date - timedelta(days=2)
@@ -367,7 +345,7 @@ def test_base_executor_cannot_send_callback():
 
 
 def test_parser_and_formatter_class():
-    executor = BaseExecutor()
+    executor = BaseExecutor(42)
     parser = executor._get_parser()
     assert isinstance(parser, DefaultHelpParser)
     assert parser.formatter_class is AirflowHelpFormatter
diff --git a/airflow-core/tests/unit/executors/test_local_executor.py 
b/airflow-core/tests/unit/executors/test_local_executor.py
index ec72863b20e..f24104f3b41 100644
--- a/airflow-core/tests/unit/executors/test_local_executor.py
+++ b/airflow-core/tests/unit/executors/test_local_executor.py
@@ -123,10 +123,7 @@ class TestLocalExecutor:
     @skip_spawn_mp_start
     @pytest.mark.parametrize(
         ("parallelism",),
-        [
-            pytest.param(0, id="unlimited"),
-            pytest.param(2, id="limited"),
-        ],
+        [pytest.param(2, id="limited")],
     )
     def test_execution(self, parallelism: int):
         self._test_execute(parallelism=parallelism)
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 026c755608c..8c9d5cd6bfe 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -21,7 +21,6 @@ import contextlib
 import datetime
 import logging
 import os
-import sys
 from collections import Counter, deque
 from collections.abc import Generator
 from datetime import timedelta
@@ -1922,123 +1921,6 @@ class TestSchedulerJob:
             assert total_enqueued == 31
         session.rollback()
 
-    @pytest.mark.parametrize(
-        "task1_exec, task2_exec",
-        [
-            ("default_exec", "default_exec"),
-            ("default_exec", "secondary_exec"),
-            ("secondary_exec", "secondary_exec"),
-        ],
-    )
-    def test_execute_task_instances_unlimited_parallelism_multiple_executors(
-        self, task1_exec, task2_exec, dag_maker, mock_executors
-    ):
-        """Test core.parallelism leads to unlimited scheduling, but queries 
limited by max_tis"""
-        dag_id = "SchedulerJobTest.test_execute_task_instances_unlimited"
-        task_id_1 = "dummy_task"
-        task_id_2 = "dummy_task_2"
-        session = settings.Session()
-
-        with dag_maker(dag_id=dag_id, max_active_tasks=1024, session=session):
-            task1 = EmptyOperator(task_id=task_id_1, executor=task1_exec)
-            task2 = EmptyOperator(task_id=task_id_2, executor=task2_exec)
-
-        scheduler_job = Job()
-        self.job_runner = SchedulerJobRunner(job=scheduler_job)
-
-        def _create_dagruns():
-            dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, 
state=State.RUNNING)
-            yield dagrun
-            for _ in range(39):
-                dagrun = dag_maker.create_dagrun_after(
-                    dagrun,
-                    run_type=DagRunType.SCHEDULED,
-                    state=State.RUNNING,
-                )
-                yield dagrun
-
-        for dr in _create_dagruns():
-            ti1 = dr.get_task_instance(task1.task_id, session)
-            ti2 = dr.get_task_instance(task2.task_id, session)
-            ti1.state = State.SCHEDULED
-            ti2.state = State.SCHEDULED
-            session.flush()
-
-        scheduler_job.max_tis_per_query = 50
-        for executor in mock_executors:
-            executor.parallelism = 0
-            executor.slots_occupied = 0
-            executor.slots_available = sys.maxsize
-
-        with conf_vars({("core", "parallelism"): "0"}):
-            # 40 dag runs * 2 tasks each = 80.
-            enqueued = 
self.job_runner._critical_section_enqueue_task_instances(session)
-            # Parallelism is unlimited, but we still only query for 
max_tis_per_query each time we enqueue
-            assert enqueued == 50
-
-            enqueued = 
self.job_runner._critical_section_enqueue_task_instances(session)
-            # The remaining 30 are enqueued the next loop
-            assert enqueued == 30
-
-        session.rollback()
-
-    @pytest.mark.parametrize(
-        "task1_exec, task2_exec",
-        [
-            ("default_exec", "default_exec"),
-            ("default_exec", "secondary_exec"),
-            ("secondary_exec", "secondary_exec"),
-        ],
-    )
-    def 
test_execute_task_instances_unlimited_parallelism_unlimited_max_tis_multiple_executors(
-        self, task1_exec, task2_exec, dag_maker, mock_executors
-    ):
-        """Test core.parallelism leads to unlimited scheduling"""
-        dag_id = "SchedulerJobTest.test_execute_task_instances_unlimited"
-        task_id_1 = "dummy_task"
-        task_id_2 = "dummy_task_2"
-        session = settings.Session()
-
-        with dag_maker(dag_id=dag_id, max_active_tasks=1024, session=session):
-            task1 = EmptyOperator(task_id=task_id_1, executor=task1_exec)
-            task2 = EmptyOperator(task_id=task_id_2, executor=task2_exec)
-
-        scheduler_job = Job()
-        self.job_runner = SchedulerJobRunner(job=scheduler_job)
-
-        def _create_dagruns():
-            dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, 
state=State.RUNNING)
-            yield dagrun
-            for _ in range(39):
-                dagrun = dag_maker.create_dagrun_after(
-                    dagrun,
-                    run_type=DagRunType.SCHEDULED,
-                    state=State.RUNNING,
-                )
-                yield dagrun
-
-        for dr in _create_dagruns():
-            ti1 = dr.get_task_instance(task1.task_id, session)
-            ti2 = dr.get_task_instance(task2.task_id, session)
-            ti1.state = State.SCHEDULED
-            ti2.state = State.SCHEDULED
-            session.flush()
-
-        scheduler_job.max_tis_per_query = 0
-        for executor in mock_executors:
-            executor.parallelism = 0
-            executor.slots_occupied = 0
-            executor.slots_available = sys.maxsize
-
-        with conf_vars({("core", "parallelism"): "0"}):
-            # 40 dag runs * 2 tasks each = 80. With core.parallelism set to 
zero then executors have
-            # unlimited slots and with max_tis_per_query set to zero, query 
will match also allow infinity.
-            # Thus, all tasks should be enqueued in one step
-            enqueued = 
self.job_runner._critical_section_enqueue_task_instances(session)
-
-            assert enqueued == 80
-        session.rollback()
-
     def test_adopt_or_reset_orphaned_tasks(self, dag_maker, session):
         with dag_maker("test_execute_helper_reset_orphaned_tasks", 
session=session):
             op1 = EmptyOperator(task_id="op1")

Reply via email to