Script 'mail_helper' called by obssrc Hello community, here is the log from the commit of package python-Pebble for openSUSE:Factory checked in at 2026-01-27 16:17:51 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python-Pebble (Old) and /work/SRC/openSUSE:Factory/.python-Pebble.new.1928 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-Pebble" Tue Jan 27 16:17:51 2026 rev:20 rq:1329421 version:5.2.0 Changes: -------- --- /work/SRC/openSUSE:Factory/python-Pebble/python-Pebble.changes 2025-08-15 21:53:59.664945075 +0200 +++ /work/SRC/openSUSE:Factory/.python-Pebble.new.1928/python-Pebble.changes 2026-01-27 16:17:59.924205157 +0100 @@ -1,0 +2,8 @@ +Tue Jan 27 07:34:58 UTC 2026 - Dirk Müller <[email protected]> + +- update to 5.2.0: + * issue #158: set pending futures to BrokenProcessPool error + when ProcessPool internal errors occur. + * Cleanup resources when terminating pool's processes via SIGTERM. + +------------------------------------------------------------------- Old: ---- pebble-5.1.3.tar.gz New: ---- pebble-5.2.0.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-Pebble.spec ++++++ --- /var/tmp/diff_new_pack.PYTixF/_old 2026-01-27 16:18:00.608233649 +0100 +++ /var/tmp/diff_new_pack.PYTixF/_new 2026-01-27 16:18:00.608233649 +0100 @@ -1,7 +1,7 @@ # # spec file for package python-Pebble # -# Copyright (c) 2025 SUSE LLC and contributors +# Copyright (c) 2026 SUSE LLC and contributors # # All modifications and additions to the file contributed by third parties # remain the property of their copyright owners, unless otherwise agreed @@ -18,7 +18,7 @@ %{?sle15_python_module_pythons} Name: python-Pebble -Version: 5.1.3 +Version: 5.2.0 Release: 0 Summary: Threading and multiprocessing eye-candy for Python License: LGPL-3.0-only ++++++ pebble-5.1.3.tar.gz -> pebble-5.2.0.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/PKG-INFO new/pebble-5.2.0/PKG-INFO --- old/pebble-5.1.3/PKG-INFO 2025-07-30 23:14:44.652518000 +0200 +++ new/pebble-5.2.0/PKG-INFO 2026-01-25 12:11:59.325057000 +0100 @@ -1,6 +1,6 @@ -Metadata-Version: 2.2 +Metadata-Version: 2.4 Name: Pebble -Version: 5.1.3 +Version: 5.2.0 Summary: Threading and multiprocessing eye-candy. Home-page: https://github.com/noxdafox/pebble Author: Matteo Cafasso @@ -22,6 +22,7 @@ Dynamic: home-page Dynamic: keywords Dynamic: license +Dynamic: license-file Dynamic: requires-python Dynamic: summary diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/Pebble.egg-info/PKG-INFO new/pebble-5.2.0/Pebble.egg-info/PKG-INFO --- old/pebble-5.1.3/Pebble.egg-info/PKG-INFO 2025-07-30 23:14:44.000000000 +0200 +++ new/pebble-5.2.0/Pebble.egg-info/PKG-INFO 2026-01-25 12:11:59.000000000 +0100 @@ -1,6 +1,6 @@ -Metadata-Version: 2.2 +Metadata-Version: 2.4 Name: Pebble -Version: 5.1.3 +Version: 5.2.0 Summary: Threading and multiprocessing eye-candy. Home-page: https://github.com/noxdafox/pebble Author: Matteo Cafasso @@ -22,6 +22,7 @@ Dynamic: home-page Dynamic: keywords Dynamic: license +Dynamic: license-file Dynamic: requires-python Dynamic: summary diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/pebble/__init__.py new/pebble-5.2.0/pebble/__init__.py --- old/pebble-5.1.3/pebble/__init__.py 2025-07-30 23:13:34.000000000 +0200 +++ new/pebble-5.2.0/pebble/__init__.py 2026-01-25 12:11:53.000000000 +0100 @@ -1,9 +1,11 @@ __author__ = 'Matteo Cafasso' -__version__ = '5.1.3' +__version__ = '5.2.0' __license__ = 'LGPL' -__all__ = ['waitforthreads', +__all__ = ['concurrent', + 'asynchronous', + 'waitforthreads', 'waitforqueues', 'synchronized', 'sighandler', @@ -12,7 +14,8 @@ 'ProcessMapFuture', 'ProcessExpired', 'ProcessPool', - 'ThreadPool'] + 'ThreadPool', + 'CONSTS'] from pebble import concurrent, asynchronous diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/pebble/asynchronous/process.py new/pebble-5.2.0/pebble/asynchronous/process.py --- old/pebble-5.1.3/pebble/asynchronous/process.py 2025-02-26 22:51:07.000000000 +0100 +++ new/pebble-5.2.0/pebble/asynchronous/process.py 2026-01-25 12:11:53.000000000 +0100 @@ -1,5 +1,5 @@ # This file is part of Pebble. -# Copyright (c) 2013-2025, Matteo Cafasso +# Copyright (c) 2013-2026, Matteo Cafasso # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License @@ -97,13 +97,11 @@ else: future = loop.create_future() reader, writer = mp_context.Pipe(duplex=False) - worker = common.launch_process( name, common.function_handler, daemon, mp_context, - target, args, kwargs, (reader, writer)) + target, args, kwargs, writer) writer.close() - loop.create_task(_worker_handler(future, worker, reader, timeout)) return future diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/pebble/asynchronous/thread.py new/pebble-5.2.0/pebble/asynchronous/thread.py --- old/pebble-5.1.3/pebble/asynchronous/thread.py 2025-02-26 22:51:07.000000000 +0100 +++ new/pebble-5.2.0/pebble/asynchronous/thread.py 2026-01-25 12:11:53.000000000 +0100 @@ -1,5 +1,5 @@ # This file is part of Pebble. -# Copyright (c) 2013-2025, Matteo Cafasso +# Copyright (c) 2013-2026, Matteo Cafasso # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/pebble/common/__init__.py new/pebble-5.2.0/pebble/common/__init__.py --- old/pebble-5.1.3/pebble/common/__init__.py 2025-02-26 22:51:07.000000000 +0100 +++ new/pebble-5.2.0/pebble/common/__init__.py 2026-01-25 12:11:53.000000000 +0100 @@ -9,6 +9,6 @@ from pebble.common.types import ThreadDecoratorParamsReturnType from pebble.common.types import ProcessDecoratorReturnType from pebble.common.types import ProcessDecoratorParamsReturnType -from pebble.common.process import launch_process, stop_process +from pebble.common.process import launch_process, stop_process, process_exit from pebble.common.process import register_function, maybe_install_trampoline from pebble.common.process import process_execute, send_result, function_handler diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/pebble/common/process.py new/pebble-5.2.0/pebble/common/process.py --- old/pebble-5.1.3/pebble/common/process.py 2025-02-26 22:51:07.000000000 +0100 +++ new/pebble-5.2.0/pebble/common/process.py 2026-01-25 12:11:53.000000000 +0100 @@ -1,5 +1,5 @@ # This file is part of Pebble. -# Copyright (c) 2013-2025, Matteo Cafasso +# Copyright (c) 2013-2026, Matteo Cafasso # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License @@ -31,7 +31,8 @@ def launch_process( name: str, function: Callable, - daemon: bool, mp_context: multiprocessing.context, + daemon: bool, + mp_context: multiprocessing.context, *args, **kwargs ) -> multiprocessing.Process: @@ -79,20 +80,23 @@ function: Callable, args: list, kwargs: dict, - pipe: multiprocessing.Pipe + writer: multiprocessing.Pipe ): """Runs the actual function in separate process and returns its result.""" signal.signal(signal.SIGINT, signal.SIG_IGN) - signal.signal(signal.SIGTERM, signal.SIG_DFL) - - reader, writer = pipe - reader.close() + signal.signal(signal.SIGTERM, process_exit) result = process_execute(function, *args, **kwargs) send_result(writer, result) +def process_exit(exitcode, *_): + """Ensure mltiprocessing cleanup is performed to avoid resources leak.""" + multiprocessing.util._exit_function() + os._exit(exitcode) + + ################################################################################ # Spawn process start method handling logic. # # # diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/pebble/common/shared.py new/pebble-5.2.0/pebble/common/shared.py --- old/pebble-5.1.3/pebble/common/shared.py 2025-02-26 22:51:07.000000000 +0100 +++ new/pebble-5.2.0/pebble/common/shared.py 2026-01-25 12:11:53.000000000 +0100 @@ -1,5 +1,5 @@ # This file is part of Pebble. -# Copyright (c) 2013-2025, Matteo Cafasso +# Copyright (c) 2013-2026, Matteo Cafasso # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/pebble/common/types.py new/pebble-5.2.0/pebble/common/types.py --- old/pebble-5.1.3/pebble/common/types.py 2025-02-26 22:51:07.000000000 +0100 +++ new/pebble-5.2.0/pebble/common/types.py 2026-01-25 12:11:53.000000000 +0100 @@ -1,5 +1,5 @@ # This file is part of Pebble. -# Copyright (c) 2013-2025, Matteo Cafasso +# Copyright (c) 2013-2026, Matteo Cafasso # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/pebble/concurrent/process.py new/pebble-5.2.0/pebble/concurrent/process.py --- old/pebble-5.1.3/pebble/concurrent/process.py 2025-02-26 22:51:07.000000000 +0100 +++ new/pebble-5.2.0/pebble/concurrent/process.py 2026-01-25 12:11:53.000000000 +0100 @@ -1,5 +1,5 @@ # This file is part of Pebble. -# Copyright (c) 2013-2025, Matteo Cafasso +# Copyright (c) 2013-2026, Matteo Cafasso # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License @@ -96,13 +96,11 @@ else: future = common.ProcessFuture() reader, writer = mp_context.Pipe(duplex=False) - worker = common.launch_process( name, common.function_handler, daemon, mp_context, - target, args, kwargs, (reader, writer)) + target, args, kwargs, writer) writer.close() - future.set_running_or_notify_cancel() common.launch_thread( diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/pebble/concurrent/thread.py new/pebble-5.2.0/pebble/concurrent/thread.py --- old/pebble-5.1.3/pebble/concurrent/thread.py 2025-02-26 22:51:07.000000000 +0100 +++ new/pebble-5.2.0/pebble/concurrent/thread.py 2026-01-25 12:11:53.000000000 +0100 @@ -1,5 +1,5 @@ # This file is part of Pebble. -# Copyright (c) 2013-2025, Matteo Cafasso +# Copyright (c) 2013-2026, Matteo Cafasso # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/pebble/decorators.py new/pebble-5.2.0/pebble/decorators.py --- old/pebble-5.1.3/pebble/decorators.py 2025-02-26 22:51:07.000000000 +0100 +++ new/pebble-5.2.0/pebble/decorators.py 2026-01-25 12:11:53.000000000 +0100 @@ -1,5 +1,5 @@ # This file is part of Pebble. -# Copyright (c) 2013-2025, Matteo Cafasso +# Copyright (c) 2013-2026, Matteo Cafasso # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/pebble/functions.py new/pebble-5.2.0/pebble/functions.py --- old/pebble-5.1.3/pebble/functions.py 2025-07-23 22:16:02.000000000 +0200 +++ new/pebble-5.2.0/pebble/functions.py 2026-01-25 12:11:53.000000000 +0100 @@ -1,5 +1,5 @@ # This file is part of Pebble. -# Copyright (c) 2013-2025, Matteo Cafasso +# Copyright (c) 2013-2026, Matteo Cafasso # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/pebble/pool/base_pool.py new/pebble-5.2.0/pebble/pool/base_pool.py --- old/pebble-5.1.3/pebble/pool/base_pool.py 2025-02-26 22:51:07.000000000 +0100 +++ new/pebble-5.2.0/pebble/pool/base_pool.py 2026-01-25 12:11:53.000000000 +0100 @@ -1,5 +1,5 @@ # This file is part of Pebble. -# Copyright (c) 2013-2025, Matteo Cafasso +# Copyright (c) 2013-2026, Matteo Cafasso # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/pebble/pool/channel.py new/pebble-5.2.0/pebble/pool/channel.py --- old/pebble-5.1.3/pebble/pool/channel.py 2025-02-26 22:51:07.000000000 +0100 +++ new/pebble-5.2.0/pebble/pool/channel.py 2026-01-25 12:11:53.000000000 +0100 @@ -1,5 +1,5 @@ # This file is part of Pebble. -# Copyright (c) 2013-2025, Matteo Cafasso +# Copyright (c) 2013-2026, Matteo Cafasso # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/pebble/pool/process.py new/pebble-5.2.0/pebble/pool/process.py --- old/pebble-5.1.3/pebble/pool/process.py 2025-07-30 23:09:56.000000000 +0200 +++ new/pebble-5.2.0/pebble/pool/process.py 2026-01-25 12:11:53.000000000 +0100 @@ -1,5 +1,5 @@ # This file is part of Pebble. -# Copyright (c) 2013-2025, Matteo Cafasso +# Copyright (c) 2013-2026, Matteo Cafasso # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License @@ -27,14 +27,14 @@ from concurrent.futures.process import BrokenProcessPool from concurrent.futures import CancelledError, TimeoutError +from pebble.pool.channel import WorkerChannel, channels from pebble.pool.base_pool import Worker, iter_chunks, run_initializer from pebble.pool.base_pool import PoolContext, BasePool, Task, TaskPayload from pebble.pool.base_pool import PoolStatus, ProcessMapFuture, map_results -from pebble.pool.channel import ChannelError, WorkerChannel, channels from pebble.common import Result, ResultStatus, CONSTS -from pebble.common import launch_process, stop_process from pebble.common import ProcessExpired, ProcessFuture from pebble.common import process_execute, launch_thread +from pebble.common import launch_process, stop_process, process_exit class ProcessPool(BasePool): @@ -155,12 +155,11 @@ def task_scheduler_loop(pool_manager: 'PoolManager'): - context = pool_manager.context - task_queue = context.task_queue + task_queue = pool_manager.context.task_queue try: - while context.alive and not GLOBAL_SHUTDOWN: - task = task_queue.get() + while pool_manager.context.alive and not GLOBAL_SHUTDOWN: + task = pool_manager.context.task_queue.get() if task is not None: if task.future.cancelled(): @@ -170,29 +169,25 @@ pool_manager.schedule(task) else: task_queue.task_done() # Termination sentinel received - except BrokenProcessPool: - context.status = PoolStatus.ERROR + except BrokenProcessPool as error: + pool_manager.handle_broken_pool(error) def pool_manager_loop(pool_manager: 'PoolManager'): - context = pool_manager.context - try: - while context.alive and not GLOBAL_SHUTDOWN: + while pool_manager.context.alive and not GLOBAL_SHUTDOWN: pool_manager.update_status() time.sleep(CONSTS.sleep_unit) - except BrokenProcessPool: - context.status = PoolStatus.ERROR + except BrokenProcessPool as error: + pool_manager.handle_broken_pool(error) def message_manager_loop(pool_manager: 'PoolManager'): - context = pool_manager.context - try: - while context.alive and not GLOBAL_SHUTDOWN: + while pool_manager.context.alive and not GLOBAL_SHUTDOWN: pool_manager.process_next_message(CONSTS.sleep_unit) - except BrokenProcessPool: - context.status = PoolStatus.ERROR + except BrokenProcessPool as error: + pool_manager.handle_broken_pool(error) class PoolManager: @@ -206,6 +201,7 @@ mp_context) def start(self): + self.worker_manager.create_channels() self.worker_manager.create_workers() def stop(self): @@ -276,6 +272,10 @@ raise BrokenProcessPool("All workers expired") + def handle_broken_pool(self, error: BrokenProcessPool): + self.context.status = PoolStatus.ERROR + self.task_manager.tasks_abort(error) + class TaskManager: """Manages the tasks flow within the Pool. @@ -292,10 +292,14 @@ self.tasks[task.id] = task def task_start(self, task_id: int, worker_id: Optional[int]): - task = self.tasks[task_id] - task.worker_id = worker_id - task.timestamp = time.time() - task.set_running_or_notify_cancel() + try: + task = self.tasks[task_id] + except KeyError: + return # task already completed + else: + task.worker_id = worker_id + task.timestamp = time.time() + task.set_running_or_notify_cancel() def task_done(self, task_id: int, result: Result): """Set the tasks result and run the callback.""" @@ -318,6 +322,11 @@ self.task_start(task_id, None) self.task_done(task_id, Result(ResultStatus.ERROR, error)) + def tasks_abort(self, error: Exception): + """Abort all tasks due to critical error.""" + for task_id in dictionary_keys(self.tasks): + self.task_problem(task_id, error) + def timeout_tasks(self) -> tuple: return tuple(t for t in dictionary_values(self.tasks) if self.timeout(t)) @@ -346,8 +355,9 @@ self.workers = {} self.workers_number = workers self.worker_parameters = worker_parameters - self.pool_channel, self.workers_channel = channels(mp_context) self.mp_context = mp_context + self.pool_channel = None + self.workers_channel = None def dispatch(self, task: Task): try: @@ -363,7 +373,9 @@ return self.pool_channel.recv() else: return NoMessage() - except (OSError, TypeError) as error: + except PICKLING_ERRORS as error: + raise BrokenProcessPool("Unpicklable object from worker") from error + except OSError as error: raise BrokenProcessPool from error except EOFError: # Pool shutdown return NoMessage() @@ -382,14 +394,19 @@ return tuple((w.pid, w.exitcode) for w in expired if w.exitcode != 0) + def create_channels(self): + self.pool_channel, self.workers_channel = channels(self.mp_context) + + def close_channels(self): + if self.pool_channel is not None: + self.pool_channel.close() + if self.workers_channel is not None: + self.workers_channel.close() + def create_workers(self): for _ in range(self.workers_number - len(self.workers)): self.new_worker() - def close_channels(self): - self.pool_channel.close() - self.workers_channel.close() - def force_stop_workers(self): for worker_id in tuple(self.workers.keys()): stop_process(self.workers.pop(worker_id)) @@ -421,18 +438,18 @@ def worker_process(params: Worker, channel: WorkerChannel): """The worker process routines.""" signal.signal(signal.SIGINT, signal.SIG_IGN) - signal.signal(signal.SIGTERM, signal.SIG_DFL) + signal.signal(signal.SIGTERM, process_exit) channel.initialize() if params.initializer is not None: if not run_initializer(params.initializer, params.initargs): - os._exit(1) + process_exit(1) try: process_tasks(params, channel) except (OSError, RuntimeError) as error: - os._exit(getattr(error, 'errno', None) or 1) + process_exit(getattr(error, 'errno', None) or 1) except EOFError: # pipe closed on main loop pass @@ -506,6 +523,15 @@ stop_process(worker) +def dictionary_keys(dictionary: dict) -> tuple: + """Returns a snapshot of the dictionary keys handling race conditions.""" + while True: + try: + return tuple(dictionary.keys()) + except RuntimeError: # race condition + pass + + def dictionary_values(dictionary: dict) -> tuple: """Returns a snapshot of the dictionary values handling race conditions.""" while True: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/pebble/pool/thread.py new/pebble-5.2.0/pebble/pool/thread.py --- old/pebble-5.1.3/pebble/pool/thread.py 2025-02-26 22:51:07.000000000 +0100 +++ new/pebble-5.2.0/pebble/pool/thread.py 2026-01-25 12:11:53.000000000 +0100 @@ -1,5 +1,5 @@ # This file is part of Pebble. -# Copyright (c) 2013-2025, Matteo Cafasso +# Copyright (c) 2013-2026, Matteo Cafasso # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/test/test_process_pool_fork.py new/pebble-5.2.0/test/test_process_pool_fork.py --- old/pebble-5.1.3/test/test_process_pool_fork.py 2025-07-30 23:10:21.000000000 +0200 +++ new/pebble-5.2.0/test/test_process_pool_fork.py 2026-01-25 12:11:53.000000000 +0100 @@ -10,6 +10,7 @@ import dataclasses import multiprocessing +from concurrent.futures.process import BrokenProcessPool from concurrent.futures import CancelledError, TimeoutError import pebble @@ -66,6 +67,19 @@ return BaseException("BOOM!") +def unpickleable_error_function(): + raise UnpickleableException() + + +class UnpickleableException(Exception): + """This exception cannot be correctly re-constructed + on the main side of the pool. + """ + def __init__(self): + self.message = "BOOM!" + super().__init__(self.message) + + @dataclasses.dataclass(frozen=True) class FrozenError(Exception): pass @@ -107,6 +121,7 @@ def pool_function(): + signal.signal(signal.SIGTERM, lambda _n, _s: sys.exit(0)) pool = multiprocessing.Pool(1) result = pool.apply(function, args=[1]) pool.close() @@ -116,7 +131,8 @@ def pebble_function(): - with ProcessPool(max_workers=1) as pool: + signal.signal(signal.SIGTERM, lambda _n, _s: sys.exit(0)) + with ProcessPool(max_workers=1, context=mp_context) as pool: f = pool.schedule(function, args=[1]) return f.result() @@ -198,6 +214,13 @@ future = pool.schedule(pickle_error_function) self.assertRaises((pickle.PicklingError, TypeError), future.result) + def test_process_pool_broken_pickling_error(self): + """Process Pool Fork pickling errors breaking the pool + are raised by future.result.""" + with ProcessPool(max_workers=1, context=mp_context) as pool: + future = pool.schedule(unpickleable_error_function) + self.assertRaises(BrokenProcessPool, future.result) + def test_process_pool_frozen_error(self): """Process Pool Fork frozen errors are raised by future get.""" with ProcessPool(max_workers=1, context=mp_context) as pool: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/test/test_process_pool_forkserver.py new/pebble-5.2.0/test/test_process_pool_forkserver.py --- old/pebble-5.1.3/test/test_process_pool_forkserver.py 2025-07-30 23:11:23.000000000 +0200 +++ new/pebble-5.2.0/test/test_process_pool_forkserver.py 2026-01-25 12:11:53.000000000 +0100 @@ -10,6 +10,7 @@ import dataclasses import multiprocessing +from concurrent.futures.process import BrokenProcessPool from concurrent.futures import CancelledError, TimeoutError import pebble @@ -68,8 +69,17 @@ return BaseException("BOOM!") -def pickle_error_function(): - return threading.Lock() +def unpickleable_error_function(): + raise UnpickleableException() + + +class UnpickleableException(Exception): + """This exception cannot be correctly re-constructed + on the main side of the pool. + """ + def __init__(self): + self.message = "BOOM!" + super().__init__(self.message) @dataclasses.dataclass(frozen=True) @@ -81,6 +91,10 @@ raise FrozenError() +def pickle_error_function(): + return threading.Lock() + + def long_function(value=1): time.sleep(value) return value @@ -109,6 +123,7 @@ def pool_function(): + signal.signal(signal.SIGTERM, lambda _n, _s: sys.exit(0)) pool = multiprocessing.Pool(1) result = pool.apply(function, args=[1]) pool.close() @@ -118,7 +133,8 @@ def pebble_function(): - with ProcessPool(max_workers=1) as pool: + signal.signal(signal.SIGTERM, lambda _n, _s: sys.exit(0)) + with ProcessPool(max_workers=1, context=mp_context) as pool: f = pool.schedule(function, args=[1]) return f.result() @@ -200,6 +216,13 @@ future = pool.schedule(pickle_error_function) self.assertRaises((pickle.PicklingError, TypeError), future.result) + def test_process_pool_broken_pickling_error(self): + """Process Pool Forkserver pickling errors breaking the pool + are raised by future.result.""" + with ProcessPool(max_workers=1, context=mp_context) as pool: + future = pool.schedule(unpickleable_error_function) + self.assertRaises(BrokenProcessPool, future.result) + def test_process_pool_frozen_error(self): """Process Pool Forkserver frozen errors are raised by future get.""" with ProcessPool(max_workers=1, context=mp_context) as pool: @@ -848,7 +871,7 @@ """Process Pool Forkserver is stopped when futures are cancelled on large data.""" data = b'A' * 1024 * 1024 * 100 - with pebble.ProcessPool() as pool: + with pebble.ProcessPool(context=mp_context) as pool: futures = [pool.schedule(function, args=[data]) for _ in range(10)] concurrent.futures.wait( futures, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/pebble-5.1.3/test/test_process_pool_spawn.py new/pebble-5.2.0/test/test_process_pool_spawn.py --- old/pebble-5.1.3/test/test_process_pool_spawn.py 2025-07-30 23:11:00.000000000 +0200 +++ new/pebble-5.2.0/test/test_process_pool_spawn.py 2026-01-25 12:11:53.000000000 +0100 @@ -10,6 +10,7 @@ import dataclasses import multiprocessing +from concurrent.futures.process import BrokenProcessPool from concurrent.futures import CancelledError, TimeoutError import pebble @@ -66,8 +67,17 @@ return BaseException("BOOM!") -def pickle_error_function(): - return threading.Lock() +def unpickleable_error_function(): + raise UnpickleableException() + + +class UnpickleableException(Exception): + """This exception cannot be correctly re-constructed + on the main side of the pool. + """ + def __init__(self): + self.message = "BOOM!" + super().__init__(self.message) @dataclasses.dataclass(frozen=True) @@ -79,6 +89,10 @@ raise FrozenError() +def pickle_error_function(): + return threading.Lock() + + def long_function(value=1): time.sleep(value) return value @@ -107,6 +121,7 @@ def pool_function(): + signal.signal(signal.SIGTERM, lambda _n, _s: sys.exit(0)) pool = multiprocessing.Pool(1) result = pool.apply(function, args=[1]) pool.close() @@ -116,7 +131,8 @@ def pebble_function(): - with ProcessPool(max_workers=1) as pool: + signal.signal(signal.SIGTERM, lambda _n, _s: sys.exit(0)) + with ProcessPool(max_workers=1, context=mp_context) as pool: f = pool.schedule(function, args=[1]) return f.result() @@ -198,6 +214,13 @@ future = pool.schedule(pickle_error_function) self.assertRaises((pickle.PicklingError, TypeError), future.result) + def test_process_pool_broken_pickling_error(self): + """Process Pool Spawn pickling errors breaking the pool + are raised by future.result.""" + with ProcessPool(max_workers=1, context=mp_context) as pool: + future = pool.schedule(unpickleable_error_function) + self.assertRaises(BrokenProcessPool, future.result) + def test_process_pool_frozen_error(self): """Process Pool Spawn frozen errors are raised by future get.""" with ProcessPool(max_workers=1, context=mp_context) as pool: @@ -846,7 +869,7 @@ """Process Pool Spawn is stopped when futures are cancelled on large data.""" data = b'A' * 1024 * 1024 * 100 - with pebble.ProcessPool() as pool: + with pebble.ProcessPool(context=mp_context) as pool: futures = [pool.schedule(function, args=[data]) for _ in range(10)] concurrent.futures.wait( futures,
