commit:     d800d224ab38c0f524d3fe858ebe201cbfa903c1
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Nov  6 08:33:03 2014 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Dec  7 23:10:48 2014 +0000
URL:        http://sources.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=d800d224

Log changes between vdb_metadata.pickle updates

This adds support for generating a vdb_metadata_delta.json file which
tracks package merges/unmerges that occur between updates to
vdb_metadata.pickle. IndexedVardb can use the delta together with
vdb_metadata.pickle to reconstruct a complete view of /var/db/pkg, so
that it can avoid expensive listdir calls in /var/db/pkg/*.

Note that vdb_metadata.pickle is only updated periodically, in order
to avoid excessive re-writes of a large file.

In order to test the performance gains from this patch, you need to
generate /var/cache/edb/vdb_metadata_delta.json first, which will
happen automatically if you run 'emerge -p anything' with root
privileges.

---
 pym/portage/dbapi/IndexedVardb.py      |  22 ++++-
 pym/portage/dbapi/_VdbMetadataDelta.py | 153 +++++++++++++++++++++++++++++++++
 pym/portage/dbapi/vartree.py           |  42 ++++++---
 3 files changed, 206 insertions(+), 11 deletions(-)

diff --git a/pym/portage/dbapi/IndexedVardb.py b/pym/portage/dbapi/IndexedVardb.py
index 424defc..38bfeed 100644
--- a/pym/portage/dbapi/IndexedVardb.py
+++ b/pym/portage/dbapi/IndexedVardb.py
@@ -3,6 +3,7 @@
 
 import portage
 from portage.dep import Atom
+from portage.exception import InvalidData
 from portage.versions import _pkg_str
 
 class IndexedVardb(object):
@@ -42,7 +43,26 @@ class IndexedVardb(object):
 		if self._cp_map is not None:
 			return iter(sorted(self._cp_map))
 
-		return self._iter_cp_all()
+		delta_data = self._vardb._cache_delta.loadRace()
+		if delta_data is None:
+			return self._iter_cp_all()
+
+		self._vardb._cache_delta.applyDelta(delta_data)
+
+		self._cp_map = cp_map = {}
+		for cpv in self._vardb._aux_cache["packages"]:
+			try:
+				cpv = _pkg_str(cpv)
+			except InvalidData:
+				continue
+
+			cp_list = cp_map.get(cpv.cp)
+			if cp_list is None:
+				cp_list = []
+				cp_map[cpv.cp] = cp_list
+			cp_list.append(cpv)
+
+		return iter(sorted(self._cp_map))
 
 	def _iter_cp_all(self):
 		self._cp_map = cp_map = {}
diff --git a/pym/portage/dbapi/_VdbMetadataDelta.py b/pym/portage/dbapi/_VdbMetadataDelta.py
new file mode 100644
index 0000000..3e3ff18
--- /dev/null
+++ b/pym/portage/dbapi/_VdbMetadataDelta.py
@@ -0,0 +1,153 @@
+# Copyright 2014 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import errno
+import io
+import json
+import os
+
+from portage import _encodings
+from portage.util import atomic_ofstream
+
+class VdbMetadataDelta(object):
+
+	_format_version = "1"
+
+	def __init__(self, vardb):
+		self._vardb = vardb
+
+	def initialize(self, timestamp):
+		f = atomic_ofstream(self._vardb._cache_delta_filename, 'w',
+			encoding=_encodings['repo.content'], errors='strict')
+		json.dump({
+			"version": self._format_version,
+			"timestamp": timestamp
+		}, f, ensure_ascii=False)
+		f.close()
+
+	def load(self):
+
+		if not os.path.exists(self._vardb._aux_cache_filename):
+			# If the primary cache doesn't exist yet, then
+			# we can't record a delta against it.
+			return None
+
+		try:
+			with io.open(self._vardb._cache_delta_filename, 'r',
+				encoding=_encodings['repo.content'],
+				errors='strict') as f:
+				cache_obj = json.load(f)
+		except EnvironmentError as e:
+			if e.errno not in (errno.ENOENT, errno.ESTALE):
+				raise
+		except (SystemExit, KeyboardInterrupt):
+			raise
+		except Exception:
+			# Corrupt, or not json format.
+			pass
+		else:
+			try:
+				version = cache_obj["version"]
+			except KeyError:
+				pass
+			else:
+				# Verify that the format version is compatible,
+				# since a newer version of portage may have
+				# written an incompatible file.
+				if version == self._format_version:
+					try:
+						deltas = cache_obj["deltas"]
+					except KeyError:
+						cache_obj["deltas"] = deltas = []
+
+					if isinstance(deltas, list):
+						return cache_obj
+
+		return None
+
+	def loadRace(self):
+		"""
+		This calls self.load() and validates the timestamp
+		against the currently loaded self._vardb._aux_cache. If a
+		concurrent update causes the timestamps to be inconsistent,
+		then it reloads the caches and tries one more time before
+		it aborts. In practice, the race is very unlikely, so
+		this will usually succeed on the first try.
+		"""
+
+		tries = 2
+		while tries:
+			tries -= 1
+			cache_delta = self.load()
+			if cache_delta is not None and \
+				cache_delta.get("timestamp") != \
+				self._vardb._aux_cache.get("timestamp", False):
+				self._vardb._aux_cache_obj = None
+			else:
+				return cache_delta
+
+		return None
+
+	def recordEvent(self, event, cpv, slot, counter):
+
+		self._vardb.lock()
+		try:
+			deltas_obj = self.load()
+
+			if deltas_obj is None:
+				# We can't record meaningful deltas without
+				# a pre-existing state.
+				return
+
+			delta_node = {
+				"event": event,
+				"package": cpv.cp,
+				"version": cpv.version,
+				"slot": slot,
+				"counter": "%s" % counter
+			}
+
+			deltas_obj["deltas"].append(delta_node)
+
+			# Eliminate earlier nodes cancelled out by later nodes
+			# that have identical package and slot attributes.
+			filtered_list = []
+			slot_keys = set()
+			version_keys = set()
+			for delta_node in reversed(deltas_obj["deltas"]):
+				slot_key = (delta_node["package"],
+					delta_node["slot"])
+				version_key = (delta_node["package"],
+					delta_node["version"])
+				if not (slot_key in slot_keys or \
+					version_key in version_keys):
+					filtered_list.append(delta_node)
+					slot_keys.add(slot_key)
+					version_keys.add(version_key)
+
+			filtered_list.reverse()
+			deltas_obj["deltas"] = filtered_list
+
+			f = atomic_ofstream(self._vardb._cache_delta_filename,
+				mode='w', encoding=_encodings['repo.content'])
+			json.dump(deltas_obj, f, ensure_ascii=False)
+			f.close()
+
+		finally:
+			self._vardb.unlock()
+
+	def applyDelta(self, data):
+		packages = self._vardb._aux_cache["packages"]
+		for delta in data["deltas"]:
+			cpv = delta["package"] + "-" + delta["version"]
+			event = delta["event"]
+			if event == "add":
+				# Use aux_get to populate the cache
+				# for this cpv.
+				if cpv not in packages:
+					try:
+						self._vardb.aux_get(cpv, ["DESCRIPTION"])
+					except KeyError:
+						pass
+			elif event == "remove":
+				packages.pop(cpv, None)
diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
index 9c8b276..2d4d32d 100644
--- a/pym/portage/dbapi/vartree.py
+++ b/pym/portage/dbapi/vartree.py
@@ -64,6 +64,7 @@ from portage import _os_merge
 from portage import _selinux_merge
 from portage import _unicode_decode
 from portage import _unicode_encode
+from ._VdbMetadataDelta import VdbMetadataDelta
 
 from _emerge.EbuildBuildDir import EbuildBuildDir
 from _emerge.EbuildPhase import EbuildPhase
@@ -179,6 +180,9 @@ class vardbapi(dbapi):
 		self._aux_cache_obj = None
 		self._aux_cache_filename = os.path.join(self._eroot,
 			CACHE_PATH, "vdb_metadata.pickle")
+		self._cache_delta_filename = os.path.join(self._eroot,
+			CACHE_PATH, "vdb_metadata_delta.json")
+		self._cache_delta = VdbMetadataDelta(self)
 		self._counter_path = os.path.join(self._eroot,
 			CACHE_PATH, "counter")
 
@@ -569,22 +573,31 @@ class vardbapi(dbapi):
 		long as at least part of the cache is still valid)."""
 		if self._flush_cache_enabled and \
 			self._aux_cache is not None and \
-			len(self._aux_cache["modified"]) >= self._aux_cache_threshold and \
-			secpass >= 2:
+			secpass >= 2 and \
+			(len(self._aux_cache["modified"]) >= self._aux_cache_threshold or
+			not os.path.exists(self._cache_delta_filename)):
+
+			ensure_dirs(os.path.dirname(self._aux_cache_filename))
+
 			self._owners.populate() # index any unindexed contents
 			valid_nodes = set(self.cpv_all())
 			for cpv in list(self._aux_cache["packages"]):
 				if cpv not in valid_nodes:
 					del self._aux_cache["packages"][cpv]
 			del self._aux_cache["modified"]
-			try:
-				f = atomic_ofstream(self._aux_cache_filename, 'wb')
-				pickle.dump(self._aux_cache, f, protocol=2)
-				f.close()
-				apply_secpass_permissions(
-					self._aux_cache_filename, gid=portage_gid, mode=0o644)
-			except (IOError, OSError) as e:
-				pass
+			timestamp = time.time()
+			self._aux_cache["timestamp"] = timestamp
+
+			f = atomic_ofstream(self._aux_cache_filename, 'wb')
+			pickle.dump(self._aux_cache, f, protocol=2)
+			f.close()
+			apply_secpass_permissions(
+				self._aux_cache_filename, mode=0o644)
+
+			self._cache_delta.initialize(timestamp)
+			apply_secpass_permissions(
+				self._cache_delta_filename, mode=0o644)
+
 			self._aux_cache["modified"] = set()
 
 	@property
@@ -1622,6 +1635,13 @@ class dblink(object):
 				self.dbdir, noiselevel=-1)
 			return
 
+		if self.dbdir is self.dbpkgdir:
+			counter, = self.vartree.dbapi.aux_get(
+				self.mycpv, ["COUNTER"])
+			self.vartree.dbapi._cache_delta.recordEvent(
+				"remove", self.mycpv,
+				self.settings["SLOT"].split("/")[0], counter)
+
 		shutil.rmtree(self.dbdir)
 		# If empty, remove parent category directory.
 		try:
@@ -4232,6 +4252,8 @@ class dblink(object):
 				self.delete()
 				_movefile(self.dbtmpdir, self.dbpkgdir, mysettings=self.settings)
 				self._merged_path(self.dbpkgdir, os.lstat(self.dbpkgdir))
+				self.vartree.dbapi._cache_delta.recordEvent(
+					"add", self.mycpv, slot, counter)
 			finally:
 				self.unlockdb()
 
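For readers who want to experiment with the new cache without tracing
through vartree.py, below is a minimal standalone sketch (not part of
the commit) of the consumer side: it validates a delta object roughly
the way load() and loadRace() do, then replays it against a packages
map the way applyDelta() does. The names load_delta, apply_delta,
FORMAT_VERSION, and the file contents, timestamp, and package names
are made-up for illustration; the real applyDelta() calls
vardb.aux_get() to populate metadata for added packages rather than
inserting empty placeholders.

import json

FORMAT_VERSION = "1"

# A hypothetical vdb_metadata_delta.json, shaped the way initialize()
# and recordEvent() write it: a format version, the timestamp shared
# with vdb_metadata.pickle, and one node per merge/unmerge event.
delta_json = '''
{
	"version": "1",
	"timestamp": 1417993848.0,
	"deltas": [
		{"event": "add", "package": "dev-libs/foo",
		 "version": "1.1", "slot": "0", "counter": "1002"},
		{"event": "remove", "package": "dev-libs/bar",
		 "version": "1.0", "slot": "0", "counter": "987"}
	]
}
'''

def load_delta(text, pickle_timestamp):
	# Validate like load()/loadRace(): reject an incompatible format
	# version, a malformed deltas list, or a timestamp that does not
	# match the vdb_metadata.pickle snapshot currently in memory (in
	# loadRace() a mismatch triggers a cache reload and one retry).
	try:
		obj = json.loads(text)
	except ValueError:
		return None  # corrupt, or not json format
	if obj.get("version") != FORMAT_VERSION:
		return None
	if not isinstance(obj.setdefault("deltas", []), list):
		return None
	if obj.get("timestamp") != pickle_timestamp:
		return None
	return obj

def apply_delta(packages, obj):
	# Replay events like applyDelta(); the real method fills in
	# metadata for added packages via vardb.aux_get().
	for delta in obj["deltas"]:
		cpv = delta["package"] + "-" + delta["version"]
		if delta["event"] == "add":
			packages.setdefault(cpv, {})
		elif delta["event"] == "remove":
			packages.pop(cpv, None)
	return packages

# A stale snapshot, as it might have been unpickled from vdb_metadata.pickle:
packages = {"dev-libs/bar-1.0": {}}
obj = load_delta(delta_json, 1417993848.0)
if obj is not None:
	apply_delta(packages, obj)
print(sorted(packages))  # ['dev-libs/foo-1.1']

With the packages map patched up this way, IndexedVardb.cp_all() can
rebuild its cp -> versions index entirely from the cache, without a
single listdir call in /var/db/pkg/*.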