This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9e6acf00417 Remove option for unlimited parallelism (#48218)
9e6acf00417 is described below
commit 9e6acf004173b68cffe7692239c410245dbe266e
Author: Jens Scheffler <[email protected]>
AuthorDate: Thu Mar 27 22:45:23 2025 +0100
Remove option for unlimited parallelism (#48218)
* Remove option for unlimited parallelism
* Add newsfragment
* Update newsfragments/48218.significant.rst
Co-authored-by: Jed Cunningham <[email protected]>
* Fix newsfragment static check
---------
Co-authored-by: Jed Cunningham
<[email protected]>
---
airflow-core/newsfragments/48218.significant.rst | 14 +++
.../src/airflow/config_templates/config.yml | 2 +-
.../src/airflow/executors/base_executor.py | 20 ++--
.../src/airflow/jobs/scheduler_job_runner.py | 3 -
.../tests/unit/executors/test_base_executor.py | 30 +-----
.../tests/unit/executors/test_local_executor.py | 5 +-
airflow-core/tests/unit/jobs/test_scheduler_job.py | 118 ---------------------
7 files changed, 28 insertions(+), 164 deletions(-)
diff --git a/airflow-core/newsfragments/48218.significant.rst
b/airflow-core/newsfragments/48218.significant.rst
new file mode 100644
index 00000000000..cfde19d2fbe
--- /dev/null
+++ b/airflow-core/newsfragments/48218.significant.rst
@@ -0,0 +1,14 @@
+Remove option for unlimited parallelism.
+
+Before Airflow 3.0 it was possible to set unlimited parallelism by setting ``[core] parallelism`` to 0. This was removed in Airflow 3.0.0.
+
+* Types of change
+
+ * [ ] Dag changes
+ * [x] Config changes
+ * [ ] API changes
+ * [ ] CLI changes
+ * [x] Behaviour changes
+ * [ ] Plugin changes
+ * [ ] Dependency changes
+ * [ ] Code interface changes
diff --git a/airflow-core/src/airflow/config_templates/config.yml
b/airflow-core/src/airflow/config_templates/config.yml
index 58eb9d33d80..b35b1796495 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -114,7 +114,7 @@ core:
This defines the maximum number of task instances that can run
concurrently per scheduler in
Airflow, regardless of the worker count. Generally this value,
multiplied by the number of
schedulers in your cluster, is the maximum number of task instances
with the running
- state in the metadata database. Setting this value to zero allows unlimited parallelism.
+ state in the metadata database. The value must be larger or equal 1.
version_added: ~
type: string
example: ~
diff --git a/airflow-core/src/airflow/executors/base_executor.py
b/airflow-core/src/airflow/executors/base_executor.py
index c9e63c6ca19..558eb952bb1 100644
--- a/airflow-core/src/airflow/executors/base_executor.py
+++ b/airflow-core/src/airflow/executors/base_executor.py
@@ -19,7 +19,6 @@
from __future__ import annotations
import logging
-import sys
from collections import defaultdict, deque
from collections.abc import Sequence
from dataclasses import dataclass, field
@@ -114,7 +113,7 @@ class BaseExecutor(LoggingMixin):
"""
Base class to inherit for concrete executors such as Celery, Kubernetes,
Local, Sequential, etc.
- :param parallelism: how many jobs should run at one time. Set to ``0`` for infinity.
+ :param parallelism: how many jobs should run at one time.
"""
supports_ad_hoc_ti_run: bool = False
@@ -162,6 +161,10 @@ class BaseExecutor(LoggingMixin):
self.running: set[TaskInstanceKey] = set()
self.event_buffer: dict[TaskInstanceKey, EventBufferValueType] = {}
self._task_event_logs: deque[Log] = deque()
+
+ if self.parallelism <= 0:
+ raise ValueError("parallelism is set to 0 or lower")
+
"""
Deque for storing task event log messages.
@@ -264,10 +267,7 @@ class BaseExecutor(LoggingMixin):
@add_span
def heartbeat(self) -> None:
"""Heartbeat sent to trigger new jobs."""
- if not self.parallelism:
- open_slots = len(self.queued_tasks)
- else:
- open_slots = self.parallelism - len(self.running)
+ open_slots = self.parallelism - len(self.running)
num_running_tasks = len(self.running)
num_queued_tasks = len(self.queued_tasks)
@@ -321,8 +321,7 @@ class BaseExecutor(LoggingMixin):
self.log.debug("%s running task instances for executor %s",
num_running_tasks, name)
self.log.debug("%s in queue for executor %s", num_queued_tasks, name)
if open_slots == 0:
- if self.parallelism:
- self.log.info("Executor parallelism limit reached. 0 open slots.")
+ self.log.info("Executor parallelism limit reached. 0 open slots.")
else:
self.log.debug("%s open slots for executor %s", open_slots, name)
@@ -643,10 +642,7 @@ class BaseExecutor(LoggingMixin):
@property
def slots_available(self):
"""Number of new tasks this executor instance can accept."""
- if self.parallelism:
- return self.parallelism - len(self.running) - len(self.queued_tasks)
- else:
- return sys.maxsize
+ return self.parallelism - len(self.running) - len(self.queued_tasks)
@property
def slots_occupied(self):
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index d6f4ebbdfc7..c0132edf9a2 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -696,9 +696,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# across all executors.
num_occupied_slots = sum([executor.slots_occupied for executor in self.job.executors])
parallelism = conf.getint("core", "parallelism")
- # Parallelism configured to 0 means infinite currently running tasks
- if parallelism == 0:
- parallelism = sys.maxsize
if self.job.max_tis_per_query == 0:
max_tis = parallelism - num_occupied_slots
else:
diff --git a/airflow-core/tests/unit/executors/test_base_executor.py
b/airflow-core/tests/unit/executors/test_base_executor.py
index bfec538c24c..67f4de7c6c4 100644
--- a/airflow-core/tests/unit/executors/test_base_executor.py
+++ b/airflow-core/tests/unit/executors/test_base_executor.py
@@ -18,7 +18,6 @@
from __future__ import annotations
import logging
-import sys
from datetime import timedelta
from unittest import mock
@@ -49,14 +48,9 @@ def test_is_production_default_value():
assert BaseExecutor.is_production
-def test_infinite_slotspool():
- executor = BaseExecutor(0)
- assert executor.slots_available == sys.maxsize
-
-
-def test_new_exec_no_slots_occupied():
- executor = BaseExecutor(0)
- assert executor.slots_occupied == 0
+def test_invalid_slotspool():
+ with pytest.raises(ValueError):
+ BaseExecutor(0)
def test_get_task_log():
@@ -170,22 +164,6 @@ def test_gauge_executor_metrics_with_multiple_executors(
mock_stats_gauge.assert_has_calls(calls)
[email protected]("airflow.executors.base_executor.BaseExecutor.sync")
[email protected]("airflow.executors.base_executor.BaseExecutor.trigger_tasks")
[email protected]("airflow.executors.base_executor.Stats.gauge")
-def test_gauge_executor_with_infinite_pool_metrics(mock_stats_gauge, mock_trigger_tasks, mock_sync):
- executor = BaseExecutor(0)
- executor.heartbeat()
- calls = [
- mock.call("executor.open_slots", value=mock.ANY, tags={"status":
"open", "name": "BaseExecutor"}),
- mock.call("executor.queued_tasks", value=mock.ANY, tags={"status":
"queued", "name": "BaseExecutor"}),
- mock.call(
- "executor.running_tasks", value=mock.ANY, tags={"status":
"running", "name": "BaseExecutor"}
- ),
- ]
- mock_stats_gauge.assert_has_calls(calls)
-
-
def setup_dagrun(dag_maker):
date = timezone.utcnow()
start_date = date - timedelta(days=2)
@@ -367,7 +345,7 @@ def test_base_executor_cannot_send_callback():
def test_parser_and_formatter_class():
- executor = BaseExecutor()
+ executor = BaseExecutor(42)
parser = executor._get_parser()
assert isinstance(parser, DefaultHelpParser)
assert parser.formatter_class is AirflowHelpFormatter
diff --git a/airflow-core/tests/unit/executors/test_local_executor.py
b/airflow-core/tests/unit/executors/test_local_executor.py
index ec72863b20e..f24104f3b41 100644
--- a/airflow-core/tests/unit/executors/test_local_executor.py
+++ b/airflow-core/tests/unit/executors/test_local_executor.py
@@ -123,10 +123,7 @@ class TestLocalExecutor:
@skip_spawn_mp_start
@pytest.mark.parametrize(
("parallelism",),
- [
- pytest.param(0, id="unlimited"),
- pytest.param(2, id="limited"),
- ],
+ [pytest.param(2, id="limited")],
)
def test_execution(self, parallelism: int):
self._test_execute(parallelism=parallelism)
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 026c755608c..8c9d5cd6bfe 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -21,7 +21,6 @@ import contextlib
import datetime
import logging
import os
-import sys
from collections import Counter, deque
from collections.abc import Generator
from datetime import timedelta
@@ -1922,123 +1921,6 @@ class TestSchedulerJob:
assert total_enqueued == 31
session.rollback()
- @pytest.mark.parametrize(
- "task1_exec, task2_exec",
- [
- ("default_exec", "default_exec"),
- ("default_exec", "secondary_exec"),
- ("secondary_exec", "secondary_exec"),
- ],
- )
- def test_execute_task_instances_unlimited_parallelism_multiple_executors(
- self, task1_exec, task2_exec, dag_maker, mock_executors
- ):
- """Test core.parallelism leads to unlimited scheduling, but queries
limited by max_tis"""
- dag_id = "SchedulerJobTest.test_execute_task_instances_unlimited"
- task_id_1 = "dummy_task"
- task_id_2 = "dummy_task_2"
- session = settings.Session()
-
- with dag_maker(dag_id=dag_id, max_active_tasks=1024, session=session):
- task1 = EmptyOperator(task_id=task_id_1, executor=task1_exec)
- task2 = EmptyOperator(task_id=task_id_2, executor=task2_exec)
-
- scheduler_job = Job()
- self.job_runner = SchedulerJobRunner(job=scheduler_job)
-
- def _create_dagruns():
- dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED,
state=State.RUNNING)
- yield dagrun
- for _ in range(39):
- dagrun = dag_maker.create_dagrun_after(
- dagrun,
- run_type=DagRunType.SCHEDULED,
- state=State.RUNNING,
- )
- yield dagrun
-
- for dr in _create_dagruns():
- ti1 = dr.get_task_instance(task1.task_id, session)
- ti2 = dr.get_task_instance(task2.task_id, session)
- ti1.state = State.SCHEDULED
- ti2.state = State.SCHEDULED
- session.flush()
-
- scheduler_job.max_tis_per_query = 50
- for executor in mock_executors:
- executor.parallelism = 0
- executor.slots_occupied = 0
- executor.slots_available = sys.maxsize
-
- with conf_vars({("core", "parallelism"): "0"}):
- # 40 dag runs * 2 tasks each = 80.
- enqueued = self.job_runner._critical_section_enqueue_task_instances(session)
- # Parallelism is unlimited, but we still only query for
max_tis_per_query each time we enqueue
- assert enqueued == 50
-
- enqueued = self.job_runner._critical_section_enqueue_task_instances(session)
- # The remaining 30 are enqueued the next loop
- assert enqueued == 30
-
- session.rollback()
-
- @pytest.mark.parametrize(
- "task1_exec, task2_exec",
- [
- ("default_exec", "default_exec"),
- ("default_exec", "secondary_exec"),
- ("secondary_exec", "secondary_exec"),
- ],
- )
- def test_execute_task_instances_unlimited_parallelism_unlimited_max_tis_multiple_executors(
- self, task1_exec, task2_exec, dag_maker, mock_executors
- ):
- """Test core.parallelism leads to unlimited scheduling"""
- dag_id = "SchedulerJobTest.test_execute_task_instances_unlimited"
- task_id_1 = "dummy_task"
- task_id_2 = "dummy_task_2"
- session = settings.Session()
-
- with dag_maker(dag_id=dag_id, max_active_tasks=1024, session=session):
- task1 = EmptyOperator(task_id=task_id_1, executor=task1_exec)
- task2 = EmptyOperator(task_id=task_id_2, executor=task2_exec)
-
- scheduler_job = Job()
- self.job_runner = SchedulerJobRunner(job=scheduler_job)
-
- def _create_dagruns():
- dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED,
state=State.RUNNING)
- yield dagrun
- for _ in range(39):
- dagrun = dag_maker.create_dagrun_after(
- dagrun,
- run_type=DagRunType.SCHEDULED,
- state=State.RUNNING,
- )
- yield dagrun
-
- for dr in _create_dagruns():
- ti1 = dr.get_task_instance(task1.task_id, session)
- ti2 = dr.get_task_instance(task2.task_id, session)
- ti1.state = State.SCHEDULED
- ti2.state = State.SCHEDULED
- session.flush()
-
- scheduler_job.max_tis_per_query = 0
- for executor in mock_executors:
- executor.parallelism = 0
- executor.slots_occupied = 0
- executor.slots_available = sys.maxsize
-
- with conf_vars({("core", "parallelism"): "0"}):
- # 40 dag runs * 2 tasks each = 80. With core.parallelism set to
zero then executors have
- # unlimited slots and with max_tis_per_query set to zero, query
will match also allow infinity.
- # Thus, all tasks should be enqueued in one step
- enqueued = self.job_runner._critical_section_enqueue_task_instances(session)
-
- assert enqueued == 80
- session.rollback()
-
def test_adopt_or_reset_orphaned_tasks(self, dag_maker, session):
with dag_maker("test_execute_helper_reset_orphaned_tasks",
session=session):
op1 = EmptyOperator(task_id="op1")