This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 92290dbd64f Restore mp_start_method config and add
mp_forkserver_preload config (#68875)
92290dbd64f is described below
commit 92290dbd64f52205b516668db29fd437435b7dbe
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Tue Jun 23 22:12:23 2026 +0900
Restore mp_start_method config and add mp_forkserver_preload config (#68875)
* Restore mp_start_method config and add mp_forkserver_preload config
* Remove set_component_mp_start_method for API server
* Spec multiprocessing mocks and cover start-method failure path
---
.../airflow/cli/commands/dag_processor_command.py | 2 +
.../src/airflow/cli/commands/scheduler_command.py | 2 +
.../src/airflow/cli/commands/triggerer_command.py | 2 +
.../src/airflow/config_templates/config.yml | 31 ++++++++
.../src/airflow/executors/local_executor.py | 6 +-
airflow-core/src/airflow/utils/process_utils.py | 65 ++++++++++++++++
.../tests/unit/executors/test_local_executor.py | 17 +++-
.../tests/unit/utils/test_process_utils.py | 90 ++++++++++++++++++++++
8 files changed, 212 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/cli/commands/dag_processor_command.py
b/airflow-core/src/airflow/cli/commands/dag_processor_command.py
index f4c303c278d..04e410a5d66 100644
--- a/airflow-core/src/airflow/cli/commands/dag_processor_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_processor_command.py
@@ -27,6 +27,7 @@ from airflow.jobs.dag_processor_job_runner import
DagProcessorJobRunner
from airflow.jobs.job import Job, run_job
from airflow.utils import cli as cli_utils
from airflow.utils.memray_utils import MemrayTraceComponents,
enable_memray_trace
+from airflow.utils.process_utils import set_component_mp_start_method
from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
log = logging.getLogger(__name__)
@@ -50,6 +51,7 @@ def _create_dag_processor_job_runner(args: Any) ->
DagProcessorJobRunner:
@providers_configuration_loaded
def dag_processor(args):
"""Start Airflow Dag Processor Job."""
+ set_component_mp_start_method("dag_processor")
job_runner = _create_dag_processor_job_runner(args)
if cli_utils.should_enable_hot_reload(args):
diff --git a/airflow-core/src/airflow/cli/commands/scheduler_command.py
b/airflow-core/src/airflow/cli/commands/scheduler_command.py
index b79bc25fb01..e40592d4e3c 100644
--- a/airflow-core/src/airflow/cli/commands/scheduler_command.py
+++ b/airflow-core/src/airflow/cli/commands/scheduler_command.py
@@ -30,6 +30,7 @@ from airflow.jobs.job import Job, run_job
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.utils import cli as cli_utils
from airflow.utils.memray_utils import MemrayTraceComponents,
enable_memray_trace
+from airflow.utils.process_utils import set_component_mp_start_method
from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
from airflow.utils.scheduler_health import serve_health_check
@@ -38,6 +39,7 @@ log = logging.getLogger(__name__)
@enable_memray_trace(component=MemrayTraceComponents.scheduler)
def _run_scheduler_job(args) -> None:
+ set_component_mp_start_method("scheduler")
job_runner = SchedulerJobRunner(
job=Job(),
num_runs=args.num_runs,
diff --git a/airflow-core/src/airflow/cli/commands/triggerer_command.py
b/airflow-core/src/airflow/cli/commands/triggerer_command.py
index 1182fbb05b2..7e1d98f44db 100644
--- a/airflow-core/src/airflow/cli/commands/triggerer_command.py
+++ b/airflow-core/src/airflow/cli/commands/triggerer_command.py
@@ -30,6 +30,7 @@ from airflow.jobs.job import Job, run_job
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.utils import cli as cli_utils
from airflow.utils.memray_utils import MemrayTraceComponents,
enable_memray_trace
+from airflow.utils.process_utils import set_component_mp_start_method
from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
@@ -58,6 +59,7 @@ def triggerer_run(
queues: set[str] | None = None,
team_name: str | None = None,
):
+ set_component_mp_start_method("triggerer")
with _serve_logs(skip_serve_logs):
triggerer_job_runner = TriggererJobRunner(
job=Job(heartrate=triggerer_heartrate), capacity=capacity,
queues=queues, team_name=team_name
diff --git a/airflow-core/src/airflow/config_templates/config.yml
b/airflow-core/src/airflow/config_templates/config.yml
index d8bbd8442b6..a8ce5506c53 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -76,6 +76,37 @@ core:
type: string
example: ~
default: "LocalExecutor"
+ mp_start_method:
+ description: |
+ The ``multiprocessing`` start method used by long-lived components that
+ spawn worker processes (the scheduler's ``LocalExecutor`` and the
+ triggerer). Must be one of the values returned by
+ `multiprocessing.get_all_start_methods()
+ <https://docs.python.org/3/library/multiprocessing.html#multiprocessing.get_all_start_methods>`__
+ on your platform (typically ``fork``, ``forkserver`` or ``spawn``). When
+ unset (the default), the platform default is used.
+
+ Python 3.14 changed the default on POSIX platforms other than macOS from
+ ``fork`` to ``forkserver``. ``forkserver`` and ``spawn`` children re-import
+ Airflow instead of sharing it copy-on-write, which can sharply increase the
+ resident memory of the scheduler and triggerer. Set this to ``fork`` to
+ restore the pre-3.14 behaviour (note: forking a multi-threaded process is
+ not always safe), or keep ``forkserver`` and set
+ ``[core] mp_forkserver_preload`` to recover most of the sharing.
+
+ This can be overridden per component by setting ``mp_start_method`` in the
+ ``[scheduler]``, ``[triggerer]`` or ``[dag_processor]`` section.
+ version_added: 3.3.0
+ type: string
+ example: "fork"
+ default: ~
+ mp_forkserver_preload:
+ description: |
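+ Comma-separated list of modules the ``forkserver`` process should import up
+ front, so that forked workers inherit them copy-on-write instead of
+ re-importing them. Only used when ``mp_start_method`` is explicitly
+ configured as ``forkserver`` (per component or in ``[core]``); it is not
+ applied when ``mp_start_method`` is unset, even if ``forkserver`` is the
+ platform default. Like ``mp_start_method``, this can be overridden per
+ component by setting ``mp_forkserver_preload`` in that component's section.
+ See `multiprocessing.set_forkserver_preload
+ <https://docs.python.org/3/library/multiprocessing.html#multiprocessing.set_forkserver_preload>`__.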
+ version_added: 3.3.0
+ type: string
+ example: "airflow,airflow.executors.local_executor"
+ default: ~
auth_manager:
description: |
The auth manager class that airflow should use. Full import path to
the auth manager class.
diff --git a/airflow-core/src/airflow/executors/local_executor.py
b/airflow-core/src/airflow/executors/local_executor.py
index dd064253f8a..8917d72bbbe 100644
--- a/airflow-core/src/airflow/executors/local_executor.py
+++ b/airflow-core/src/airflow/executors/local_executor.py
@@ -133,7 +133,7 @@ class LocalExecutor(BaseExecutor):
"""
is_local: bool = True
- is_mp_using_fork: bool = multiprocessing.get_start_method() == "fork"
+ is_mp_using_fork: bool
supports_multi_team: bool = True
serve_logs: bool = True
@@ -148,6 +148,10 @@ class LocalExecutor(BaseExecutor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
+ # Resolve the start method at instantiation, not at import: the
component CLI entry may have
+ # set it via [<component>]/[core] mp_start_method before the executor
is created.
+ self.is_mp_using_fork = multiprocessing.get_start_method() == "fork"
+
# Check if self has the ExecutorConf set on the self.conf attribute,
and if not, set it to the global
# configuration object. This allows the changes to be backwards
compatible with older versions of
# Airflow.
diff --git a/airflow-core/src/airflow/utils/process_utils.py
b/airflow-core/src/airflow/utils/process_utils.py
index af887f1b8aa..cfadcc289a0 100644
--- a/airflow-core/src/airflow/utils/process_utils.py
+++ b/airflow-core/src/airflow/utils/process_utils.py
@@ -22,6 +22,7 @@ from __future__ import annotations
import array
import errno
import logging
+import multiprocessing
import os
import select
import shlex
@@ -388,3 +389,67 @@ def set_new_process_group() -> None:
return
os.setpgid(0, 0)
+
+
+_MP_START_METHOD_OPTION = "mp_start_method"
+_MP_FORKSERVER_PRELOAD_OPTION = "mp_forkserver_preload"
+
+
+def resolve_mp_start_method(component: str) -> str | None:
+ """
+ Return the configured ``multiprocessing`` start method for *component*, or
``None``.
+
+ Resolution order: ``[<component>] mp_start_method`` then ``[core]
mp_start_method`` then
+ ``None`` (keep the platform default).
+ """
+ method = conf.get(component, _MP_START_METHOD_OPTION, fallback=None) or
conf.get(
+ "core", _MP_START_METHOD_OPTION, fallback=None
+ )
+ return method.strip() if method and method.strip() else None
+
+
+def set_component_mp_start_method(component: str) -> None:
+ """
+ Apply the configured ``multiprocessing`` start method for *component*, if
any.
+
+ Python 3.14 changed the Unix (non-macOS) default from ``fork`` to
``forkserver``;
+ ``forkserver``/``spawn`` children re-import Airflow instead of sharing it
copy-on-write, which
+ inflates the memory of components that spawn workers (the scheduler's
``LocalExecutor`` and the
+ triggerer). This restores control over that choice.
+
+ Must be called before the component creates any ``multiprocessing`` object
(e.g. before the
+ executor or job runner starts). A no-op when unset or when the method is
unavailable on the
+ current platform, so it is always safe to call.
+ """
+ method = resolve_mp_start_method(component)
+ if not method:
+ return
+
+ available = multiprocessing.get_all_start_methods()
+ if method not in available:
+ log.warning(
+ "Configured %s=%r for %s is not available on this platform
(available: %s); "
+ "leaving the platform default start method in place.",
+ _MP_START_METHOD_OPTION,
+ method,
+ component,
+ available,
+ )
+ return
+
+ try:
+ multiprocessing.set_start_method(method, force=True)
+ except (RuntimeError, ValueError) as exc:
+ log.warning("Could not set multiprocessing start method to %r for %s:
%s", method, component, exc)
+ return
+ log.info("Set multiprocessing start method to %r for %s", method,
component)
+
+ if method == "forkserver":
+ preload_raw = conf.get(component, _MP_FORKSERVER_PRELOAD_OPTION,
fallback=None) or conf.get(
+ "core", _MP_FORKSERVER_PRELOAD_OPTION, fallback=None
+ )
+ modules = [m.strip() for m in preload_raw.split(",")] if preload_raw
else []
+ modules = [m for m in modules if m]
+ if modules:
+ multiprocessing.set_forkserver_preload(modules)
+ log.info("Set forkserver preload modules for %s: %s", component,
modules)
diff --git a/airflow-core/tests/unit/executors/test_local_executor.py
b/airflow-core/tests/unit/executors/test_local_executor.py
index eae6ed0458c..bdc06b22221 100644
--- a/airflow-core/tests/unit/executors/test_local_executor.py
+++ b/airflow-core/tests/unit/executors/test_local_executor.py
@@ -50,6 +50,19 @@ skip_non_fork_mp_start = pytest.mark.skipif(
reason="mock patching in test doesn't work with non-fork multiprocessing
start methods",
)
+
+class TestLocalExecutorMpStartMethod:
+
@mock.patch("airflow.executors.local_executor.multiprocessing.get_start_method",
autospec=True)
+ def test_is_mp_using_fork_resolved_per_instance(self,
mock_get_start_method):
+ """``is_mp_using_fork`` is resolved at ``__init__`` (reflecting any
configured start
+ method) rather than once at import time."""
+ mock_get_start_method.return_value = "fork"
+ assert LocalExecutor(parallelism=1).is_mp_using_fork is True
+
+ mock_get_start_method.return_value = "forkserver"
+ assert LocalExecutor(parallelism=1).is_mp_using_fork is False
+
+
skip_fork_mp_start = pytest.mark.skipif(
multiprocessing.get_start_method() == "fork",
reason="tests non-fork (lazy-spawning) behavior",
@@ -360,7 +373,7 @@ class TestLocalExecutor:
# Verify each executor has its own workers dict
assert team_a_executor.workers is not team_b_executor.workers
- if LocalExecutor.is_mp_using_fork:
+ if team_a_executor.is_mp_using_fork:
# fork pre-spawns all workers at start()
assert len(team_a_executor.workers) == 2
assert len(team_b_executor.workers) == 3
@@ -390,7 +403,7 @@ class TestLocalExecutor:
executor.start()
- if LocalExecutor.is_mp_using_fork:
+ if executor.is_mp_using_fork:
assert len(executor.workers) == 2
else:
# forkserver/spawn use lazy spawning
diff --git a/airflow-core/tests/unit/utils/test_process_utils.py
b/airflow-core/tests/unit/utils/test_process_utils.py
index 5ebc599b6b6..988d9e14dff 100644
--- a/airflow-core/tests/unit/utils/test_process_utils.py
+++ b/airflow-core/tests/unit/utils/test_process_utils.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import logging
+import multiprocessing
import os
import signal
import subprocess
@@ -39,6 +40,8 @@ from airflow.utils.process_utils import (
set_new_process_group,
)
+from tests_common.test_utils.config import conf_vars
+
class TestReapProcessGroup:
# Inline script that creates a new session, spawns a SIGTERM-ignoring
child,
@@ -226,3 +229,90 @@ class TestSetNewProcessGroup:
mock_get_sid.return_value = pid
set_new_process_group()
assert mock_set_pid.call_count == 0
+
+
+ALL_MP_METHODS = ["fork", "spawn", "forkserver"]
+
+
+class TestResolveMpStartMethod:
+ def test_unset_returns_none(self):
+ assert process_utils.resolve_mp_start_method("scheduler") is None
+
+ @conf_vars({("core", "mp_start_method"): "forkserver"})
+ def test_core_value_used(self):
+ assert process_utils.resolve_mp_start_method("scheduler") ==
"forkserver"
+ assert process_utils.resolve_mp_start_method("triggerer") ==
"forkserver"
+
+ @conf_vars({("core", "mp_start_method"): "forkserver", ("scheduler",
"mp_start_method"): "fork"})
+ def test_component_overrides_core(self):
+ assert process_utils.resolve_mp_start_method("scheduler") == "fork"
+ assert process_utils.resolve_mp_start_method("triggerer") ==
"forkserver"
+
+ @conf_vars({("core", "mp_start_method"): " "})
+ def test_blank_returns_none(self):
+ assert process_utils.resolve_mp_start_method("scheduler") is None
+
+
+class TestSetComponentMpStartMethod:
+ @mock.patch("airflow.utils.process_utils.multiprocessing",
spec=multiprocessing)
+ def test_noop_when_unset(self, mock_mp):
+ process_utils.set_component_mp_start_method("scheduler")
+ mock_mp.set_start_method.assert_not_called()
+
+ @conf_vars({("core", "mp_start_method"): "fork"})
+ @mock.patch("airflow.utils.process_utils.multiprocessing",
spec=multiprocessing)
+ def test_sets_method_from_core(self, mock_mp):
+ mock_mp.get_all_start_methods.return_value = ALL_MP_METHODS
+ process_utils.set_component_mp_start_method("scheduler")
+ mock_mp.set_start_method.assert_called_once_with("fork", force=True)
+ mock_mp.set_forkserver_preload.assert_not_called()
+
+ @conf_vars({("core", "mp_start_method"): "spawn", ("triggerer",
"mp_start_method"): "fork"})
+ @mock.patch("airflow.utils.process_utils.multiprocessing",
spec=multiprocessing)
+ def test_component_override_applied(self, mock_mp):
+ mock_mp.get_all_start_methods.return_value = ALL_MP_METHODS
+ process_utils.set_component_mp_start_method("triggerer")
+ mock_mp.set_start_method.assert_called_once_with("fork", force=True)
+
+ @conf_vars({("core", "mp_start_method"): "fork"})
+ @mock.patch("airflow.utils.process_utils.multiprocessing",
spec=multiprocessing)
+ def test_noop_when_method_unavailable(self, mock_mp):
+ mock_mp.get_all_start_methods.return_value = ["spawn"]
+ process_utils.set_component_mp_start_method("scheduler")
+ mock_mp.set_start_method.assert_not_called()
+
+ @conf_vars(
+ {
+ ("core", "mp_start_method"): "forkserver",
+ ("core", "mp_forkserver_preload"): "airflow,
airflow.executors.local_executor ,",
+ }
+ )
+ @mock.patch("airflow.utils.process_utils.multiprocessing",
spec=multiprocessing)
+ def test_forkserver_sets_preload(self, mock_mp):
+ mock_mp.get_all_start_methods.return_value = ALL_MP_METHODS
+ process_utils.set_component_mp_start_method("triggerer")
+ mock_mp.set_start_method.assert_called_once_with("forkserver",
force=True)
+ mock_mp.set_forkserver_preload.assert_called_once_with(
+ ["airflow", "airflow.executors.local_executor"]
+ )
+
+ @conf_vars({("core", "mp_start_method"): "fork", ("core",
"mp_forkserver_preload"): "airflow"})
+ @mock.patch("airflow.utils.process_utils.multiprocessing",
spec=multiprocessing)
+ def test_preload_ignored_for_non_forkserver(self, mock_mp):
+ mock_mp.get_all_start_methods.return_value = ALL_MP_METHODS
+ process_utils.set_component_mp_start_method("scheduler")
+ mock_mp.set_forkserver_preload.assert_not_called()
+
+ @conf_vars(
+ {
+ ("core", "mp_start_method"): "forkserver",
+ ("core", "mp_forkserver_preload"): "airflow",
+ }
+ )
+ @pytest.mark.parametrize("exc", [RuntimeError("already set"),
ValueError("bad method")])
+ @mock.patch("airflow.utils.process_utils.multiprocessing",
spec=multiprocessing)
+ def test_set_start_method_failure_is_swallowed(self, mock_mp, exc):
+ mock_mp.get_all_start_methods.return_value = ALL_MP_METHODS
+ mock_mp.set_start_method.side_effect = exc
+ process_utils.set_component_mp_start_method("scheduler")
+ mock_mp.set_forkserver_preload.assert_not_called()