commit:     bf1505777847aface5b4cac16b955071c2915ddd
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Wed Apr  8 05:03:34 2020 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Apr  8 05:29:48 2020 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=bf150577

Revert "AsyncScheduler: use async_start method"

This reverts commit 8f47d3fe1190d4476ae9eebfafcebdfb1794fc05.

Bug: https://bugs.gentoo.org/716636
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/tests/ebuild/test_doebuild_fd_pipes.py |  8 +++--
 .../tests/util/futures/test_iter_completed.py      |  2 --
 lib/portage/util/_async/AsyncScheduler.py          | 20 ++----------
 lib/portage/util/futures/iter_completed.py         | 38 +++++-----------------
 4 files changed, 15 insertions(+), 53 deletions(-)

diff --git a/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py 
b/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py
index 50fc5fe1c..05ea24c4b 100644
--- a/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py
+++ b/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py
@@ -109,16 +109,18 @@ class DoebuildFdPipesTestCase(TestCase):
                                                        output_fd: pw,
                                                },
                                                "prev_mtimes": {}})
-                               producer.addStartListener(lambda producer: 
os.close(pw))
 
-                               # PipeReader closes pr
                                consumer = PipeReader(
                                        input_files={"producer" : pr})
 
                                task_scheduler = TaskScheduler(iter([producer, 
consumer]),
                                        max_jobs=2)
 
-                               
loop.run_until_complete(task_scheduler.async_start())
+                               try:
+                                       
loop.run_until_complete(task_scheduler.async_start())
+                               finally:
+                                       # PipeReader closes pr
+                                       os.close(pw)
 
                                task_scheduler.wait()
                                output = portage._unicode_decode(

diff --git a/lib/portage/tests/util/futures/test_iter_completed.py 
b/lib/portage/tests/util/futures/test_iter_completed.py
index 03ace915a..aa24f5685 100644
--- a/lib/portage/tests/util/futures/test_iter_completed.py
+++ b/lib/portage/tests/util/futures/test_iter_completed.py
@@ -76,8 +76,6 @@ class IterCompletedTestCase(TestCase):
 
                for future_done_set in async_iter_completed(future_generator(),
                        max_jobs=True, max_load=True, loop=loop):
-                       while not input_futures:
-                               loop.run_until_complete(asyncio.sleep(0, 
loop=loop))
                        future_done_set.cancel()
                        break
 

diff --git a/lib/portage/util/_async/AsyncScheduler.py 
b/lib/portage/util/_async/AsyncScheduler.py
index b9070061a..c6b523eaa 100644
--- a/lib/portage/util/_async/AsyncScheduler.py
+++ b/lib/portage/util/_async/AsyncScheduler.py
@@ -1,11 +1,7 @@
 # Copyright 2012-2018 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
-import functools
-
 from portage import os
-from portage.util.futures import asyncio
-from portage.util.futures.compat_coroutine import coroutine
 from _emerge.AsynchronousTask import AsynchronousTask
 from _emerge.PollScheduler import PollScheduler
 
@@ -66,8 +62,8 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
                        else:
                                self._running_tasks.add(task)
                                task.scheduler = self._sched_iface
-                               future = 
asyncio.ensure_future(self._task_coroutine(task), loop=self._sched_iface)
-                               
future.add_done_callback(functools.partial(self._task_coroutine_done, task))
+                               task.addExitListener(self._task_exit)
+                               task.start()
 
                if self._loadavg_check_id is not None:
                        self._loadavg_check_id.cancel()
@@ -77,18 +73,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
                # Triggers cleanup and exit listeners if there's nothing left 
to do.
                self.poll()
 
-       @coroutine
-       def _task_coroutine(self, task):
-               yield task.async_start()
-               yield task.async_wait()
-
-       def _task_coroutine_done(self, task, future):
-               try:
-                       future.result()
-               except asyncio.CancelledError:
-                       self.cancel()
-               self._task_exit(task)
-
        def _task_exit(self, task):
                self._running_tasks.discard(task)
                if task.returncode != os.EX_OK:

diff --git a/lib/portage/util/futures/iter_completed.py 
b/lib/portage/util/futures/iter_completed.py
index 1fb30eb70..9554b4338 100644
--- a/lib/portage/util/futures/iter_completed.py
+++ b/lib/portage/util/futures/iter_completed.py
@@ -6,7 +6,6 @@ import functools
 from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
 from portage.util._async.TaskScheduler import TaskScheduler
 from portage.util.futures import asyncio
-from portage.util.futures.compat_coroutine import coroutine, coroutine_return
 from portage.util.cpuinfo import get_cpu_count
 
 
@@ -91,42 +90,21 @@ def async_iter_completed(futures, max_jobs=None, 
max_load=None, loop=None):
                if future_done_set.cancelled() and not wait_result.done():
                        wait_result.cancel()
 
-       @coroutine
-       def fetch_wait_result(scheduler, first, loop=None):
-               if first:
-                       yield scheduler.async_start()
-
-               # If the current coroutine awakens just after a call to
-               # done_callback but before scheduler has been notified of
-               # corresponding done future(s), then wait here until scheduler
-               # is notified (which will cause future_map to populate).
-               while not future_map and scheduler.poll() is None:
-                       yield asyncio.sleep(0, loop=loop)
-
-               if not future_map:
-                       if scheduler.poll() is not None:
-                               coroutine_return((set(), set()))
-                       else:
-                               raise AssertionError('expected non-empty 
future_map')
-
-               wait_result = yield asyncio.wait(list(future_map.values()),
-                       return_when=asyncio.FIRST_COMPLETED, loop=loop)
-
-               coroutine_return(wait_result)
-
-       first = True
        try:
-               while True:
-                       wait_result = 
asyncio.ensure_future(fetch_wait_result(scheduler, first, loop=loop), loop=loop)
-                       first = False
+               scheduler.start()
+
+               # scheduler should ensure that future_map is non-empty until
+               # task_generator is exhausted
+               while future_map:
+                       wait_result = asyncio.ensure_future(
+                               asyncio.wait(list(future_map.values()),
+                               return_when=asyncio.FIRST_COMPLETED, 
loop=loop), loop=loop)
                        future_done_set = loop.create_future()
                        future_done_set.add_done_callback(
                                functools.partial(cancel_callback, wait_result))
                        wait_result.add_done_callback(
                                functools.partial(done_callback, 
future_done_set))
                        yield future_done_set
-                       if not future_map and scheduler.poll() is not None:
-                               break
        finally:
                # cleanup in case of interruption by SIGINT, etc
                scheduler.cancel()

Reply via email to