On Thu, 18 Jun 2020 00:35:48 -0700 Zac Medico <zmed...@gentoo.org> wrote:
> In the coroutine finally clause, do not call remove_writer in cases > where fd has been closed and then re-allocated to a concurrent > coroutine as in bug 716636. > > Also, assume that the caller will put the file in non-blocking mode > and close the file when done, so that this function is suitable for > use within a loop. > > Bug: https://bugs.gentoo.org/728580 > Signed-off-by: Zac Medico <zmed...@gentoo.org> > --- > lib/portage/util/futures/_asyncio/process.py | 11 ++++- > lib/portage/util/futures/_asyncio/streams.py | 50 > ++++++++++---------- 2 files changed, 33 insertions(+), 28 > deletions(-) > > diff --git a/lib/portage/util/futures/_asyncio/process.py > b/lib/portage/util/futures/_asyncio/process.py index > 020164c9b..2d3e9b0fd 100644 --- > a/lib/portage/util/futures/_asyncio/process.py +++ > b/lib/portage/util/futures/_asyncio/process.py @@ -1,9 +1,12 @@ > -# Copyright 2018 Gentoo Foundation > +# Copyright 2018-2020 Gentoo Authors > # Distributed under the terms of the GNU General Public License v2 > > +import os > + > import portage > portage.proxy.lazyimport.lazyimport(globals(), > 'portage.util.futures:asyncio', > + 'portage.util.futures.unix_events:_set_nonblocking', > ) > from portage.util.futures._asyncio.streams import _reader, _writer > from portage.util.futures.compat_coroutine import coroutine, > coroutine_return @@ -59,7 +62,11 @@ class _Process(object): > if input is not None: > if self._proc.stdin is None: > raise TypeError('communicate: > expected file or int, got {}'.format(type(self._proc.stdin))) > - writer = > asyncio.ensure_future(_writer(self._proc.stdin, input), > loop=self._loop) > + stdin = self._proc.stdin > + stdin = os.fdopen(stdin, 'wb', 0) if > isinstance(stdin, int) else stdin > + _set_nonblocking(stdin.fileno()) > + writer = > asyncio.ensure_future(_writer(stdin, input, loop=self._loop), > loop=self._loop) > + writer.add_done_callback(lambda writer: > stdin.close()) > try: > yield asyncio.wait(futures + [self.wait()], > loop=self._loop) diff --git > a/lib/portage/util/futures/_asyncio/streams.py > b/lib/portage/util/futures/_asyncio/streams.py index > 650a16491..870307e1e 100644 --- > a/lib/portage/util/futures/_asyncio/streams.py +++ > b/lib/portage/util/futures/_asyncio/streams.py @@ -1,4 +1,4 @@ -# > Copyright 2018 Gentoo Foundation +# Copyright 2018-2020 Gentoo Authors > # Distributed under the terms of the GNU General Public License v2 > > import errno > @@ -8,7 +8,6 @@ import portage > portage.proxy.lazyimport.lazyimport(globals(), > '_emerge.PipeReader:PipeReader', > 'portage.util.futures:asyncio', > - 'portage.util.futures.unix_events:_set_nonblocking', > ) > from portage.util.futures.compat_coroutine import coroutine > > @@ -59,38 +58,37 @@ class _Reader(object): > @coroutine > def _writer(output_file, content, loop=None): > """ > - Asynchronously write bytes to output file, and close it when > - done. If an EnvironmentError other than EAGAIN is > encountered, > - which typically indicates that the other end of the pipe has > - close, the error is raised. This function is a coroutine. > + Asynchronously write bytes to output file. The output file is > + assumed to be in non-blocking mode. If an EnvironmentError > + other than EAGAIN is encountered, which typically indicates > that > + the other end of the pipe has closed, the error is raised. > + This function is a coroutine. > > - @param output_file: output file descriptor > - @type output_file: file or int > + @param output_file: output file > + @type output_file: file object > @param content: content to write > @type content: bytes > @param loop: asyncio.AbstractEventLoop (or compatible) > @type loop: event loop > """ > - fd = output_file if isinstance(output_file, int) else > output_file.fileno() > - _set_nonblocking(fd) > loop = asyncio._wrap_loop(loop) > - try: > - while content: > + fd = output_file.fileno() > + while content: > + try: > + content = content[os.write(fd, content):] > + except EnvironmentError as e: > + if e.errno != errno.EAGAIN: > + raise > waiter = loop.create_future() > - loop.add_writer(fd, lambda: > waiter.set_result(None)) > + loop.add_writer(fd, lambda: waiter.done() or > waiter.set_result(None)) try: > yield waiter > - while content: > - try: > - content = > content[os.write(fd, content):] > - except EnvironmentError as e: > - if e.errno == > errno.EAGAIN: > - break > - else: > - raise > finally: > - loop.remove_writer(fd) > - except GeneratorExit: > - raise > - finally: > - os.close(output_file) if isinstance(output_file, > int) else output_file.close() > + # The loop and output file may have > been closed. > + if not loop.is_closed(): > + waiter.done() or > waiter.cancel() > + # Do not call remove_writer > in cases where fd has > + # been closed and then > re-allocated to a concurrent > + # coroutine as in bug 716636. > + if not output_file.closed: > + > loop.remove_writer(fd) looks fine to me