Re: [gentoo-portage-dev] [PATCH 1/4] Add ForkExecutor (bug 649588)

2018-04-01 Thread Alec Warner
lgtm.

On Sat, Mar 31, 2018 at 10:46 PM, Zac Medico  wrote:

> This is useful for asynchronous operations that we might
> need to cancel if they take too long, since
> concurrent.futures.ProcessPoolExecutor tasks are not cancellable.
> This ability to cancel tasks makes this executor useful
> as an alternative to portage.exception.AlarmSignal.
>
> Also add an asyncio-compatible EventLoop.run_in_executor
> method that uses ForkExecutor as the default executor,
> which will later be used to implement the corresponding
> asyncio.AbstractEventLoop run_in_executor method.
>
> Bug: https://bugs.gentoo.org/649588
> ---
>  pym/portage/util/_eventloop/EventLoop.py  |  45 -
>  pym/portage/util/futures/executor/__init__.py |   0
>  pym/portage/util/futures/executor/fork.py | 130
> ++
>  3 files changed, 174 insertions(+), 1 deletion(-)
>  create mode 100644 pym/portage/util/futures/executor/__init__.py
>  create mode 100644 pym/portage/util/futures/executor/fork.py
>
> diff --git a/pym/portage/util/_eventloop/EventLoop.py
> b/pym/portage/util/_eventloop/EventLoop.py
> index f472a3dae..1574a6837 100644
> --- a/pym/portage/util/_eventloop/EventLoop.py
> +++ b/pym/portage/util/_eventloop/EventLoop.py
> @@ -24,6 +24,7 @@ except ImportError:
>  import portage
>  portage.proxy.lazyimport.lazyimport(globals(),
> 'portage.util.futures.futures:_EventLoopFuture',
> +   'portage.util.futures.executor.fork:ForkExecutor',
>  )
>
>  from portage import OrderedDict
> @@ -122,6 +123,7 @@ class EventLoop(object):
> self._idle_callbacks = OrderedDict()
> self._timeout_handlers = {}
> self._timeout_interval = None
> +   self._default_executor = None
>
> self._poll_obj = None
> try:
> @@ -721,6 +723,46 @@ class EventLoop(object):
> return self._handle(self.timeout_add(
> delay * 1000, self._call_soon_callback(callback,
> args)), self)
>
> +   def run_in_executor(self, executor, func, *args):
> +   """
> +   Arrange for a func to be called in the specified executor.
> +
> +   The executor argument should be an Executor instance. The
> default
> +   executor is used if executor is None.
> +
> +   Use functools.partial to pass keywords to the *func*.
> +
> +   @param executor: executor
> +   @type executor: concurrent.futures.Executor or None
> +   @param func: a function to call
> +   @type func: callable
> +   @return: a Future
> +   @rtype: asyncio.Future (or compatible)
> +   """
> +   if executor is None:
> +   executor = self._default_executor
> +   if executor is None:
> +   executor = ForkExecutor(loop=self)
> +   self._default_executor = executor
> +   return executor.submit(func, *args)
> +
> +   def close(self):
> +   """Close the event loop.
> +
> +   This clears the queues and shuts down the executor,
> +   and waits for it to finish.
> +   """
> +   executor = self._default_executor
> +   if executor is not None:
> +   self._default_executor = None
> +   executor.shutdown(wait=True)
> +
> +   if self._poll_obj is not None:
> +   close = getattr(self._poll_obj, 'close')
> +   if close is not None:
> +   close()
> +   self._poll_obj = None
> +
>
>  _can_poll_device = None
>
> @@ -782,10 +824,11 @@ class _epoll_adapter(object):
> that is associated with an epoll instance will close automatically
> when
> it is garbage collected, so it's not necessary to close it
> explicitly.
> """
> -   __slots__ = ('_epoll_obj',)
> +   __slots__ = ('_epoll_obj', 'close')
>
> def __init__(self, epoll_obj):
> self._epoll_obj = epoll_obj
> +   self.close = epoll_obj.close
>
> def register(self, fd, *args):
> self._epoll_obj.register(fd, *args)
> diff --git a/pym/portage/util/futures/executor/__init__.py
> b/pym/portage/util/futures/executor/__init__.py
> new file mode 100644
> index 0..e69de29bb
> diff --git a/pym/portage/util/futures/executor/fork.py
> b/pym/portage/util/futures/executor/fork.py
> new file mode 100644
> index 0..9cd1db2ca
> --- /dev/null
> +++ b/pym/portage/util/futures/executor/fork.py
> @@ -0,0 +1,130 @@
> +# Copyright 2018 Gentoo Foundation
> +# Distributed under the terms of the GNU General Public License v2
> +
> +import collections
> +import functools
> +import multiprocessing
> +import os
> +import sys
> +import traceback
> +
> +from 

[gentoo-portage-dev] [PATCH 1/4] Add ForkExecutor (bug 649588)

2018-03-31 Thread Zac Medico
This is useful for asynchronous operations that we might
need to cancel if they take too long, since
concurrent.futures.ProcessPoolExecutor tasks are not cancellable.
This ability to cancel tasks makes this executor useful
as an alternative to portage.exception.AlarmSignal.

Also add an asyncio-compatible EventLoop.run_in_executor
method that uses ForkExecutor as the default executor,
which will later be used to implement the corresponding
asyncio.AbstractEventLoop run_in_executor method.

Bug: https://bugs.gentoo.org/649588
---
 pym/portage/util/_eventloop/EventLoop.py  |  45 -
 pym/portage/util/futures/executor/__init__.py |   0
 pym/portage/util/futures/executor/fork.py | 130 ++
 3 files changed, 174 insertions(+), 1 deletion(-)
 create mode 100644 pym/portage/util/futures/executor/__init__.py
 create mode 100644 pym/portage/util/futures/executor/fork.py

diff --git a/pym/portage/util/_eventloop/EventLoop.py 
b/pym/portage/util/_eventloop/EventLoop.py
index f472a3dae..1574a6837 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -24,6 +24,7 @@ except ImportError:
 import portage
 portage.proxy.lazyimport.lazyimport(globals(),
'portage.util.futures.futures:_EventLoopFuture',
+   'portage.util.futures.executor.fork:ForkExecutor',
 )
 
 from portage import OrderedDict
@@ -122,6 +123,7 @@ class EventLoop(object):
self._idle_callbacks = OrderedDict()
self._timeout_handlers = {}
self._timeout_interval = None
+   self._default_executor = None
 
self._poll_obj = None
try:
@@ -721,6 +723,46 @@ class EventLoop(object):
return self._handle(self.timeout_add(
delay * 1000, self._call_soon_callback(callback, 
args)), self)
 
+   def run_in_executor(self, executor, func, *args):
+   """
+   Arrange for func(*args) to be called in the specified executor.
+
+   The executor argument should be an Executor instance. The 
default
+   ForkExecutor (created on first use, then reused) is used if executor is None.
+
+   Use functools.partial to pass keywords to the *func*.
+
+   @param executor: executor
+   @type executor: concurrent.futures.Executor or None
+   @param func: a function to call with the positional *args
+   @type func: callable
+   @return: a Future
+   @rtype: asyncio.Future (or compatible)
+   """
+   if executor is None:
+   executor = self._default_executor
+   if executor is None:
+   executor = ForkExecutor(loop=self)
+   self._default_executor = executor
+   return executor.submit(func, *args)
+
+   def close(self):
+   """Close the event loop.
+
+   This shuts down the default executor (waiting for it to finish)
+   and closes the poll object if it provides a close method.
+   """
+   executor = self._default_executor
+   if executor is not None:
+   self._default_executor = None
+   executor.shutdown(wait=True)
+
+   if self._poll_obj is not None:
+   close = getattr(self._poll_obj, 'close', None)  # poll objects may lack close()
+   if close is not None:
+   close()
+   self._poll_obj = None
+
 
 _can_poll_device = None
 
@@ -782,10 +824,11 @@ class _epoll_adapter(object):
that is associated with an epoll instance will close automatically when
it is garbage collected, so it's not necessary to close it explicitly.
"""
-   __slots__ = ('_epoll_obj',)
+   __slots__ = ('_epoll_obj', 'close')
 
def __init__(self, epoll_obj):
self._epoll_obj = epoll_obj
+   self.close = epoll_obj.close
 
def register(self, fd, *args):
self._epoll_obj.register(fd, *args)
diff --git a/pym/portage/util/futures/executor/__init__.py 
b/pym/portage/util/futures/executor/__init__.py
new file mode 100644
index 0..e69de29bb
diff --git a/pym/portage/util/futures/executor/fork.py 
b/pym/portage/util/futures/executor/fork.py
new file mode 100644
index 0..9cd1db2ca
--- /dev/null
+++ b/pym/portage/util/futures/executor/fork.py
@@ -0,0 +1,130 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import collections
+import functools
+import multiprocessing
+import os
+import sys
+import traceback
+
+from portage.util._async.AsyncFunction import AsyncFunction
+from portage.util._eventloop.global_event_loop import global_event_loop
+
+
+class ForkExecutor(object):
+   """
+   An implementation of concurrent.futures.Executor that forks a
+   new process for each task, with support for cancellation of tasks.
+