Title: [278187] trunk/Tools
Revision
278187
Author
jbed...@apple.com
Date
2021-05-27 19:26:59 -0700 (Thu, 27 May 2021)

Log Message

[webkitcorepy] Gracefully handle CTRL-C in TaskPool
https://bugs.webkit.org/show_bug.cgi?id=226238
<rdar://problem/78472148>

Reviewed by Dewei Zhu.

* Scripts/libraries/webkitcorepy/setup.py: Bump version.
* Scripts/libraries/webkitcorepy/webkitcorepy/__init__.py: Ditto.
* Scripts/libraries/webkitcorepy/webkitcorepy/task_pool.py:
(_BiDirectionalQueue.close): Tear down queue objects while suppressing any logging.
(_Process.handler): Both SIGTERM and SIGINT signals should stop worker processes.
(_Process.main): Add SIGINT handler, explicitly close queue.
(TaskPool.__init__): Defer worker and queue construction to context manager so that we
do not have an instantiated but invalid queue if pipes are broken by child processes.
(TaskPool.__enter__): Construct worker processes.
(TaskPool.do): Only catch Queue.Empty exception.
(TaskPool.__exit__): Explicitly close queue, reset workers and queue.

Modified Paths

Diff

Modified: trunk/Tools/ChangeLog (278186 => 278187)


--- trunk/Tools/ChangeLog	2021-05-28 02:08:19 UTC (rev 278186)
+++ trunk/Tools/ChangeLog	2021-05-28 02:26:59 UTC (rev 278187)
@@ -1,3 +1,23 @@
+2021-05-27  Jonathan Bedard  <jbed...@apple.com>
+
+        [webkitcorey] Gracefully handle CNTRL-C in TaskPool
+        https://bugs.webkit.org/show_bug.cgi?id=226238
+        <rdar://problem/78472148>
+
+        Reviewed by Dewei Zhu.
+
+        * Scripts/libraries/webkitcorepy/setup.py: Bump version.
+        * Scripts/libraries/webkitcorepy/webkitcorepy/__init__.py: Ditto.
+        * Scripts/libraries/webkitcorepy/webkitcorepy/task_pool.py:
+        (_BiDirectionalQueue.close): Tear-down queue objects while suppressing any logging.
+        (_Process.handler): Both SIGTERM and SIGINT signals should stop worker processes.
+        (_Process.main): Add SIGINT handler, explicitly close queue.
+        (TaskPool.__init__): Defer worker and queue construction to context manager so that we
+        do not have an instantiated but invalid queue if pipes are broken by children processes.
+        (TaskPool.__enter__): Construct worker processes.
+        (TaskPool.do): Only catch Queue.Empty exception.
+        (TaskPool.__exit__): Explicitly close queue, reset workers and queue.
+
 2021-05-27  Darin Adler  <da...@apple.com>
 
         Next step toward using std::optional directly instead of through WTF::Optional typedef

Modified: trunk/Tools/Scripts/libraries/webkitcorepy/setup.py (278186 => 278187)


--- trunk/Tools/Scripts/libraries/webkitcorepy/setup.py	2021-05-28 02:08:19 UTC (rev 278186)
+++ trunk/Tools/Scripts/libraries/webkitcorepy/setup.py	2021-05-28 02:26:59 UTC (rev 278187)
@@ -30,7 +30,7 @@
 
 setup(
     name='webkitcorepy',
-    version='0.5.14',
+    version='0.5.15',
     description='Library containing various Python support classes and functions.',
     long_description=readme(),
     classifiers=[

Modified: trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/__init__.py (278186 => 278187)


--- trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/__init__.py	2021-05-28 02:08:19 UTC (rev 278186)
+++ trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/__init__.py	2021-05-28 02:26:59 UTC (rev 278187)
@@ -37,7 +37,7 @@
 from webkitcorepy.task_pool import TaskPool
 from webkitcorepy.credentials import credentials
 
-version = Version(0, 5, 14)
+version = Version(0, 5, 15)
 
 from webkitcorepy.autoinstall import Package, AutoInstall
 if sys.version_info > (3, 0):

Modified: trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/task_pool.py (278186 => 278187)


--- trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/task_pool.py	2021-05-28 02:08:19 UTC (rev 278186)
+++ trunk/Tools/Scripts/libraries/webkitcorepy/webkitcorepy/task_pool.py	2021-05-28 02:26:59 UTC (rev 278187)
@@ -27,6 +27,11 @@
 import signal
 import sys
 
+if sys.version_info < (3, 0):
+    import Queue
+else:
+    import queue as Queue
+
 from webkitcorepy import OutputCapture, Timeout, log
 
 
@@ -146,7 +151,14 @@
                 return self.incoming.get(timeout=difference)
             return self.incoming.get()
 
+    def close(self):
+        with OutputCapture():
+            self.outgoing.close()
+            self.incoming.close()
+            self.outgoing.join_thread()
+            self.incoming.join_thread()
 
+
 class _Process(object):
     name = None
     working = False
@@ -228,7 +240,7 @@
 
     @classmethod
     def handler(cls, value, _):
-        if value == getattr(signal, 'SIGTERM'):
+        if value in (getattr(signal, 'SIGTERM'), getattr(signal, 'SIGINT')):
             cls.working = False
 
     @classmethod
@@ -246,6 +258,8 @@
 
         if getattr(signal, 'SIGTERM'):
             signal.signal(signal.SIGTERM, cls.handler)
+        if getattr(signal, 'SIGINT'):
+            signal.signal(signal.SIGINT, cls.handler)
 
         logger = logging.getLogger()
         for handler in logger.handlers:
@@ -280,6 +294,7 @@
                 sys.stdout.flush()
                 sys.stderr.flush()
                 queue.send(_State(_State.STOPPING))
+                cls.queue.close()
                 cls.queue = None
 
 
@@ -309,21 +324,18 @@
         name = name or 'worker'
         if name == self.Process.name:
             raise ValueError("Parent process is already named {}".format(name))
+        self.name = name
 
         if workers < 1:
             raise ValueError('TaskPool requires positive number of workers')
 
-        self.queue = self.BiDirectionalQueue()
+        self.queue = None
+        self.workers = []
 
-        self.workers = [multiprocessing.Process(
-            target=self.Process.main,
-            args=(
-                '{}/{}'.format(name, count), logging.getLogger().getEffectiveLevel(),
-                setup, setupargs, setupkwargs,
-                self.BiDirectionalQueue(outgoing=self.queue.incoming, incoming=self.queue.outgoing),
-                teardown, teardownargs, teardownkwargs,
-            ),
-        ) for count in range(workers)]
+        self._setup_args = (setup, setupargs, setupkwargs)
+        self._teardown_args = (teardown, teardownargs, teardownkwargs)
+        self._num_workers = int(workers)
+
         self._started = 0
 
         self.callbacks = {}
@@ -334,6 +346,17 @@
     def __enter__(self):
         from mock import patch
 
+        self.queue = self.BiDirectionalQueue()
+        self.workers = [multiprocessing.Process(
+            target=self.Process.main,
+            args=(
+                '{}/{}'.format(self.name, count), logging.getLogger().getEffectiveLevel(),
+                self._setup_args[0], self._setup_args[1], self._setup_args[2],
+                self.BiDirectionalQueue(outgoing=self.queue.incoming, incoming=self.queue.outgoing),
+                self._teardown_args[0], self._teardown_args[1], self._teardown_args[2],
+            ),
+        ) for count in range(self._num_workers)]
+
         with Timeout(seconds=10, patch=False, handler=self.Exception('Failed to start all workers')):
             for worker in self.workers:
                 worker.start()
@@ -353,7 +376,7 @@
             while True:
                 try:
                     self.queue.receive(blocking=False)(self)
-                except Exception:
+                except Queue.Empty:
                     break
 
     def wait(self):
@@ -393,3 +416,7 @@
                     os.kill(worker.pid, signal.SIGKILL)
                 else:
                     worker.terminate()
+
+            self.queue.close()
+            self.queue = None
+            self.workers = []
_______________________________________________
webkit-changes mailing list
webkit-changes@lists.webkit.org
https://lists.webkit.org/mailman/listinfo/webkit-changes

Reply via email to