https://github.com/python/cpython/commit/efadc5874cdecc0420926afd5540b9b25c5e97fe
commit: efadc5874cdecc0420926afd5540b9b25c5e97fe
branch: main
author: Sam Gross <[email protected]>
committer: colesbury <[email protected]>
date: 2025-03-04T11:19:06-05:00
summary:
Revert "gh-128041: Add `terminate_workers` and `kill_workers` methods to
ProcessPoolExecutor (GH-128043)" (#130838)
The test_concurrent_futures.test_process_pool test is failing in CI.
This reverts commit f97e4098ff71a6488fd3411f9f9e6fa7a7bb4efe.
files:
D Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst
M Doc/library/concurrent.futures.rst
M Doc/whatsnew/3.14.rst
M Lib/concurrent/futures/process.py
M Lib/test/test_concurrent_futures/test_process_pool.py
diff --git a/Doc/library/concurrent.futures.rst
b/Doc/library/concurrent.futures.rst
index dc613f2f8f00cd..5a950081a1c98d 100644
--- a/Doc/library/concurrent.futures.rst
+++ b/Doc/library/concurrent.futures.rst
@@ -415,30 +415,6 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
require the *fork* start method for :class:`ProcessPoolExecutor` you must
explicitly pass ``mp_context=multiprocessing.get_context("fork")``.
- .. method:: terminate_workers()
-
- Attempt to terminate all living worker processes immediately by calling
- :meth:`Process.terminate <multiprocessing.Process.terminate>` on each of
them.
- Internally, it will also call :meth:`Executor.shutdown` to ensure that
all
- other resources associated with the executor are freed.
-
- After calling this method the caller should no longer submit tasks to the
- executor.
-
- .. versionadded:: next
-
- .. method:: kill_workers()
-
- Attempt to kill all living worker processes immediately by calling
- :meth:`Process.kill <multiprocessing.Process.kill>` on each of them.
- Internally, it will also call :meth:`Executor.shutdown` to ensure that
all
- other resources associated with the executor are freed.
-
- After calling this method the caller should no longer submit tasks to the
- executor.
-
- .. versionadded:: next
-
.. _processpoolexecutor-example:
ProcessPoolExecutor Example
diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst
index 0721ea5cb8d5d1..aa802faae50b12 100644
--- a/Doc/whatsnew/3.14.rst
+++ b/Doc/whatsnew/3.14.rst
@@ -444,11 +444,6 @@ contextvars
* Support context manager protocol by :class:`contextvars.Token`.
(Contributed by Andrew Svetlov in :gh:`129889`.)
-* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and
- :meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as
- ways to terminate or kill all living worker processes in the given pool.
- (Contributed by Charles Machalow in :gh:`128043`.)
-
ctypes
------
diff --git a/Lib/concurrent/futures/process.py
b/Lib/concurrent/futures/process.py
index d79d6b959c90d3..42eee72bc1457f 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -626,14 +626,6 @@ class BrokenProcessPool(_base.BrokenExecutor):
while a future was in the running state.
"""
-_TERMINATE = "terminate"
-_KILL = "kill"
-
-_SHUTDOWN_CALLBACK_OPERATION = {
- _TERMINATE,
- _KILL
-}
-
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None, mp_context=None,
@@ -863,66 +855,3 @@ def shutdown(self, wait=True, *, cancel_futures=False):
self._executor_manager_thread_wakeup = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__
-
- def _force_shutdown(self, operation):
- """Attempts to terminate or kill the executor's workers based off the
- given operation. Iterates through all of the current processes and
- performs the relevant task if the process is still alive.
-
- After terminating workers, the pool will be in a broken state
- and no longer usable (for instance, new tasks should not be
- submitted).
- """
- if operation not in _SHUTDOWN_CALLBACK_OPERATION:
- raise ValueError(f"Unsupported operation: {operation!r}")
-
- processes = {}
- if self._processes:
- processes = self._processes.copy()
-
- # shutdown will invalidate ._processes, so we copy it right before
- # calling. If we waited here, we would deadlock if a process decides
not
- # to exit.
- self.shutdown(wait=False, cancel_futures=True)
-
- if not processes:
- return
-
- for proc in processes.values():
- try:
- if not proc.is_alive():
- continue
- except ValueError:
- # The process is already exited/closed out.
- continue
-
- try:
- if operation == _TERMINATE:
- proc.terminate()
- elif operation == _KILL:
- proc.kill()
- except ProcessLookupError:
- # The process just ended before our signal
- continue
-
- def terminate_workers(self):
- """Attempts to terminate the executor's workers.
- Iterates through all of the current worker processes and terminates
- each one that is still alive.
-
- After terminating workers, the pool will be in a broken state
- and no longer usable (for instance, new tasks should not be
- submitted).
- """
- return self._force_shutdown(operation=_TERMINATE)
-
- def kill_workers(self):
- """Attempts to kill the executor's workers.
- Iterates through all of the current worker processes and kills
- each one that is still alive.
-
- After killing workers, the pool will be in a broken state
- and no longer usable (for instance, new tasks should not be
- submitted).
- """
- return self._force_shutdown(operation=_KILL)
diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py
b/Lib/test/test_concurrent_futures/test_process_pool.py
index 354b7d0a346970..8b1bdaa33d8f5c 100644
--- a/Lib/test/test_concurrent_futures/test_process_pool.py
+++ b/Lib/test/test_concurrent_futures/test_process_pool.py
@@ -1,17 +1,13 @@
import os
-import queue
-import signal
import sys
import threading
import time
import unittest
-import unittest.mock
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool
from test import support
from test.support import hashlib_helper
-from test.test_importlib.metadata.fixtures import parameterize
from .executor import ExecutorTest, mul
from .util import (
@@ -26,19 +22,6 @@ def __init__(self, mgr):
def __del__(self):
self.event.set()
-TERMINATE_WORKERS = futures.ProcessPoolExecutor.terminate_workers.__name__
-KILL_WORKERS = futures.ProcessPoolExecutor.kill_workers.__name__
-FORCE_SHUTDOWN_PARAMS = [
- dict(function_name=TERMINATE_WORKERS),
- dict(function_name=KILL_WORKERS),
-]
-
-def _put_sleep_put(queue):
- """ Used as part of test_terminate_workers """
- queue.put('started')
- time.sleep(2)
- queue.put('finished')
-
class ProcessPoolExecutorTest(ExecutorTest):
@@ -235,86 +218,6 @@ def mock_start_new_thread(func, *args, **kwargs):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()
- def test_terminate_workers(self):
- mock_fn = unittest.mock.Mock()
- with self.executor_type(max_workers=1) as executor:
- executor._force_shutdown = mock_fn
- executor.terminate_workers()
-
- mock_fn.assert_called_once_with(operation=futures.process._TERMINATE)
-
- def test_kill_workers(self):
- mock_fn = unittest.mock.Mock()
- with self.executor_type(max_workers=1) as executor:
- executor._force_shutdown = mock_fn
- executor.kill_workers()
-
- mock_fn.assert_called_once_with(operation=futures.process._KILL)
-
- def test_force_shutdown_workers_invalid_op(self):
- with self.executor_type(max_workers=1) as executor:
- self.assertRaises(ValueError,
- executor._force_shutdown,
- operation='invalid operation'),
-
- @parameterize(*FORCE_SHUTDOWN_PARAMS)
- def test_force_shutdown_workers(self, function_name):
- manager = self.get_context().Manager()
- q = manager.Queue()
-
- with self.executor_type(max_workers=1) as executor:
- executor.submit(_put_sleep_put, q)
-
- # We should get started, but not finished since we'll terminate the
- # workers just after
- self.assertEqual(q.get(timeout=5), 'started')
-
- worker_process = list(executor._processes.values())[0]
- getattr(executor, function_name)()
- worker_process.join()
-
- if function_name == TERMINATE_WORKERS or \
- sys.platform == 'win32':
- # On windows, kill and terminate both send SIGTERM
- self.assertEqual(worker_process.exitcode, -signal.SIGTERM)
- elif function_name == KILL_WORKERS:
- self.assertEqual(worker_process.exitcode, -signal.SIGKILL)
- else:
- self.fail(f"Unknown operation: {function_name}")
-
- self.assertRaises(queue.Empty, q.get, timeout=1)
-
- @parameterize(*FORCE_SHUTDOWN_PARAMS)
- def test_force_shutdown_workers_dead_workers(self, function_name):
- with self.executor_type(max_workers=1) as executor:
- future = executor.submit(os._exit, 1)
- self.assertRaises(BrokenProcessPool, future.result)
-
- # even though the pool is broken, this shouldn't raise
- getattr(executor, function_name)()
-
- @parameterize(*FORCE_SHUTDOWN_PARAMS)
- def test_force_shutdown_workers_not_started_yet(self, function_name):
- ctx = self.get_context()
- with unittest.mock.patch.object(ctx, 'Process') as mock_process:
- with self.executor_type(max_workers=1, mp_context=ctx) as executor:
- # The worker has not been started yet, terminate/kill_workers
- # should basically no-op
- getattr(executor, function_name)()
-
- mock_process.return_value.kill.assert_not_called()
- mock_process.return_value.terminate.assert_not_called()
-
- @parameterize(*FORCE_SHUTDOWN_PARAMS)
- def test_force_shutdown_workers_stops_pool(self, function_name):
- with self.executor_type(max_workers=1) as executor:
- task = executor.submit(time.sleep, 0)
- self.assertIsNone(task.result())
-
- getattr(executor, function_name)()
-
- self.assertRaises(RuntimeError, executor.submit, time.sleep, 0)
-
create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
diff --git
a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst
b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst
deleted file mode 100644
index bb9ef96d45eb79..00000000000000
--- a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst
+++ /dev/null
@@ -1,4 +0,0 @@
-Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and
-:meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as
-ways to terminate or kill all living worker processes in the given pool.
-(Contributed by Charles Machalow in :gh:`128043`.)
_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: [email protected]