This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 92290dbd64f Restore mp_start_method config and add mp_forkserver_preload config (#68875)
92290dbd64f is described below

commit 92290dbd64f52205b516668db29fd437435b7dbe
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Tue Jun 23 22:12:23 2026 +0900

    Restore mp_start_method config and add mp_forkserver_preload config (#68875)
    
    * Restore mp_start_method config and add mp_forkserver_preload config
    
    * Remove set_component_mp_start_method for API server
    
    * Spec multiprocessing mocks and cover start-method failure path
---
 .../airflow/cli/commands/dag_processor_command.py  |  2 +
 .../src/airflow/cli/commands/scheduler_command.py  |  2 +
 .../src/airflow/cli/commands/triggerer_command.py  |  2 +
 .../src/airflow/config_templates/config.yml        | 31 ++++++++
 .../src/airflow/executors/local_executor.py        |  6 +-
 airflow-core/src/airflow/utils/process_utils.py    | 65 ++++++++++++++++
 .../tests/unit/executors/test_local_executor.py    | 17 +++-
 .../tests/unit/utils/test_process_utils.py         | 90 ++++++++++++++++++++++
 8 files changed, 212 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/cli/commands/dag_processor_command.py 
b/airflow-core/src/airflow/cli/commands/dag_processor_command.py
index f4c303c278d..04e410a5d66 100644
--- a/airflow-core/src/airflow/cli/commands/dag_processor_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_processor_command.py
@@ -27,6 +27,7 @@ from airflow.jobs.dag_processor_job_runner import 
DagProcessorJobRunner
 from airflow.jobs.job import Job, run_job
 from airflow.utils import cli as cli_utils
 from airflow.utils.memray_utils import MemrayTraceComponents, 
enable_memray_trace
+from airflow.utils.process_utils import set_component_mp_start_method
 from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 log = logging.getLogger(__name__)
@@ -50,6 +51,7 @@ def _create_dag_processor_job_runner(args: Any) -> 
DagProcessorJobRunner:
 @providers_configuration_loaded
 def dag_processor(args):
     """Start Airflow Dag Processor Job."""
+    set_component_mp_start_method("dag_processor")
     job_runner = _create_dag_processor_job_runner(args)
 
     if cli_utils.should_enable_hot_reload(args):
diff --git a/airflow-core/src/airflow/cli/commands/scheduler_command.py 
b/airflow-core/src/airflow/cli/commands/scheduler_command.py
index b79bc25fb01..e40592d4e3c 100644
--- a/airflow-core/src/airflow/cli/commands/scheduler_command.py
+++ b/airflow-core/src/airflow/cli/commands/scheduler_command.py
@@ -30,6 +30,7 @@ from airflow.jobs.job import Job, run_job
 from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
 from airflow.utils import cli as cli_utils
 from airflow.utils.memray_utils import MemrayTraceComponents, 
enable_memray_trace
+from airflow.utils.process_utils import set_component_mp_start_method
 from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.utils.scheduler_health import serve_health_check
 
@@ -38,6 +39,7 @@ log = logging.getLogger(__name__)
 
 @enable_memray_trace(component=MemrayTraceComponents.scheduler)
 def _run_scheduler_job(args) -> None:
+    set_component_mp_start_method("scheduler")
     job_runner = SchedulerJobRunner(
         job=Job(),
         num_runs=args.num_runs,
diff --git a/airflow-core/src/airflow/cli/commands/triggerer_command.py 
b/airflow-core/src/airflow/cli/commands/triggerer_command.py
index 1182fbb05b2..7e1d98f44db 100644
--- a/airflow-core/src/airflow/cli/commands/triggerer_command.py
+++ b/airflow-core/src/airflow/cli/commands/triggerer_command.py
@@ -30,6 +30,7 @@ from airflow.jobs.job import Job, run_job
 from airflow.jobs.triggerer_job_runner import TriggererJobRunner
 from airflow.utils import cli as cli_utils
 from airflow.utils.memray_utils import MemrayTraceComponents, 
enable_memray_trace
+from airflow.utils.process_utils import set_component_mp_start_method
 from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 
 
@@ -58,6 +59,7 @@ def triggerer_run(
     queues: set[str] | None = None,
     team_name: str | None = None,
 ):
+    set_component_mp_start_method("triggerer")
     with _serve_logs(skip_serve_logs):
         triggerer_job_runner = TriggererJobRunner(
             job=Job(heartrate=triggerer_heartrate), capacity=capacity, 
queues=queues, team_name=team_name
diff --git a/airflow-core/src/airflow/config_templates/config.yml 
b/airflow-core/src/airflow/config_templates/config.yml
index d8bbd8442b6..a8ce5506c53 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -76,6 +76,37 @@ core:
       type: string
       example: ~
       default: "LocalExecutor"
+    mp_start_method:
+      description: |
+        The ``multiprocessing`` start method applied at start-up by the scheduler (including its
+        ``LocalExecutor``), the triggerer and the DAG processor. Must be one of the values returned
+        by `multiprocessing.get_all_start_methods()
+        <https://docs.python.org/3/library/multiprocessing.html#multiprocessing.get_all_start_methods>`__
+        on your platform (typically ``fork``, ``forkserver`` or ``spawn``). When unset (the
+        default) the platform default is used. If the configured method is not available on the
+        platform, a warning is logged and the platform default is kept.
+
+        Python 3.14 changed the default on Unix platforms other than macOS from ``fork`` to
+        ``forkserver``. ``forkserver`` and ``spawn`` children re-import Airflow instead of sharing
+        it copy-on-write, which can sharply increase the resident memory of the scheduler and
+        triggerer. Set this to ``fork`` to restore the pre-3.14 behaviour (note: forking a
+        multi-threaded process is not always safe), or keep ``forkserver`` and set
+        ``[core] mp_forkserver_preload`` to recover most of the sharing.
+
+        This can be overridden per component by setting ``mp_start_method`` in the ``[scheduler]``,
+        ``[triggerer]`` or ``[dag_processor]`` section.
+      version_added: 3.3.0
+      type: string
+      example: "fork"
+      default: ~
+    mp_forkserver_preload:
+      description: |
+        Comma-separated list of modules the ``forkserver`` process should import up front, so that
+        forked workers inherit them copy-on-write instead of re-importing them. Only used when the
+        effective ``mp_start_method`` is ``forkserver``. Like ``mp_start_method``, this can be
+        overridden per component in the ``[scheduler]``, ``[triggerer]`` or ``[dag_processor]``
+        section. See `multiprocessing.set_forkserver_preload
+        <https://docs.python.org/3/library/multiprocessing.html#multiprocessing.set_forkserver_preload>`__.
+      version_added: 3.3.0
+      type: string
+      example: "airflow,airflow.executors.local_executor"
+      default: ~
     auth_manager:
       description: |
         The auth manager class that airflow should use. Full import path to 
the auth manager class.
diff --git a/airflow-core/src/airflow/executors/local_executor.py 
b/airflow-core/src/airflow/executors/local_executor.py
index dd064253f8a..8917d72bbbe 100644
--- a/airflow-core/src/airflow/executors/local_executor.py
+++ b/airflow-core/src/airflow/executors/local_executor.py
@@ -133,7 +133,7 @@ class LocalExecutor(BaseExecutor):
     """
 
     is_local: bool = True
-    is_mp_using_fork: bool = multiprocessing.get_start_method() == "fork"
+    is_mp_using_fork: bool
 
     supports_multi_team: bool = True
     serve_logs: bool = True
@@ -148,6 +148,10 @@ class LocalExecutor(BaseExecutor):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
 
+        # Resolve the start method at instantiation, not at import: the 
component CLI entry may have
+        # set it via [<component>]/[core] mp_start_method before the executor 
is created.
+        self.is_mp_using_fork = multiprocessing.get_start_method() == "fork"
+
         # Check if self has the ExecutorConf set on the self.conf attribute, 
and if not, set it to the global
         # configuration object. This allows the changes to be backwards 
compatible with older versions of
         # Airflow.
diff --git a/airflow-core/src/airflow/utils/process_utils.py 
b/airflow-core/src/airflow/utils/process_utils.py
index af887f1b8aa..cfadcc289a0 100644
--- a/airflow-core/src/airflow/utils/process_utils.py
+++ b/airflow-core/src/airflow/utils/process_utils.py
@@ -22,6 +22,7 @@ from __future__ import annotations
 import array
 import errno
 import logging
+import multiprocessing
 import os
 import select
 import shlex
@@ -388,3 +389,67 @@ def set_new_process_group() -> None:
         return
 
     os.setpgid(0, 0)
+
+
+# Config option names looked up in ``[<component>]`` and then ``[core]`` by the helpers below.
+_MP_START_METHOD_OPTION = "mp_start_method"
+_MP_FORKSERVER_PRELOAD_OPTION = "mp_forkserver_preload"
+
+
+def resolve_mp_start_method(component: str) -> str | None:
+    """
+    Return the configured ``multiprocessing`` start method for *component*, or ``None``.
+
+    Resolution order: ``[<component>] mp_start_method`` then ``[core] mp_start_method`` then
+    ``None`` (keep the platform default). Empty or whitespace-only values are treated as unset at
+    every level, so a blank component-level entry falls back to ``[core]`` instead of masking it.
+
+    :param component: Config section of the calling component, e.g. ``"scheduler"``.
+    :return: The stripped start method name, or ``None`` when no section sets a non-blank value.
+    """
+    for section in (component, "core"):
+        # ``conf.get`` may return None (fallback) — normalise to "" before stripping.
+        method = (conf.get(section, _MP_START_METHOD_OPTION, fallback=None) or "").strip()
+        if method:
+            return method
+    return None
+
+
+def set_component_mp_start_method(component: str) -> None:
+    """
+    Apply the configured ``multiprocessing`` start method for *component*, if any.
+
+    Python 3.14 changed the Unix (non-macOS) default from ``fork`` to ``forkserver``;
+    ``forkserver``/``spawn`` children re-import Airflow instead of sharing it copy-on-write, which
+    inflates the memory of components that spawn workers (e.g. the scheduler's ``LocalExecutor``
+    and the triggerer). This restores control over that choice.
+
+    When the applied method is ``forkserver``, the comma-separated module list from
+    ``[<component>] mp_forkserver_preload`` (falling back to ``[core] mp_forkserver_preload``) is
+    passed to :func:`multiprocessing.set_forkserver_preload`. Blank values and blank list items are
+    ignored.
+
+    Must be called before the component creates any ``multiprocessing`` object (e.g. before the
+    executor or job runner starts). A no-op when unset or when the method is unavailable on the
+    current platform, so it is always safe to call.
+
+    :param component: Config section of the calling component, e.g. ``"scheduler"``.
+    """
+    method = resolve_mp_start_method(component)
+    if not method:
+        return
+
+    available = multiprocessing.get_all_start_methods()
+    if method not in available:
+        log.warning(
+            "Configured %s=%r for %s is not available on this platform (available: %s); "
+            "leaving the platform default start method in place.",
+            _MP_START_METHOD_OPTION,
+            method,
+            component,
+            available,
+        )
+        return
+
+    try:
+        # force=True: override a start method that may already be fixed in this process, which
+        # would otherwise make set_start_method raise RuntimeError.
+        multiprocessing.set_start_method(method, force=True)
+    except (RuntimeError, ValueError) as exc:
+        # Best effort: keep the platform default rather than failing component start-up.
+        log.warning("Could not set multiprocessing start method to %r for %s: %s", method, component, exc)
+        return
+    log.info("Set multiprocessing start method to %r for %s", method, component)
+
+    if method != "forkserver":
+        return
+
+    # Same precedence as the start method: component section first, then [core]. A blank
+    # component value falls through instead of masking the [core] list.
+    preload_raw = ""
+    for section in (component, "core"):
+        preload_raw = (conf.get(section, _MP_FORKSERVER_PRELOAD_OPTION, fallback=None) or "").strip()
+        if preload_raw:
+            break
+    modules = [m.strip() for m in preload_raw.split(",") if m.strip()]
+    if modules:
+        multiprocessing.set_forkserver_preload(modules)
+        log.info("Set forkserver preload modules for %s: %s", component, modules)
diff --git a/airflow-core/tests/unit/executors/test_local_executor.py 
b/airflow-core/tests/unit/executors/test_local_executor.py
index eae6ed0458c..bdc06b22221 100644
--- a/airflow-core/tests/unit/executors/test_local_executor.py
+++ b/airflow-core/tests/unit/executors/test_local_executor.py
@@ -50,6 +50,19 @@ skip_non_fork_mp_start = pytest.mark.skipif(
     reason="mock patching in test doesn't work with non-fork multiprocessing 
start methods",
 )
 
+
+class TestLocalExecutorMpStartMethod:
+    @mock.patch("airflow.executors.local_executor.multiprocessing.get_start_method", autospec=True)
+    def test_is_mp_using_fork_resolved_per_instance(self, get_start_method_mock):
+        """
+        ``is_mp_using_fork`` is computed in ``__init__`` rather than once at import time.
+
+        Each new executor must reflect the start method in effect when it is constructed, so a
+        start method configured by the component CLI entry point is honoured.
+        """
+        for start_method, expected in (("fork", True), ("forkserver", False)):
+            get_start_method_mock.return_value = start_method
+            executor = LocalExecutor(parallelism=1)
+            assert executor.is_mp_using_fork is expected
+
+
 skip_fork_mp_start = pytest.mark.skipif(
     multiprocessing.get_start_method() == "fork",
     reason="tests non-fork (lazy-spawning) behavior",
@@ -360,7 +373,7 @@ class TestLocalExecutor:
             # Verify each executor has its own workers dict
             assert team_a_executor.workers is not team_b_executor.workers
 
-            if LocalExecutor.is_mp_using_fork:
+            if team_a_executor.is_mp_using_fork:
                 # fork pre-spawns all workers at start()
                 assert len(team_a_executor.workers) == 2
                 assert len(team_b_executor.workers) == 3
@@ -390,7 +403,7 @@ class TestLocalExecutor:
 
         executor.start()
 
-        if LocalExecutor.is_mp_using_fork:
+        if executor.is_mp_using_fork:
             assert len(executor.workers) == 2
         else:
             # forkserver/spawn use lazy spawning
diff --git a/airflow-core/tests/unit/utils/test_process_utils.py 
b/airflow-core/tests/unit/utils/test_process_utils.py
index 5ebc599b6b6..988d9e14dff 100644
--- a/airflow-core/tests/unit/utils/test_process_utils.py
+++ b/airflow-core/tests/unit/utils/test_process_utils.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import logging
+import multiprocessing
 import os
 import signal
 import subprocess
@@ -39,6 +40,8 @@ from airflow.utils.process_utils import (
     set_new_process_group,
 )
 
+from tests_common.test_utils.config import conf_vars
+
 
 class TestReapProcessGroup:
     # Inline script that creates a new session, spawns a SIGTERM-ignoring 
child,
@@ -226,3 +229,90 @@ class TestSetNewProcessGroup:
             mock_get_sid.return_value = pid
             set_new_process_group()
             assert mock_set_pid.call_count == 0
+
+
+# Start methods returned by the mocked ``multiprocessing.get_all_start_methods()`` in the tests below.
+ALL_MP_METHODS = ["fork", "spawn", "forkserver"]
+
+
+class TestResolveMpStartMethod:
+    """Precedence of ``resolve_mp_start_method``: component section, then ``[core]``, then ``None``."""
+
+    def test_unset_returns_none(self):
+        assert process_utils.resolve_mp_start_method("scheduler") is None
+
+    @conf_vars({("core", "mp_start_method"): "forkserver"})
+    def test_core_value_used(self):
+        # With only [core] configured, every component inherits it.
+        for component in ("scheduler", "triggerer"):
+            assert process_utils.resolve_mp_start_method(component) == "forkserver"
+
+    @conf_vars({("core", "mp_start_method"): "forkserver", ("scheduler", "mp_start_method"): "fork"})
+    def test_component_overrides_core(self):
+        # Only the component that sets its own value diverges from [core].
+        expected_by_component = {"scheduler": "fork", "triggerer": "forkserver"}
+        for component, expected in expected_by_component.items():
+            assert process_utils.resolve_mp_start_method(component) == expected
+
+    @conf_vars({("core", "mp_start_method"): "   "})
+    def test_blank_returns_none(self):
+        # A whitespace-only value counts as unset.
+        assert process_utils.resolve_mp_start_method("scheduler") is None
+
+
+class TestSetComponentMpStartMethod:
+    """``set_component_mp_start_method`` with the module's ``multiprocessing`` replaced by a spec'd mock."""
+
+    @mock.patch("airflow.utils.process_utils.multiprocessing", spec=multiprocessing)
+    def test_noop_when_unset(self, mp_mock):
+        """Nothing configured: the start method is left untouched."""
+        process_utils.set_component_mp_start_method("scheduler")
+        mp_mock.set_start_method.assert_not_called()
+
+    @conf_vars({("core", "mp_start_method"): "fork"})
+    @mock.patch("airflow.utils.process_utils.multiprocessing", spec=multiprocessing)
+    def test_sets_method_from_core(self, mp_mock):
+        """The [core] value is applied with force=True; no preload for a non-forkserver method."""
+        mp_mock.get_all_start_methods.return_value = ALL_MP_METHODS
+        process_utils.set_component_mp_start_method("scheduler")
+        mp_mock.set_start_method.assert_called_once_with("fork", force=True)
+        mp_mock.set_forkserver_preload.assert_not_called()
+
+    @conf_vars({("core", "mp_start_method"): "spawn", ("triggerer", "mp_start_method"): "fork"})
+    @mock.patch("airflow.utils.process_utils.multiprocessing", spec=multiprocessing)
+    def test_component_override_applied(self, mp_mock):
+        """A component-level value wins over [core]."""
+        mp_mock.get_all_start_methods.return_value = ALL_MP_METHODS
+        process_utils.set_component_mp_start_method("triggerer")
+        mp_mock.set_start_method.assert_called_once_with("fork", force=True)
+
+    @conf_vars({("core", "mp_start_method"): "fork"})
+    @mock.patch("airflow.utils.process_utils.multiprocessing", spec=multiprocessing)
+    def test_noop_when_method_unavailable(self, mp_mock):
+        """A method the platform does not offer is skipped, not forced."""
+        mp_mock.get_all_start_methods.return_value = ["spawn"]
+        process_utils.set_component_mp_start_method("scheduler")
+        mp_mock.set_start_method.assert_not_called()
+
+    @conf_vars(
+        {
+            ("core", "mp_start_method"): "forkserver",
+            ("core", "mp_forkserver_preload"): "airflow, airflow.executors.local_executor ,",
+        }
+    )
+    @mock.patch("airflow.utils.process_utils.multiprocessing", spec=multiprocessing)
+    def test_forkserver_sets_preload(self, mp_mock):
+        """Preload entries are stripped and blank items dropped before being applied."""
+        mp_mock.get_all_start_methods.return_value = ALL_MP_METHODS
+        process_utils.set_component_mp_start_method("triggerer")
+        mp_mock.set_start_method.assert_called_once_with("forkserver", force=True)
+        mp_mock.set_forkserver_preload.assert_called_once_with(["airflow", "airflow.executors.local_executor"])
+
+    @conf_vars({("core", "mp_start_method"): "fork", ("core", "mp_forkserver_preload"): "airflow"})
+    @mock.patch("airflow.utils.process_utils.multiprocessing", spec=multiprocessing)
+    def test_preload_ignored_for_non_forkserver(self, mp_mock):
+        """Preload configuration is ignored unless the effective method is forkserver."""
+        mp_mock.get_all_start_methods.return_value = ALL_MP_METHODS
+        process_utils.set_component_mp_start_method("scheduler")
+        mp_mock.set_forkserver_preload.assert_not_called()
+
+    @conf_vars(
+        {
+            ("core", "mp_start_method"): "forkserver",
+            ("core", "mp_forkserver_preload"): "airflow",
+        }
+    )
+    @pytest.mark.parametrize("exc", [RuntimeError("already set"), ValueError("bad method")])
+    @mock.patch("airflow.utils.process_utils.multiprocessing", spec=multiprocessing)
+    def test_set_start_method_failure_is_swallowed(self, mp_mock, exc):
+        """A failing set_start_method is logged, not raised, and preload is skipped."""
+        mp_mock.get_all_start_methods.return_value = ALL_MP_METHODS
+        mp_mock.set_start_method.side_effect = exc
+        process_utils.set_component_mp_start_method("scheduler")
+        mp_mock.set_forkserver_preload.assert_not_called()

Reply via email to