commit:     3e77f0199cb401acf974089fb6aa378fd45d0e90
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Apr 24 06:54:05 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Apr 27 22:56:02 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=3e77f019
ManifestScheduler: async fetchlist_dict (bug 653946) In order to avoid event loop recursion, pass fetchlist_dict to ManifestTask as a Future. Bug: https://bugs.gentoo.org/653946 pym/portage/dbapi/porttree.py | 70 ++++++++++++++++++++++ .../ebuild/_parallel_manifest/ManifestScheduler.py | 25 ++++---- .../ebuild/_parallel_manifest/ManifestTask.py | 24 +++++++- pym/portage/tests/dbapi/test_portdb_cache.py | 3 +- 4 files changed, 105 insertions(+), 17 deletions(-) diff --git a/pym/portage/dbapi/porttree.py b/pym/portage/dbapi/porttree.py index 975f03d5e..3ce214cd7 100644 --- a/pym/portage/dbapi/porttree.py +++ b/pym/portage/dbapi/porttree.py @@ -37,6 +37,7 @@ from portage import _unicode_encode from portage import OrderedDict from portage.util._eventloop.EventLoop import EventLoop from portage.util._eventloop.global_event_loop import global_event_loop +from portage.util.futures.iter_completed import iter_gather from _emerge.EbuildMetadataPhase import EbuildMetadataPhase import os as _os @@ -1393,6 +1394,75 @@ class FetchlistDict(Mapping): if sys.hexversion >= 0x3000000: keys = __iter__ + +def _async_manifest_fetchlist(portdb, repo_config, cp, cpv_list=None, + max_jobs=None, max_load=None, loop=None): + """ + Asynchronous form of FetchlistDict, with max_jobs and max_load + parameters in order to control async_aux_get concurrency. 
+ + @param portdb: portdbapi instance + @type portdb: portdbapi + @param repo_config: repository configuration for a Manifest + @type repo_config: RepoConfig + @param cp: cp for a Manifest + @type cp: str + @param cpv_list: list of ebuild cpv values for a Manifest + @type cpv_list: list + @param max_jobs: max number of futures to process concurrently (default + is multiprocessing.cpu_count()) + @type max_jobs: int + @param max_load: max load allowed when scheduling a new future, + otherwise schedule no more than 1 future at a time (default + is multiprocessing.cpu_count()) + @type max_load: int or float + @param loop: event loop + @type loop: EventLoop + @return: a Future resulting in a Mapping compatible with FetchlistDict + @rtype: asyncio.Future (or compatible) + """ + loop = loop or global_event_loop() + loop = getattr(loop, '_asyncio_wrapper', loop) + result = loop.create_future() + cpv_list = (portdb.cp_list(cp, mytree=repo_config.location) + if cpv_list is None else cpv_list) + + def gather_done(gather_result): + # All exceptions must be consumed from gather_result before this + # function returns, in order to avoid triggering the event loop's + # exception handler. + e = None + if not gather_result.cancelled(): + for future in gather_result.result(): + if (future.done() and not future.cancelled() and + future.exception() is not None): + e = future.exception() + + if result.cancelled(): + return + elif e is None: + result.set_result(dict((k, list(v.result())) + for k, v in zip(cpv_list, gather_result.result()))) + else: + result.set_exception(e) + + gather_result = iter_gather( + # Use a generator expression for lazy evaluation, so that iter_gather + # controls the number of concurrent async_fetch_map calls. 
+ (portdb.async_fetch_map(cpv, mytree=repo_config.location, loop=loop) + for cpv in cpv_list), + max_jobs=max_jobs, + max_load=max_load, + loop=loop, + ) + + gather_result.add_done_callback(gather_done) + result.add_done_callback(lambda result: + gather_result.cancel() if result.cancelled() else None) + + return result + + def _parse_uri_map(cpv, metadata, use=None): myuris = use_reduce(metadata.get('SRC_URI', ''), diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py index 38ac4825e..fabea9bc1 100644 --- a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py +++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py @@ -1,10 +1,10 @@ -# Copyright 2012-2013 Gentoo Foundation +# Copyright 2012-2018 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 import portage from portage import os +from portage.dbapi.porttree import _async_manifest_fetchlist from portage.dep import _repo_separator -from portage.exception import InvalidDependString from portage.localization import _ from portage.util._async.AsyncScheduler import AsyncScheduler from .ManifestTask import ManifestTask @@ -63,21 +63,14 @@ class ManifestScheduler(AsyncScheduler): cpv_list = portdb.cp_list(cp, mytree=[repo_config.location]) if not cpv_list: continue - fetchlist_dict = {} - try: - for cpv in cpv_list: - fetchlist_dict[cpv] = \ - list(portdb.getFetchMap(cpv, mytree=mytree)) - except InvalidDependString as e: - portage.writemsg( - _("!!! %s%s%s: SRC_URI: %s\n") % - (cp, _repo_separator, repo_config.name, e), - noiselevel=-1) - self._error_count += 1 - continue + # Use _async_manifest_fetchlist(max_jobs=1), since we + # spawn concurrent ManifestTask instances. 
yield ManifestTask(cp=cp, distdir=distdir, - fetchlist_dict=fetchlist_dict, repo_config=repo_config, + fetchlist_dict=_async_manifest_fetchlist( + portdb, repo_config, cp, cpv_list=cpv_list, + max_jobs=1, loop=self._event_loop), + repo_config=repo_config, gpg_cmd=self._gpg_cmd, gpg_vars=self._gpg_vars, force_sign_key=self._force_sign_key) @@ -91,3 +84,5 @@ class ManifestScheduler(AsyncScheduler): noiselevel=-1) AsyncScheduler._task_exit(self, task) + + diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py index 0ee2b910d..6bf5e82ef 100644 --- a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py +++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py @@ -1,4 +1,4 @@ -# Copyright 2012-2013 Gentoo Foundation +# Copyright 2012-2018 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 import errno @@ -8,8 +8,12 @@ import subprocess from portage import os from portage import _unicode_encode, _encodings from portage.const import MANIFEST2_IDENTIFIERS +from portage.dep import _repo_separator +from portage.exception import InvalidDependString +from portage.localization import _ from portage.util import (atomic_ofstream, grablines, shlex_split, varexpand, writemsg) +from portage.util._async.AsyncTaskFuture import AsyncTaskFuture from portage.util._async.PipeLogger import PipeLogger from portage.util._async.PopenProcess import PopenProcess from _emerge.CompositeTask import CompositeTask @@ -29,6 +33,24 @@ class ManifestTask(CompositeTask): def _start(self): self._manifest_path = os.path.join(self.repo_config.location, self.cp, "Manifest") + + self._start_task( + AsyncTaskFuture(future=self.fetchlist_dict), + self._start_with_fetchlist) + + def _start_with_fetchlist(self, fetchlist_task): + if self._default_exit(fetchlist_task) != os.EX_OK: + if not self.fetchlist_dict.cancelled(): + try: + self.fetchlist_dict.result() + except 
InvalidDependString as e: + writemsg( + _("!!! %s%s%s: SRC_URI: %s\n") % + (self.cp, _repo_separator, self.repo_config.name, e), + noiselevel=-1) + self._async_wait() + return + self.fetchlist_dict = self.fetchlist_dict.result() manifest_proc = ManifestProcess(cp=self.cp, distdir=self.distdir, fetchlist_dict=self.fetchlist_dict, repo_config=self.repo_config, scheduler=self.scheduler) diff --git a/pym/portage/tests/dbapi/test_portdb_cache.py b/pym/portage/tests/dbapi/test_portdb_cache.py index bd934460a..d3101b120 100644 --- a/pym/portage/tests/dbapi/test_portdb_cache.py +++ b/pym/portage/tests/dbapi/test_portdb_cache.py @@ -1,4 +1,4 @@ -# Copyright 2012-2015 Gentoo Foundation +# Copyright 2012-2018 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 import subprocess @@ -38,6 +38,7 @@ class PortdbCacheTestCase(TestCase): portage_python = portage._python_interpreter egencache_cmd = (portage_python, "-b", "-Wd", os.path.join(self.bindir, "egencache"), + "--update-manifests", "--sign-manifests=n", "--repo", "test_repo", "--repositories-configuration", settings.repositories.config_string()) python_cmd = (portage_python, "-b", "-Wd", "-c")