I'd like to propose an improvement to `concurrent.futures`. The library's
ThreadPoolExecutor and ProcessPoolExecutor are excellent tools, but there
is currently no mechanism for configuring which type of executor you want.
Also, there is no duck-typed class that behaves like an executor, but does
its processing in serial. Often times a develop will want to run a task in
parallel, but depending on the environment they may want to disable
threading or process execution. To address this I use a utility called a
`SerialExecutor` which shares an API with
ThreadPoolExecutor/ProcessPoolExecutor but executes processes sequentially
in the same python thread:


```python
import concurrent.futures

class SerialFuture( concurrent.futures.Future):
    """
    Non-threading / multiprocessing version of future for drop in
compatibility
    with concurrent.futures.
    """
    def __init__(self, func, *args, **kw):
        super(SerialFuture, self).__init__()
        self.func = func
        self.args = args
        self.kw = kw
        # self._condition = FakeCondition()
        self._run_count = 0
        # fake being finished to cause __get_result to be called
        self._state = concurrent.futures._base.FINISHED

    def _run(self):
        result = self.func(*self.args, **self.kw)
        self.set_result(result)
        self._run_count += 1

    def set_result(self, result):
        """
        Overrides the implementation to revert to pre python3.8 behavior
        """
        with self._condition:
            self._result = result
            self._state = concurrent.futures._base.FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def _Future__get_result(self):
        # overrides private __getresult method
        if not self._run_count:
            self._run()
        return self._result


class SerialExecutor(object):
    """
    Implements the concurrent.futures API around a single-threaded backend

    Example:
        >>> with SerialExecutor() as executor:
        >>>     futures = []
        >>>     for i in range(100):
        >>>         f = executor.submit(lambda x: x + 1, i)
        >>>         futures.append(f)
        >>>     for f in concurrent.futures.as_completed(futures):
        >>>         assert f.result() > 0
        >>>     for i, f in enumerate(futures):
        >>>         assert i + 1 == f.result()
        """
    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, tb):
        pass

    def submit(self, func, *args, **kw):
        return SerialFuture(func, *args, **kw)

    def shutdown(self):
        pass

```

In order to make it easy to choose the type of parallel (or serial) backend
with minimal code changes I use the following "Executor" wrapper class
(although if this was integrated into concurrent.futures the name would
need to change to something better):

```python
class Executor(object):
    """
    Wrapper around a specific executor.

    Abstracts Serial, Thread, and Process Executor via arguments.

    Args:
        mode (str, default='thread'): either thread, serial, or process
        max_workers (int, default=0): number of workers. If 0, serial is
forced.
    """

    def __init__(self, mode='thread', max_workers=0):
        from concurrent import futures
        if mode == 'serial' or max_workers == 0:
            backend = SerialExecutor()
        elif mode == 'thread':
            backend = futures.ThreadPoolExecutor(max_workers=max_workers)
        elif mode == 'process':
            backend = futures.ProcessPoolExecutor(max_workers=max_workers)
        else:
            raise KeyError(mode)
        self.backend = backend

    def __enter__(self):
        return self.backend.__enter__()

    def __exit__(self, ex_type, ex_value, tb):
        return self.backend.__exit__(ex_type, ex_value, tb)

    def submit(self, func, *args, **kw):
        return self.backend.submit(func, *args, **kw)

    def shutdown(self):
        return self.backend.shutdown()
```

So in summary, I'm proposing to add a SerialExecutor and SerialFuture class
as an alternative to the ThreadPool / ProcessPool executors, and I'm also
advocating for some sort of "ParamatrizedExecutor", where the user can
construct it in "thread", "process", or "serial" model.

--
-Jon
_______________________________________________
Python-ideas mailing list -- python-ideas@python.org
To unsubscribe send an email to python-ideas-le...@python.org
https://mail.python.org/mailman3/lists/python-ideas.python.org/
Message archived at 
https://mail.python.org/archives/list/python-ideas@python.org/message/LMTQ2AI6A7UXEFVHRGHKWD33H24FGM6G/
Code of Conduct: http://python.org/psf/codeofconduct/

Reply via email to