commit: bf1505777847aface5b4cac16b955071c2915ddd
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Wed Apr 8 05:03:34 2020 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Apr 8 05:29:48 2020 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=bf150577
Revert "AsyncScheduler: use async_start method" This reverts commit 8f47d3fe1190d4476ae9eebfafcebdfb1794fc05. Bug: https://bugs.gentoo.org/716636 Signed-off-by: Zac Medico <zmedico <AT> gentoo.org> lib/portage/tests/ebuild/test_doebuild_fd_pipes.py | 8 +++-- .../tests/util/futures/test_iter_completed.py | 2 -- lib/portage/util/_async/AsyncScheduler.py | 20 ++---------- lib/portage/util/futures/iter_completed.py | 38 +++++----------------- 4 files changed, 15 insertions(+), 53 deletions(-) diff --git a/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py b/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py index 50fc5fe1c..05ea24c4b 100644 --- a/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py +++ b/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py @@ -109,16 +109,18 @@ class DoebuildFdPipesTestCase(TestCase): output_fd: pw, }, "prev_mtimes": {}}) - producer.addStartListener(lambda producer: os.close(pw)) - # PipeReader closes pr consumer = PipeReader( input_files={"producer" : pr}) task_scheduler = TaskScheduler(iter([producer, consumer]), max_jobs=2) - loop.run_until_complete(task_scheduler.async_start()) + try: + loop.run_until_complete(task_scheduler.async_start()) + finally: + # PipeReader closes pr + os.close(pw) task_scheduler.wait() output = portage._unicode_decode( diff --git a/lib/portage/tests/util/futures/test_iter_completed.py b/lib/portage/tests/util/futures/test_iter_completed.py index 03ace915a..aa24f5685 100644 --- a/lib/portage/tests/util/futures/test_iter_completed.py +++ b/lib/portage/tests/util/futures/test_iter_completed.py @@ -76,8 +76,6 @@ class IterCompletedTestCase(TestCase): for future_done_set in async_iter_completed(future_generator(), max_jobs=True, max_load=True, loop=loop): - while not input_futures: - loop.run_until_complete(asyncio.sleep(0, loop=loop)) future_done_set.cancel() break diff --git a/lib/portage/util/_async/AsyncScheduler.py b/lib/portage/util/_async/AsyncScheduler.py index b9070061a..c6b523eaa 100644 --- 
a/lib/portage/util/_async/AsyncScheduler.py +++ b/lib/portage/util/_async/AsyncScheduler.py @@ -1,11 +1,7 @@ # Copyright 2012-2018 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 -import functools - from portage import os -from portage.util.futures import asyncio -from portage.util.futures.compat_coroutine import coroutine from _emerge.AsynchronousTask import AsynchronousTask from _emerge.PollScheduler import PollScheduler @@ -66,8 +62,8 @@ class AsyncScheduler(AsynchronousTask, PollScheduler): else: self._running_tasks.add(task) task.scheduler = self._sched_iface - future = asyncio.ensure_future(self._task_coroutine(task), loop=self._sched_iface) - future.add_done_callback(functools.partial(self._task_coroutine_done, task)) + task.addExitListener(self._task_exit) + task.start() if self._loadavg_check_id is not None: self._loadavg_check_id.cancel() @@ -77,18 +73,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler): # Triggers cleanup and exit listeners if there's nothing left to do. 
self.poll() - @coroutine - def _task_coroutine(self, task): - yield task.async_start() - yield task.async_wait() - - def _task_coroutine_done(self, task, future): - try: - future.result() - except asyncio.CancelledError: - self.cancel() - self._task_exit(task) - def _task_exit(self, task): self._running_tasks.discard(task) if task.returncode != os.EX_OK: diff --git a/lib/portage/util/futures/iter_completed.py b/lib/portage/util/futures/iter_completed.py index 1fb30eb70..9554b4338 100644 --- a/lib/portage/util/futures/iter_completed.py +++ b/lib/portage/util/futures/iter_completed.py @@ -6,7 +6,6 @@ import functools from portage.util._async.AsyncTaskFuture import AsyncTaskFuture from portage.util._async.TaskScheduler import TaskScheduler from portage.util.futures import asyncio -from portage.util.futures.compat_coroutine import coroutine, coroutine_return from portage.util.cpuinfo import get_cpu_count @@ -91,42 +90,21 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None): if future_done_set.cancelled() and not wait_result.done(): wait_result.cancel() - @coroutine - def fetch_wait_result(scheduler, first, loop=None): - if first: - yield scheduler.async_start() - - # If the current coroutine awakens just after a call to - # done_callback but before scheduler has been notified of - # corresponding done future(s), then wait here until scheduler - # is notified (which will cause future_map to populate). 
- while not future_map and scheduler.poll() is None: - yield asyncio.sleep(0, loop=loop) - - if not future_map: - if scheduler.poll() is not None: - coroutine_return((set(), set())) - else: - raise AssertionError('expected non-empty future_map') - - wait_result = yield asyncio.wait(list(future_map.values()), - return_when=asyncio.FIRST_COMPLETED, loop=loop) - - coroutine_return(wait_result) - - first = True try: - while True: - wait_result = asyncio.ensure_future(fetch_wait_result(scheduler, first, loop=loop), loop=loop) - first = False + scheduler.start() + + # scheduler should ensure that future_map is non-empty until + # task_generator is exhausted + while future_map: + wait_result = asyncio.ensure_future( + asyncio.wait(list(future_map.values()), + return_when=asyncio.FIRST_COMPLETED, loop=loop), loop=loop) future_done_set = loop.create_future() future_done_set.add_done_callback( functools.partial(cancel_callback, wait_result)) wait_result.add_done_callback( functools.partial(done_callback, future_done_set)) yield future_done_set - if not future_map and scheduler.poll() is not None: - break finally: # cleanup in case of interruption by SIGINT, etc scheduler.cancel()