Script 'mail_helper' called by obssrc Hello community, here is the log from the commit of package python-Pebble for openSUSE:Factory checked in at 2022-11-16 15:43:56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python-Pebble (Old) and /work/SRC/openSUSE:Factory/.python-Pebble.new.1597 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-Pebble" Wed Nov 16 15:43:56 2022 rev:13 rq:1035961 version:5.0.3 Changes: -------- --- /work/SRC/openSUSE:Factory/python-Pebble/python-Pebble.changes 2022-10-12 18:25:34.341796512 +0200 +++ /work/SRC/openSUSE:Factory/.python-Pebble.new.1597/python-Pebble.changes 2022-11-16 15:43:57.343969617 +0100 @@ -1,0 +2,7 @@ +Tue Nov 15 21:50:22 UTC 2022 - Martin Liška <mli...@suse.cz> + +- Update to 5.0.3 + * issue #111: reinstate `Pool.schedule` function in place of + `Pool.submit`. + +------------------------------------------------------------------- Old: ---- Pebble-5.0.2.tar.gz New: ---- Pebble-5.0.3.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-Pebble.spec ++++++ --- /var/tmp/diff_new_pack.oP0s6w/_old 2022-11-16 15:43:57.771971719 +0100 +++ /var/tmp/diff_new_pack.oP0s6w/_new 2022-11-16 15:43:57.775971739 +0100 @@ -17,7 +17,7 @@ Name: python-Pebble -Version: 5.0.2 +Version: 5.0.3 Release: 0 Summary: Threading and multiprocessing eye-candy for Python License: LGPL-3.0-only ++++++ Pebble-5.0.2.tar.gz -> Pebble-5.0.3.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Pebble-5.0.2/PKG-INFO new/Pebble-5.0.3/PKG-INFO --- old/Pebble-5.0.2/PKG-INFO 2022-10-09 17:41:32.561891000 +0200 +++ new/Pebble-5.0.3/PKG-INFO 2022-11-15 22:41:11.028350800 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: Pebble -Version: 5.0.2 +Version: 5.0.3 Summary: Threading and multiprocessing eye-candy. Home-page: https://github.com/noxdafox/pebble Author: Matteo Cafasso @@ -89,7 +89,7 @@ from pebble import ProcessPool from concurrent.futures import TimeoutError - + TIMEOUT_SECONDS = 3 def function(foo, bar=0): @@ -106,7 +106,7 @@ with ProcessPool(max_workers=5, max_tasks=10) as pool: for index in range(0, 10): - future = pool.submit(function, TIMEOUT_SECONDS, index, bar=1) + future = pool.schedule(function, index, bar=1, timeout=TIMEOUT_SECONDS) future.add_done_callback(task_done) Keywords: thread process pool decorator diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Pebble-5.0.2/Pebble.egg-info/PKG-INFO new/Pebble-5.0.3/Pebble.egg-info/PKG-INFO --- old/Pebble-5.0.2/Pebble.egg-info/PKG-INFO 2022-10-09 17:41:32.000000000 +0200 +++ new/Pebble-5.0.3/Pebble.egg-info/PKG-INFO 2022-11-15 22:41:10.000000000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: Pebble -Version: 5.0.2 +Version: 5.0.3 Summary: Threading and multiprocessing eye-candy. Home-page: https://github.com/noxdafox/pebble Author: Matteo Cafasso @@ -89,7 +89,7 @@ from pebble import ProcessPool from concurrent.futures import TimeoutError - + TIMEOUT_SECONDS = 3 def function(foo, bar=0): @@ -106,7 +106,7 @@ with ProcessPool(max_workers=5, max_tasks=10) as pool: for index in range(0, 10): - future = pool.submit(function, TIMEOUT_SECONDS, index, bar=1) + future = pool.schedule(function, index, bar=1, timeout=TIMEOUT_SECONDS) future.add_done_callback(task_done) Keywords: thread process pool decorator diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Pebble-5.0.2/README.rst new/Pebble-5.0.3/README.rst --- old/Pebble-5.0.2/README.rst 2022-10-05 22:53:41.000000000 +0200 +++ new/Pebble-5.0.3/README.rst 2022-11-15 21:56:07.000000000 +0100 @@ -81,7 +81,7 @@ from pebble import ProcessPool from concurrent.futures import TimeoutError - + TIMEOUT_SECONDS = 3 def function(foo, bar=0): @@ -98,5 +98,5 @@ with ProcessPool(max_workers=5, max_tasks=10) as pool: for index in range(0, 10): - future = pool.submit(function, TIMEOUT_SECONDS, index, bar=1) + future = pool.schedule(function, index, bar=1, timeout=TIMEOUT_SECONDS) future.add_done_callback(task_done) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Pebble-5.0.2/pebble/__init__.py new/Pebble-5.0.3/pebble/__init__.py --- old/Pebble-5.0.2/pebble/__init__.py 2022-10-09 17:40:46.000000000 +0200 +++ new/Pebble-5.0.3/pebble/__init__.py 2022-11-15 22:20:58.000000000 +0100 @@ -1,5 +1,5 @@ __author__ = 'Matteo Cafasso' -__version__ = '5.0.2' +__version__ = '5.0.3' __license__ = 'LGPL' diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Pebble-5.0.2/pebble/pool/process.py new/Pebble-5.0.3/pebble/pool/process.py --- old/Pebble-5.0.2/pebble/pool/process.py 2022-10-01 11:49:41.000000000 +0200 +++ new/Pebble-5.0.3/pebble/pool/process.py 2022-11-15 21:56:07.000000000 +0100 @@ -18,7 +18,6 @@ import time import atexit import pickle -import warnings import multiprocessing from itertools import count @@ -86,17 +85,18 @@ if self._message_manager_loop is not None: self._message_manager_loop.join() - def submit(self, function: Callable, - timeout: Optional[float], - *args, **kwargs) -> ProcessFuture: - """Submits *function* to the Pool for execution. - - *timeout* is an integer, if expires the task will be terminated - and *Future.result()* will raise *TimeoutError*. + def schedule(self, function: Callable, + args: list = (), + kwargs: dict = {}, + timeout: float = None) -> ProcessFuture: + """Schedules *function* to be run the Pool. *args* and *kwargs* will be forwareded to the scheduled function respectively as arguments and keyword arguments. + *timeout* is an integer, if expires the task will be terminated + and *Future.result()* will raise *TimeoutError*. + A *pebble.ProcessFuture* object is returned. """ @@ -110,25 +110,17 @@ return future - def schedule(self, function: Callable, - args: list = (), - kwargs: dict = {}, - timeout: float = None) -> ProcessFuture: - """Schedules *function* to be run the Pool. - - *args* and *kwargs* will be forwareded to the scheduled function - respectively as arguments and keyword arguments. - - *timeout* is an integer, if expires the task will be terminated - and *Future.result()* will raise *TimeoutError*. + def submit(self, function: Callable, + timeout: Optional[float], + *args, **kwargs) -> ProcessFuture: + """This function is provided for compatibility with + `asyncio.loop.run_in_executor`. - A *pebble.ProcessFuture* object is returned. + For scheduling jobs within the pool use `schedule` instead. """ - warnings.warn("schedule is deprecated; use submit instead", - DeprecationWarning) - - return self.submit(function, timeout, *args, **kwargs) + return self.schedule( + function, args=args, kwargs=kwargs, timeout=timeout) def map(self, function: Callable, *iterables, **kwargs) -> ProcessMapFuture: @@ -153,8 +145,9 @@ if chunksize < 1: raise ValueError("chunksize must be >= 1") - futures = [self.submit(process_chunk, timeout, function, chunk) - for chunk in iter_chunks(chunksize, *iterables)] + futures = [self.schedule( + process_chunk, args=(function, chunk), timeout=timeout) + for chunk in iter_chunks(chunksize, *iterables)] return map_results(ProcessMapFuture(futures), timeout) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Pebble-5.0.2/pebble/pool/thread.py new/Pebble-5.0.3/pebble/pool/thread.py --- old/Pebble-5.0.2/pebble/pool/thread.py 2022-09-11 20:08:19.000000000 +0200 +++ new/Pebble-5.0.3/pebble/pool/thread.py 2022-11-15 21:56:07.000000000 +0100 @@ -16,7 +16,6 @@ import time -import warnings import multiprocessing from itertools import count @@ -64,8 +63,8 @@ self._pool_manager_loop.join() self._pool_manager.stop() - def submit(self, function: Callable, *args, **kwargs) -> Future: - """Submits *function* to the Pool for execution. + def schedule(self, function, args=(), kwargs={}) -> Future: + """Schedules *function* to be run the Pool. *args* and *kwargs* will be forwareded to the scheduled function respectively as arguments and keyword arguments. @@ -83,19 +82,14 @@ return future - def schedule(self, function, args=(), kwargs={}) -> Future: - """Schedules *function* to be run the Pool. - - *args* and *kwargs* will be forwareded to the scheduled function - respectively as arguments and keyword arguments. + def submit(self, function: Callable, *args, **kwargs) -> Future: + """This function is provided for compatibility with + `asyncio.loop.run_in_executor`. - A *concurrent.futures.Future* object is returned. + For scheduling jobs within the pool use `schedule` instead. """ - warnings.warn("schedule is deprecated; use submit instead", - DeprecationWarning) - - return self.submit(function, *args, **kwargs) + return self.schedule(function, args=args, kwargs=kwargs) def map(self, function: Callable, *iterables, **kwargs) -> MapFuture: """Returns an iterator equivalent to map(function, iterables). @@ -113,7 +107,7 @@ if chunksize < 1: raise ValueError("chunksize must be >= 1") - futures = [self.submit(process_chunk, function, chunk) + futures = [self.schedule(process_chunk, args=(function, chunk)) for chunk in iter_chunks(chunksize, *iterables)] return map_results(MapFuture(futures), timeout) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Pebble-5.0.2/test/test_process_pool_fork.py new/Pebble-5.0.3/test/test_process_pool_fork.py --- old/Pebble-5.0.2/test/test_process_pool_fork.py 2022-09-12 19:47:41.000000000 +0200 +++ new/Pebble-5.0.3/test/test_process_pool_fork.py 2022-11-15 21:56:07.000000000 +0100 @@ -102,7 +102,7 @@ def pebble_function(): with ProcessPool(max_workers=1) as pool: - f = pool.submit(function, None, 1) + f = pool.schedule(function, args=[1]) return f.result() @@ -128,7 +128,8 @@ def test_process_pool_single_future(self): """Process Pool Fork single future.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(function, None, 1, keyword_argument=1) + future = pool.schedule(function, args=[1], + kwargs={'keyword_argument': 1}) self.assertEqual(future.result(), 2) def test_process_pool_multiple_futures(self): @@ -136,13 +137,14 @@ futures = [] with ProcessPool(max_workers=2, context=mp_context) as pool: for _ in range(5): - futures.append(pool.submit(function, None, 1)) + futures.append(pool.schedule(function, args=[1])) self.assertEqual(sum([f.result() for f in futures]), 5) def test_process_pool_callback(self): """Process Pool Fork result is forwarded to the callback.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(function, None, 1, keyword_argument=1) + future = pool.schedule( + function, args=[1], kwargs={'keyword_argument': 1}) future.add_done_callback(self.callback) self.event.wait() self.assertEqual(self.result, 2) @@ -150,13 +152,13 @@ def test_process_pool_error(self): """Process Pool Fork errors are raised by future get.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(error_function, None) + future = pool.schedule(error_function) self.assertRaises(Exception, future.result) def test_process_pool_error_callback(self): """Process Pool Fork errors are forwarded to callback.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(error_function, None) + future = pool.schedule(error_function) future.add_done_callback(self.callback) self.event.wait() self.assertTrue(isinstance(self.exception, Exception)) @@ -165,26 +167,26 @@ """Process Pool Fork task pickling errors are raised by future.result.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(function, None, threading.Lock()) + future = pool.schedule(function, args=[threading.Lock()]) self.assertRaises((pickle.PicklingError, TypeError), future.result) def test_process_pool_pickling_error_result(self): """Process Pool Fork result pickling errors are raised by future.result.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(pickle_error_function, None) + future = pool.schedule(pickle_error_function) self.assertRaises((pickle.PicklingError, TypeError), future.result) def test_process_pool_timeout(self): """Process Pool Fork future raises TimeoutError if so.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(long_function, 0.1) + future = pool.schedule(long_function, timeout=0.1) self.assertRaises(TimeoutError, future.result) def test_process_pool_timeout_callback(self): """Process Pool Fork TimeoutError is forwarded to callback.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(long_function, 0.1) + future = pool.schedule(long_function, timeout=0.1) future.add_done_callback(self.callback) self.event.wait() self.assertTrue(isinstance(self.exception, TimeoutError)) @@ -192,7 +194,7 @@ def test_process_pool_cancel(self): """Process Pool Fork future raises CancelledError if so.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(long_function, None) + future = pool.schedule(long_function) time.sleep(0.1) # let the process pick up the task self.assertTrue(future.cancel()) self.assertRaises(CancelledError, future.result) @@ -200,7 +202,7 @@ def test_process_pool_cancel_callback(self): """Process Pool Fork CancelledError is forwarded to callback.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(long_function, None) + future = pool.schedule(long_function) future.add_done_callback(self.callback) time.sleep(0.1) # let the process pick up the task self.assertTrue(future.cancel()) @@ -213,7 +215,7 @@ futures = [] with ProcessPool(max_workers=2, context=mp_context) as pool: for _ in range(0, 5): - futures.append(pool.submit(pid_function, None)) + futures.append(pool.schedule(pid_function)) self.assertEqual(len(set([f.result() for f in futures])), 2) def test_process_pool_future_limit(self): @@ -221,31 +223,31 @@ futures = [] with ProcessPool(max_workers=1, max_tasks=2, context=mp_context) as pool: for _ in range(0, 4): - futures.append(pool.submit(pid_function, None)) + futures.append(pool.schedule(pid_function)) self.assertEqual(len(set([f.result() for f in futures])), 2) def test_process_pool_stop_timeout(self): """Process Pool Fork workers are stopped if future timeout.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future1 = pool.submit(pid_function, None) - pool.submit(long_function, 0.1) - future2 = pool.submit(pid_function, None) + future1 = pool.schedule(pid_function) + pool.schedule(long_function, timeout=0.1) + future2 = pool.schedule(pid_function) self.assertNotEqual(future1.result(), future2.result()) def test_process_pool_stop_cancel(self): """Process Pool Fork workers are stopped if future is cancelled.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future1 = pool.submit(pid_function, None) - cancel_future = pool.submit(long_function, None) + future1 = pool.schedule(pid_function) + cancel_future = pool.schedule(long_function) time.sleep(0.1) # let the process pick up the task cancel_future.cancel() - future2 = pool.submit(pid_function, None) + future2 = pool.schedule(pid_function) self.assertNotEqual(future1.result(), future2.result()) def test_process_pool_initializer(self): """Process Pool Fork initializer is correctly run.""" with ProcessPool(initializer=initializer, initargs=[1], context=mp_context) as pool: - future = pool.submit(initializer_function, None) + future = pool.schedule(initializer_function) self.assertEqual(future.result(), 1) def test_process_pool_broken_initializer(self): @@ -254,18 +256,18 @@ with ProcessPool(initializer=broken_initializer, context=mp_context) as pool: pool.active time.sleep(0.4) - pool.submit(function, None) + pool.schedule(function) def test_process_pool_running(self): """Process Pool Fork is active if a future is scheduled.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) self.assertTrue(pool.active) def test_process_pool_stopped(self): """Process Pool Fork is not active once stopped.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) self.assertFalse(pool.active) def test_process_pool_close_futures(self): @@ -273,7 +275,7 @@ futures = [] pool = ProcessPool(max_workers=1, context=mp_context) for index in range(10): - futures.append(pool.submit(function, None, index)) + futures.append(pool.schedule(function, args=[index])) pool.close() pool.join() map(self.assertTrue, [f.done() for f in futures]) @@ -281,7 +283,7 @@ def test_process_pool_close_stopped(self): """Process Pool Fork is stopped after close.""" pool = ProcessPool(max_workers=1, context=mp_context) - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) pool.close() pool.join() self.assertFalse(pool.active) @@ -291,7 +293,7 @@ futures = [] pool = ProcessPool(max_workers=1, context=mp_context) for index in range(10): - futures.append(pool.submit(function, None, index)) + futures.append(pool.schedule(function, args=[index])) pool.stop() pool.join() self.assertTrue(len([f for f in futures if not f.done()]) > 0) @@ -299,7 +301,7 @@ def test_process_pool_stop_stopped(self): """Process Pool Fork is stopped after stop.""" pool = ProcessPool(max_workers=1, context=mp_context) - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) pool.stop() pool.join() self.assertFalse(pool.active) @@ -310,12 +312,12 @@ def stop_pool_callback(_): pool.stop() - future = pool.submit(function, None, 1) + future = pool.schedule(function, args=[1]) future.add_done_callback(stop_pool_callback) with self.assertRaises(RuntimeError): for index in range(10): time.sleep(0.1) - pool.submit(long_function, None, index) + pool.schedule(long_function, args=[index]) self.assertFalse(pool.active) @@ -324,7 +326,8 @@ data = "a" * 1098 * 1024 * 100 # 100 Mb with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(function, None, data, keyword_argument='') + future = pool.schedule( + function, args=[data], kwargs={'keyword_argument': ''}) self.assertEqual(data, future.result()) @@ -332,7 +335,7 @@ """Process Pool Fork is stopped if large data is sent on the channel.""" data = "a" * 1098 * 1024 * 100 # 100 Mb pool = ProcessPool(max_workers=1, context=mp_context) - pool.submit(function, None, data) + pool.schedule(function, args=[data]) time.sleep(1) pool.stop() pool.join() @@ -342,7 +345,7 @@ def test_process_pool_join_workers(self): """Process Pool Fork no worker is running after join.""" pool = ProcessPool(max_workers=4, context=mp_context) - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) pool.stop() pool.join() self.assertEqual(len(pool._pool_manager.worker_manager.workers), 0) @@ -350,14 +353,14 @@ def test_process_pool_join_running(self): """Process Pool Fork RuntimeError is raised if active pool joined.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) self.assertRaises(RuntimeError, pool.join) def test_process_pool_join_futures_timeout(self): """Process Pool Fork TimeoutError is raised if join on long futures.""" pool = ProcessPool(max_workers=1, context=mp_context) for _ in range(2): - pool.submit(long_function, None) + pool.schedule(long_function) pool.close() self.assertRaises(TimeoutError, pool.join, 0.4) pool.stop() @@ -366,35 +369,38 @@ def test_process_pool_callback_error(self): """Process Pool Fork does not stop if error in callback.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(function, None, 1,keyword_argument=1) + future = pool.schedule(function, args=[1], + kwargs={'keyword_argument': 1}) future.add_done_callback(self.callback) # sleep enough to ensure callback is run time.sleep(0.1) - pool.submit(function, None, 1, keyword_argument=1) + pool.schedule(function, args=[1], + kwargs={'keyword_argument': 1}) def test_process_pool_exception_isolated(self): """Process Pool Fork an Exception does not affect other futures.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(error_function, None) + future = pool.schedule(error_function) try: future.result() except Exception: pass - future = pool.submit(function, None, 1, keyword_argument=1) + future = pool.schedule(function, args=[1], + kwargs={'keyword_argument': 1}) self.assertEqual(future.result(), 2) @unittest.skipIf(os.name == 'nt', "Test won't run on Windows'.") def test_process_pool_ignoring_sigterm(self): """Process Pool Fork ignored SIGTERM signal are handled on Unix.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(sigterm_function, 0.2) + future = pool.schedule(sigterm_function, timeout=0.2) with self.assertRaises(TimeoutError): future.result() def test_process_pool_expired_worker(self): """Process Pool Fork unexpect death of worker raises ProcessExpired.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(suicide_function, None) + future = pool.schedule(suicide_function) self.assertRaises(ProcessExpired, future.result) def test_process_pool_map(self): @@ -531,19 +537,19 @@ def test_process_pool_child_process(self): """Process Pool Fork worker starts process.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(process_function, None) + future = pool.schedule(process_function) self.assertEqual(future.result(), 1) def test_process_pool_child_pool(self): """Process Pool Fork worker starts multiprocessing.Pool.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(pool_function, None) + future = pool.schedule(pool_function) self.assertEqual(future.result(), 1) def test_process_pool_child_pebble(self): """Process Pool Fork worker starts pebble.ProcessPool.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(pebble_function, None) + future = pool.schedule(pebble_function) self.assertEqual(future.result(), 1) @@ -765,7 +771,7 @@ with self.assertRaises(RuntimeError): pool = pebble.ProcessPool(max_workers=1, context=mp_context) for _ in range(10): - pool.submit(function, None) + pool.schedule(function) time.sleep(0.2) @@ -784,12 +790,12 @@ """Process Pool Fork no deadlock if writing worker dies locking channel.""" with pebble.ProcessPool(max_workers=1, context=mp_context) as pool: with self.assertRaises(pebble.ProcessExpired): - pool.submit(function, None).result() + pool.schedule(function).result() def test_pool_deadlock_stop(self): """Process Pool Fork writing deadlocks are stopping the Pool.""" with self.assertRaises(RuntimeError): pool = pebble.ProcessPool(max_workers=1, context=mp_context) for _ in range(10): - pool.submit(function, None) + pool.schedule(function) time.sleep(0.2) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Pebble-5.0.2/test/test_process_pool_forkserver.py new/Pebble-5.0.3/test/test_process_pool_forkserver.py --- old/Pebble-5.0.2/test/test_process_pool_forkserver.py 2022-09-12 20:13:35.000000000 +0200 +++ new/Pebble-5.0.3/test/test_process_pool_forkserver.py 2022-11-15 21:56:07.000000000 +0100 @@ -104,7 +104,7 @@ def pebble_function(): with ProcessPool(max_workers=1) as pool: - f = pool.submit(function, None, 1) + f = pool.schedule(function, args=[1]) return f.result() @@ -130,7 +130,8 @@ def test_process_pool_single_future(self): """Process Pool Forkserver single future.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(function, None, 1, keyword_argument=1) + future = pool.schedule(function, args=[1], + kwargs={'keyword_argument': 1}) self.assertEqual(future.result(), 2) def test_process_pool_multiple_futures(self): @@ -138,13 +139,14 @@ futures = [] with ProcessPool(max_workers=1, context=mp_context) as pool: for _ in range(5): - futures.append(pool.submit(function, None, 1)) + futures.append(pool.schedule(function, args=[1])) self.assertEqual(sum([f.result() for f in futures]), 5) def test_process_pool_callback(self): """Process Pool Forkserver result is forwarded to the callback.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(function, None, 1, keyword_argument=1) + future = pool.schedule( + function, args=[1], kwargs={'keyword_argument': 1}) future.add_done_callback(self.callback) self.event.wait() self.assertEqual(self.result, 2) @@ -152,13 +154,13 @@ def test_process_pool_error(self): """Process Pool Forkserver errors are raised by future get.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(error_function, None) + future = pool.schedule(error_function) self.assertRaises(Exception, future.result) def test_process_pool_error_callback(self): """Process Pool Forkserver errors are forwarded to callback.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(error_function, None) + future = pool.schedule(error_function) future.add_done_callback(self.callback) self.event.wait() self.assertTrue(isinstance(self.exception, Exception)) @@ -167,26 +169,26 @@ """Process Pool Forkserver task pickling errors are raised by future.result.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(function, None, threading.Lock()) + future = pool.schedule(function, args=[threading.Lock()]) self.assertRaises((pickle.PicklingError, TypeError), future.result) def test_process_pool_pickling_error_result(self): """Process Pool Forkserver result pickling errors are raised by future.result.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(pickle_error_function, None) + future = pool.schedule(pickle_error_function) self.assertRaises((pickle.PicklingError, TypeError), future.result) def test_process_pool_timeout(self): """Process Pool Forkserver future raises TimeoutError if so.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(long_function, 0.1) + future = pool.schedule(long_function, timeout=0.1) self.assertRaises(TimeoutError, future.result) def test_process_pool_timeout_callback(self): """Process Pool Forkserver TimeoutError is forwarded to callback.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(long_function, 0.1) + future = pool.schedule(long_function, timeout=0.1) future.add_done_callback(self.callback) self.event.wait() self.assertTrue(isinstance(self.exception, TimeoutError)) @@ -194,7 +196,7 @@ def test_process_pool_cancel(self): """Process Pool Forkserver future raises CancelledError if so.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(long_function, None) + future = pool.schedule(long_function) time.sleep(0.1) # let the process pick up the task self.assertTrue(future.cancel()) self.assertRaises(CancelledError, future.result) @@ -202,7 +204,7 @@ def test_process_pool_cancel_callback(self): """Process Pool Forkserver CancelledError is forwarded to callback.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(long_function, None) + future = pool.schedule(long_function) future.add_done_callback(self.callback) time.sleep(0.1) # let the process pick up the task self.assertTrue(future.cancel()) @@ -215,7 +217,7 @@ futures = [] with ProcessPool(max_workers=2, context=mp_context) as pool: for _ in range(0, 5): - futures.append(pool.submit(pid_function, None)) + futures.append(pool.schedule(pid_function)) self.assertEqual(len(set([f.result() for f in futures])), 2) def test_process_pool_future_limit(self): @@ -223,31 +225,31 @@ futures = [] with ProcessPool(max_workers=1, max_tasks=2, context=mp_context) as pool: for _ in range(0, 4): - futures.append(pool.submit(pid_function, None)) + futures.append(pool.schedule(pid_function)) self.assertEqual(len(set([f.result() for f in futures])), 2) def test_process_pool_stop_timeout(self): """Process Pool Forkserver workers are stopped if future timeout.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future1 = pool.submit(pid_function, None) - pool.submit(long_function, 0.1) - future2 = pool.submit(pid_function, None) + future1 = pool.schedule(pid_function) + pool.schedule(long_function, timeout=0.1) + future2 = pool.schedule(pid_function) self.assertNotEqual(future1.result(), future2.result()) def test_process_pool_stop_cancel(self): """Process Pool Forkserver workers are stopped if future is cancelled.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future1 = pool.submit(pid_function, None) - cancel_future = pool.submit(long_function, None) + future1 = pool.schedule(pid_function) + cancel_future = pool.schedule(long_function) time.sleep(0.1) # let the process pick up the task cancel_future.cancel() - future2 = pool.submit(pid_function, None) + future2 = pool.schedule(pid_function) self.assertNotEqual(future1.result(), future2.result()) def test_process_pool_initializer(self): """Process Pool Forkserver initializer is correctly run.""" with ProcessPool(initializer=initializer, initargs=[1], context=mp_context) as pool: - future = pool.submit(initializer_function, None) + future = pool.schedule(initializer_function) self.assertEqual(future.result(), 1) def test_process_pool_broken_initializer(self): @@ -256,18 +258,18 @@ with ProcessPool(initializer=broken_initializer, context=mp_context) as pool: pool.active time.sleep(1) - pool.submit(function, None) + pool.schedule(function) def test_process_pool_running(self): """Process Pool Forkserver is active if a future is scheduled.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) self.assertTrue(pool.active) def test_process_pool_stopped(self): """Process Pool Forkserver is not active once stopped.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) self.assertFalse(pool.active) def test_process_pool_close_futures(self): @@ -275,7 +277,7 @@ futures = [] pool = ProcessPool(max_workers=1, context=mp_context) for index in range(10): - futures.append(pool.submit(function, None, index)) + futures.append(pool.schedule(function, args=[index])) pool.close() pool.join() map(self.assertTrue, [f.done() for f in futures]) @@ -283,7 +285,7 @@ def test_process_pool_close_stopped(self): """Process Pool Forkserver is stopped after close.""" pool = ProcessPool(max_workers=1, context=mp_context) - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) pool.close() pool.join() self.assertFalse(pool.active) @@ -293,7 +295,7 @@ futures = [] pool = ProcessPool(max_workers=1, context=mp_context) for index in range(10): - futures.append(pool.submit(function, None, index)) + futures.append(pool.schedule(function, args=[index])) pool.stop() pool.join() self.assertTrue(len([f for f in futures if not f.done()]) > 0) @@ -301,7 +303,7 @@ def test_process_pool_stop_stopped(self): """Process Pool Forkserver is stopped after stop.""" pool = ProcessPool(max_workers=1, context=mp_context) - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) pool.stop() pool.join() self.assertFalse(pool.active) @@ -312,12 +314,12 @@ def stop_pool_callback(_): pool.stop() - future = pool.submit(function, None, 1) + future = pool.schedule(function, args=[1]) future.add_done_callback(stop_pool_callback) with self.assertRaises(RuntimeError): for index in range(10): time.sleep(0.1) - pool.submit(long_function, None, index) + pool.schedule(long_function, args=[index]) self.assertFalse(pool.active) @@ -326,7 +328,8 @@ data = "a" * 1098 * 1024 * 100 # 100 Mb with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(function, None, data, keyword_argument='') + future = pool.schedule( + function, args=[data], kwargs={'keyword_argument': ''}) self.assertEqual(data, future.result()) @@ -334,7 +337,7 @@ """Process Pool Forkserver is stopped if large data is sent on the channel.""" data = "a" * 1098 * 1024 * 100 # 100 Mb pool = ProcessPool(max_workers=1, context=mp_context) - pool.submit(function, None, data) + pool.schedule(function, args=[data]) time.sleep(1) pool.stop() pool.join() @@ -344,7 +347,7 @@ def test_process_pool_join_workers(self): """Process Pool Forkserver no worker is running after join.""" pool = ProcessPool(max_workers=4, context=mp_context) - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) pool.stop() pool.join() self.assertEqual(len(pool._pool_manager.worker_manager.workers), 0) @@ -352,14 +355,14 @@ def test_process_pool_join_running(self): """Process Pool Forkserver RuntimeError is raised if active pool joined.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) self.assertRaises(RuntimeError, pool.join) def test_process_pool_join_futures_timeout(self): """Process Pool Forkserver TimeoutError is raised if join on long tasks.""" pool = ProcessPool(max_workers=1, context=mp_context) for _ in range(2): - pool.submit(long_function, None) + pool.schedule(long_function) pool.close() self.assertRaises(TimeoutError, pool.join, 0.4) pool.stop() @@ -368,35 +371,38 @@ def test_process_pool_callback_error(self): """Process Pool Forkserver does not stop if error in callback.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(function, None, 1, keyword_argument=1) + future = pool.schedule(function, args=[1], + kwargs={'keyword_argument': 1}) future.add_done_callback(self.callback) # sleep enough to ensure callback is run time.sleep(0.1) - pool.submit(function, None, 1, keyword_argument=1) + pool.schedule(function, args=[1], + kwargs={'keyword_argument': 1}) def test_process_pool_exception_isolated(self): """Process Pool Forkserver an Exception does not affect other futures.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(error_function, None) + future = pool.schedule(error_function) try: future.result() except Exception: pass - future = pool.submit(function, None, 1, keyword_argument=1) + future = pool.schedule(function, args=[1], + kwargs={'keyword_argument': 1}) self.assertEqual(future.result(), 2) @unittest.skipIf(os.name == 'nt', "Test won't run on Windows'.") def test_process_pool_ignoring_sigterm(self): """Process Pool Forkserver ignored SIGTERM signal are handled on Unix.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(sigterm_function, 0.2) + future = pool.schedule(sigterm_function, timeout=0.2) with self.assertRaises(TimeoutError): future.result() def test_process_pool_expired_worker(self): """Process Pool Forkserver unexpect death of worker raises ProcessExpired.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(suicide_function, None) + future = pool.schedule(suicide_function) self.assertRaises(ProcessExpired, future.result) def test_process_pool_map(self): @@ -532,19 +538,19 @@ def test_process_pool_child_process(self): """Process Pool Forkserver worker starts process.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(process_function, None) + future = pool.schedule(process_function) self.assertEqual(future.result(), 1) def test_process_pool_child_pool(self): """Process Pool Forkserver worker starts multiprocessing.Pool.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(pool_function, None) + future = pool.schedule(pool_function) self.assertEqual(future.result(), 1) def test_process_pool_child_pebble(self): """Process Pool Forkserver worker starts pebble.ProcessPool.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(pebble_function, None) + future = pool.schedule(pebble_function) self.assertEqual(future.result(), 1) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Pebble-5.0.2/test/test_process_pool_spawn.py new/Pebble-5.0.3/test/test_process_pool_spawn.py --- old/Pebble-5.0.2/test/test_process_pool_spawn.py 2022-09-12 20:21:15.000000000 +0200 +++ new/Pebble-5.0.3/test/test_process_pool_spawn.py 2022-11-15 21:56:07.000000000 +0100 @@ -102,7 +102,7 @@ def pebble_function(): with ProcessPool(max_workers=1) as pool: - f = pool.submit(function, None, 1) + f = pool.schedule(function, args=[1]) return f.result() @@ -128,7 +128,8 @@ def test_process_pool_single_future(self): """Process Pool Spawn single future.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(function, None, 1, keyword_argument=1) + future = pool.schedule(function, args=[1], + kwargs={'keyword_argument': 1}) self.assertEqual(future.result(), 2) def test_process_pool_multiple_futures(self): @@ -136,13 +137,14 @@ futures = [] with ProcessPool(max_workers=1, context=mp_context) as pool: for _ in range(5): - futures.append(pool.submit(function, None, 1)) + futures.append(pool.schedule(function, args=[1])) self.assertEqual(sum([f.result() for f in futures]), 5) def test_process_pool_callback(self): """Process Pool Spawn result is forwarded to the callback.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(function, None, 1, keyword_argument=1) + future = pool.schedule( + function, args=[1], kwargs={'keyword_argument': 1}) future.add_done_callback(self.callback) self.event.wait() self.assertEqual(self.result, 2) @@ -150,13 +152,13 @@ def test_process_pool_error(self): """Process Pool Spawn errors are raised by future get.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(error_function, None) + future = pool.schedule(error_function) self.assertRaises(Exception, future.result) def test_process_pool_error_callback(self): """Process Pool Spawn errors are forwarded to callback.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(error_function, None) + future = pool.schedule(error_function) future.add_done_callback(self.callback) self.event.wait() self.assertTrue(isinstance(self.exception, Exception)) @@ -165,26 +167,26 @@ """Process Pool Spawn task pickling errors are raised by future.result.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(function, None, threading.Lock()) + future = pool.schedule(function, args=[threading.Lock()]) self.assertRaises((pickle.PicklingError, TypeError), future.result) def test_process_pool_pickling_error_result(self): """Process Pool Spawn result pickling errors are raised by future.result.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(pickle_error_function, None) + future = pool.schedule(pickle_error_function) self.assertRaises((pickle.PicklingError, TypeError), future.result) def test_process_pool_timeout(self): """Process Pool Spawn future raises TimeoutError if so.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(long_function, 0.1) + future = pool.schedule(long_function, timeout=0.1) self.assertRaises(TimeoutError, future.result) def test_process_pool_timeout_callback(self): """Process Pool Spawn TimeoutError is forwarded to callback.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(long_function, 0.1) + future = pool.schedule(long_function, timeout=0.1) future.add_done_callback(self.callback) self.event.wait() self.assertTrue(isinstance(self.exception, TimeoutError)) @@ -192,7 +194,7 @@ def test_process_pool_cancel(self): """Process Pool Spawn future raises CancelledError if so.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(long_function, None) + future = pool.schedule(long_function) time.sleep(0.1) # let the process pick up the task self.assertTrue(future.cancel()) self.assertRaises(CancelledError, future.result) @@ -200,7 +202,7 @@ def test_process_pool_cancel_callback(self): """Process Pool Spawn CancelledError is forwarded to callback.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(long_function, None) + future = pool.schedule(long_function) future.add_done_callback(self.callback) time.sleep(0.1) # let the process pick up the task self.assertTrue(future.cancel()) @@ -213,7 +215,7 @@ futures = [] with ProcessPool(max_workers=2, context=mp_context) as pool: for _ in range(0, 5): - futures.append(pool.submit(pid_function, None)) + futures.append(pool.schedule(pid_function)) self.assertEqual(len(set([f.result() for f in futures])), 2) def test_process_pool_future_limit(self): @@ -221,31 +223,31 @@ futures = [] with ProcessPool(max_workers=1, max_tasks=2, context=mp_context) as pool: for _ in range(0, 4): - futures.append(pool.submit(pid_function, None)) + futures.append(pool.schedule(pid_function)) self.assertEqual(len(set([f.result() for f in futures])), 2) def test_process_pool_stop_timeout(self): """Process Pool Spawn workers are stopped if future timeout.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future1 = pool.submit(pid_function, None) - pool.submit(long_function, 0.1) - future2 = pool.submit(pid_function, None) + future1 = pool.schedule(pid_function) + pool.schedule(long_function, timeout=0.1) + future2 = pool.schedule(pid_function) self.assertNotEqual(future1.result(), future2.result()) def test_process_pool_stop_cancel(self): """Process Pool Spawn workers are stopped if future is cancelled.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future1 = pool.submit(pid_function, None) - cancel_future = pool.submit(long_function, None) + future1 = pool.schedule(pid_function) + cancel_future = pool.schedule(long_function) time.sleep(0.1) # let the process pick up the task cancel_future.cancel() - future2 = pool.submit(pid_function, None) + future2 = pool.schedule(pid_function) self.assertNotEqual(future1.result(), future2.result()) def test_process_pool_initializer(self): """Process Pool Spawn initializer is correctly run.""" with ProcessPool(initializer=initializer, initargs=[1], context=mp_context) as pool: - future = pool.submit(initializer_function, None) + future = pool.schedule(initializer_function) self.assertEqual(future.result(), 1) def test_process_pool_broken_initializer(self): @@ -254,18 +256,18 @@ with ProcessPool(initializer=broken_initializer, context=mp_context) as pool: pool.active time.sleep(2) - pool.submit(function, None) + pool.schedule(function) def test_process_pool_running(self): """Process Pool Spawn is active if a future is scheduled.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) self.assertTrue(pool.active) def test_process_pool_stopped(self): """Process Pool Spawn is not active once stopped.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) self.assertFalse(pool.active) def test_process_pool_close_futures(self): @@ -273,7 +275,7 @@ futures = [] pool = ProcessPool(max_workers=1, context=mp_context) for index in range(10): - futures.append(pool.submit(function, None, index)) + futures.append(pool.schedule(function, args=[index])) pool.close() pool.join() map(self.assertTrue, [f.done() for f in futures]) @@ -281,7 +283,7 @@ def test_process_pool_close_stopped(self): """Process Pool Spawn is stopped after close.""" pool = ProcessPool(max_workers=1, context=mp_context) - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) pool.close() pool.join() self.assertFalse(pool.active) @@ -291,7 +293,7 @@ futures = [] pool = ProcessPool(max_workers=1, context=mp_context) for index in range(10): - futures.append(pool.submit(function, None, index)) + futures.append(pool.schedule(function, args=[index])) pool.stop() pool.join() self.assertTrue(len([f for f in futures if not f.done()]) > 0) @@ -299,7 +301,7 @@ def test_process_pool_stop_stopped(self): """Process Pool Spawn is stopped after stop.""" pool = ProcessPool(max_workers=1, context=mp_context) - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) pool.stop() pool.join() self.assertFalse(pool.active) @@ -310,12 +312,12 @@ def stop_pool_callback(_): pool.stop() - future = pool.submit(function, None, 1) + future = pool.schedule(function, args=[1]) future.add_done_callback(stop_pool_callback) with self.assertRaises(RuntimeError): for index in range(10): time.sleep(0.1) - pool.submit(long_function, None, index) + pool.schedule(long_function, args=[index]) self.assertFalse(pool.active) @@ -324,7 +326,8 @@ data = "a" * 1098 * 1024 * 100 # 100 Mb with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(function, None, data, keyword_argument='') + future = pool.schedule( + function, args=[data], kwargs={'keyword_argument': ''}) self.assertEqual(data, future.result()) @@ -332,7 +335,7 @@ """Process Pool Spawn stopped if large data is sent on the channel.""" data = "a" * 1098 * 1024 * 100 # 100 Mb pool = ProcessPool(max_workers=1, context=mp_context) - pool.submit(function, None, data) + pool.schedule(function, args=[data]) time.sleep(1) pool.stop() pool.join() @@ -342,7 +345,7 @@ def test_process_pool_join_workers(self): """Process Pool Spawn no worker is running after join.""" pool = ProcessPool(max_workers=4, context=mp_context) - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) pool.stop() pool.join() self.assertEqual(len(pool._pool_manager.worker_manager.workers), 0) @@ -350,14 +353,14 @@ def test_process_pool_join_running(self): """Process Pool Spawn RuntimeError is raised if active pool joined.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - pool.submit(function, None, 1) + pool.schedule(function, args=[1]) self.assertRaises(RuntimeError, pool.join) def test_process_pool_join_futures_timeout(self): """Process Pool Spawn TimeoutError is raised if join on long tasks.""" pool = ProcessPool(max_workers=1, context=mp_context) for _ in range(2): - pool.submit(long_function, None) + pool.schedule(long_function) pool.close() self.assertRaises(TimeoutError, pool.join, 0.4) pool.stop() @@ -366,35 +369,38 @@ def test_process_pool_callback_error(self): """Process Pool Spawn does not stop if error in callback.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(function, None, 1, keyword_argument=1) + future = pool.schedule(function, args=[1], + kwargs={'keyword_argument': 1}) future.add_done_callback(self.callback) # sleep enough to ensure callback is run time.sleep(0.1) - pool.submit(function, None, 1, keyword_argument=1) + pool.schedule(function, args=[1], + kwargs={'keyword_argument': 1}) def test_process_pool_exception_isolated(self): """Process Pool Spawn an Exception does not affect other futures.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(error_function, None) + future = pool.schedule(error_function) try: future.result() except Exception: pass - future = pool.submit(function, None, 1, keyword_argument=1) + future = pool.schedule(function, args=[1], + kwargs={'keyword_argument': 1}) self.assertEqual(future.result(), 2) @unittest.skipIf(os.name == 'nt', "Test won't run on Windows'.") def test_process_pool_ignoring_sigterm(self): """Process Pool Spawn ignored SIGTERM signal are handled on Unix.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(sigterm_function, 0.2) + future = pool.schedule(sigterm_function, timeout=0.2) with self.assertRaises(TimeoutError): future.result() def test_process_pool_expired_worker(self): """Process Pool Spawn unexpect death of worker raises ProcessExpired.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(suicide_function, None) + future = pool.schedule(suicide_function) self.assertRaises(ProcessExpired, future.result) def test_process_pool_map(self): @@ -530,19 +536,19 @@ def test_process_pool_child_process(self): """Process Pool Spawn worker starts process.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(process_function, None) + future = pool.schedule(process_function) self.assertEqual(future.result(), 1) def test_process_pool_child_pool(self): """Process Pool Spawn worker starts multiprocessing.Pool.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(pool_function, None) + future = pool.schedule(pool_function) self.assertEqual(future.result(), 1) def test_process_pool_child_pebble(self): """Process Pool Spawn worker starts pebble.ProcessPool.""" with ProcessPool(max_workers=1, context=mp_context) as pool: - future = pool.submit(pebble_function, None) + future = pool.schedule(pebble_function) self.assertEqual(future.result(), 1) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/Pebble-5.0.2/test/test_thread_pool.py new/Pebble-5.0.3/test/test_thread_pool.py --- old/Pebble-5.0.2/test/test_thread_pool.py 2022-09-12 20:11:18.000000000 +0200 +++ new/Pebble-5.0.3/test/test_thread_pool.py 2022-11-15 21:56:07.000000000 +0100 @@ -68,7 +68,8 @@ def test_thread_pool_single_future(self): """Thread Pool single future.""" with ThreadPool(max_workers=1) as pool: - future = pool.submit(function, 1, keyword_argument=1) + future = pool.schedule(function, args=[1], + kwargs={'keyword_argument': 1}) self.assertEqual(future.result(), 2) def test_thread_pool_multiple_futures(self): @@ -76,13 +77,14 @@ futures = [] with ThreadPool(max_workers=1) as pool: for _ in range(5): - futures.append(pool.submit(function, 1)) + futures.append(pool.schedule(function, args=[1])) self.assertEqual(sum([t.result() for t in futures]), 5) def test_thread_pool_callback(self): """Thread Pool results are forwarded to the callback.""" with ThreadPool(max_workers=1) as pool: - future = pool.submit(function, 1, keyword_argument=1) + future = pool.schedule( + function, args=[1], kwargs={'keyword_argument': 1}) future.add_done_callback(self.callback) self.event.wait() @@ -91,14 +93,14 @@ def test_thread_pool_error(self): """Thread Pool errors are raised by future get.""" with ThreadPool(max_workers=1) as pool: - future = pool.submit(error_function) + future = pool.schedule(error_function) with self.assertRaises(Exception): future.result() def test_thread_pool_error_callback(self): """Thread Pool errors are forwarded to callback.""" with ThreadPool(max_workers=1) as pool: - future = pool.submit(error_function) + future = pool.schedule(error_function) future.add_done_callback(self.callback) self.event.wait() self.assertTrue(isinstance(self.exception, Exception)) @@ -106,8 +108,8 @@ def test_thread_pool_cancel_callback(self): """Thread Pool FutureCancelled is forwarded to callback.""" with ThreadPool(max_workers=1) as pool: - pool.submit(long_function) - future = pool.submit(long_function) + pool.schedule(long_function) + future = pool.schedule(long_function) future.add_done_callback(self.callback) future.cancel() self.event.wait() @@ -119,7 +121,7 @@ futures = [] with ThreadPool(max_workers=2) as pool: for _ in range(0, 5): - futures.append(pool.submit(tid_function)) + futures.append(pool.schedule(tid_function)) self.assertEqual(len(set([t.result() for t in futures])), 2) def test_thread_pool_tasks_limit(self): @@ -127,13 +129,13 @@ futures = [] with ThreadPool(max_workers=1, max_tasks=2) as pool: for _ in range(0, 4): - futures.append(pool.submit(tid_function)) + futures.append(pool.schedule(tid_function)) self.assertEqual(len(set([t.result() for t in futures])), 2) def test_thread_pool_initializer(self): """Thread Pool initializer is correctly run.""" with ThreadPool(initializer=initializer, initargs=[1]) as pool: - future = pool.submit(initializer_function) + future = pool.schedule(initializer_function) self.assertEqual(future.result(), 1) def test_thread_pool_broken_initializer(self): @@ -142,18 +144,18 @@ with ThreadPool(initializer=broken_initializer) as pool: pool.active time.sleep(0.3) - pool.submit(function) + pool.schedule(function) def test_thread_pool_running(self): """Thread Pool is active if a future is scheduled.""" with ThreadPool(max_workers=1) as pool: - pool.submit(function, 1) + pool.schedule(function, args=[1]) self.assertTrue(pool.active) def test_thread_pool_stopped(self): """Thread Pool is not active once stopped.""" with ThreadPool(max_workers=1) as pool: - pool.submit(function, 1) + pool.schedule(function, args=[1]) self.assertFalse(pool.active) def test_thread_pool_close_futures(self): @@ -161,7 +163,7 @@ futures = [] pool = ThreadPool(max_workers=1) for index in range(10): - futures.append(pool.submit(function, index)) + futures.append(pool.schedule(function, args=[index])) pool.close() pool.join() map(self.assertTrue, [t.done() for t in futures]) @@ -169,7 +171,7 @@ def test_thread_pool_close_stopped(self): """Thread Pool is stopped after close.""" pool = ThreadPool(max_workers=1) - pool.submit(function, 1) + pool.schedule(function, args=[1]) pool.close() pool.join() self.assertFalse(pool.active) @@ -179,7 +181,7 @@ futures = [] pool = ThreadPool(max_workers=1) for index in range(10): - futures.append(pool.submit(long_function, index)) + futures.append(pool.schedule(long_function, args=[index])) pool.stop() pool.join() self.assertTrue(len([t for t in futures if not t.done()]) > 0) @@ -187,7 +189,7 @@ def test_thread_pool_stop_stopped(self): """Thread Pool is stopped after stop.""" pool = ThreadPool(max_workers=1) - pool.submit(function, 1) + pool.schedule(function, args=[1]) pool.stop() pool.join() self.assertFalse(pool.active) @@ -198,7 +200,7 @@ def function(): pool.stop() - pool.submit(function) + pool.schedule(function) self.assertFalse(pool.active) @@ -208,19 +210,19 @@ def stop_pool_callback(_): pool.stop() - future = pool.submit(function, 1) + future = pool.schedule(function, args=[1]) future.add_done_callback(stop_pool_callback) with self.assertRaises(RuntimeError): for index in range(10): time.sleep(0.1) - pool.submit(long_function, index) + pool.schedule(long_function, args=[index]) self.assertFalse(pool.active) def test_thread_pool_join_workers(self): """Thread Pool no worker is running after join.""" pool = ThreadPool(max_workers=4) - pool.submit(function, 1) + pool.schedule(function, args=[1]) pool.stop() pool.join() self.assertEqual(len(pool._pool_manager.workers), 0) @@ -228,14 +230,14 @@ def test_thread_pool_join_running(self): """Thread Pool RuntimeError is raised if active pool joined.""" with ThreadPool(max_workers=1) as pool: - pool.submit(function, 1) + pool.schedule(function, args=[1]) self.assertRaises(RuntimeError, pool.join) def test_thread_pool_join_futures_timeout(self): """Thread Pool TimeoutError is raised if join on long futures.""" pool = ThreadPool(max_workers=1) for _ in range(2): - pool.submit(long_function) + pool.schedule(long_function) pool.close() self.assertRaises(TimeoutError, pool.join, 0.4) pool.stop() @@ -244,12 +246,13 @@ def test_thread_pool_exception_isolated(self): """Thread Pool an Exception does not affect other futures.""" with ThreadPool(max_workers=1) as pool: - future = pool.submit(error_function) + future = pool.schedule(error_function) try: future.result() except: pass - future = pool.submit(function, 1, keyword_argument=1) + future = pool.schedule(function, args=[1], + kwargs={'keyword_argument': 1}) self.assertEqual(future.result(), 2) def test_thread_pool_map(self):